beanbroker

package module
v0.0.0-...-9618169 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2015 License: LGPL-3.0 Imports: 7 Imported by: 0

README

Go Beanstalkd Jobs Broker

Go API implementation for simple registering worker and posting jobs to beanstalkd

Simple usage
package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/kadekcipta/beanbroker"
	"golang.org/x/net/context"
)

// simple smart echo worker
type echoWorker struct {
	id string
}

func (e *echoWorker) Do(c context.Context, j *beanbroker.Job) beanbroker.JobResult {
	// print the data as string
	fmt.Println(e.id, string(j.Data))

	if strings.HasSuffix(string(j.Data), "wait") {
		// get the reference to the broker
		b := c.Value(beanbroker.BrokerKey).(beanbroker.JobBroker)
		// use it to post new data
		b.PostJob(&beanbroker.JobRequest{
			Type:  "catcher",
			Data:  []byte("beanstalkd !"),
			Delay: time.Second * 5,
		})
	}
	return beanbroker.Delete
}

func catcher(c context.Context, j *beanbroker.Job) beanbroker.JobResult {
	fmt.Println("Catcher:", string(j.Data))
	return beanbroker.Delete
}

func main() {
	// create root context
	c, cancel := context.WithCancel(context.Background())
	// create connection
	broker := beanbroker.New(c, "localhost:11300")

	// register some collaborative workers
	broker.RegisterWorker(&echoWorker{"Echoer"}, "echo", time.Minute)
	broker.RegisterWorker(beanbroker.WorkerFunc(catcher), "catcher", time.Second*10)

	// post a job
	broker.PostJob(&beanbroker.JobRequest{
		Type: "echo",
		Data: []byte("hello..wait"),
	})

	// wait for enter key
	fmt.Scanln()
	// close root context and propagate
	cancel()
}

Output
Echoer hello..wait
Catcher: beanstalkd !

NOTES:

Experimental and need many improvements

Documentation

Index

Constants

View Source
const (
	BrokerKey = "_beanbroker_"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	Id   uint64
	Data JobData
}

Job represents the beanstalkd job

type JobBroker

type JobBroker interface {
	// RegisterWorker registers the worker and a new connection to a tube is created
	RegisterWorker(w Worker, jobType JobType, reservationTimeout time.Duration)

	// PostJob puts a job request to be dispatched to matched workers
	PostJob(*JobRequest) error
}

JobBroker represents central contact point for worker and job poster

func New

func New(c context.Context, address string) JobBroker

factory function

type JobData

type JobData []byte

JobData represents data bytes

type JobId

type JobId uint64

JobId represents beanstalkd job id

type JobRequest

type JobRequest struct {
	Type     JobType
	Data     JobData
	Priority uint32
	Delay    time.Duration
	TTR      time.Duration
}

JobRequest represents job request

type JobResult

type JobResult int

JobResult represents the expected action done by broker after Do() returns

const (
	Bury JobResult = iota
	Delete
	Release
	Touch
)

type JobType

type JobType string

JobType represents tube name in beanstalkd

type Worker

type Worker interface {
	// Do performs whatever the worker supposed to do
	// upon finish or intentional break, return value will determine the job state
	Do(context.Context, *Job) JobResult
}

Worker provides common interface for worker implementation

type WorkerFunc

type WorkerFunc func(context.Context, *Job) JobResult

func (WorkerFunc) Do

func (f WorkerFunc) Do(c context.Context, job *Job) JobResult

Directories

Path Synopsis
examples
simple command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL