kueue

package module
v0.0.0-...-e10dddd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2014 License: MIT Imports: 9 Imported by: 0

README

kueue

A simple producer&consumer wrapper based on NSQ.

Features

  • Publish any type of message
  • Support delay job
  • Within producers pool
  • Worker babysitter can take care of multiple consumers

Usage

Producers
import (
    "time"

    "github.com/heroicyang/kueue"
)

kueue.SetupProducers(nsqdAddr, poolSize)

// publish any type of message
kueue.Publish("topic", 0, topicStruct)

// publish a delayed message
kueue.Publish("delayedTopic", 1 * time.Hour, topicStruct)
Consumers
import (
    "encoding/json"

    "github.com/heroicyang/kueue"
)

type TopicJob struct {
    Topic *TopicStruct
}

func (t *TopicJob) Perform() error {
    // perform your job
    return
}

func newTopicJob(payload *kueue.Payload) (kueue.Job, error) {
    job := new(TopicJob)

    err := json.Unmarshal(payload.Body, &job.Topic)

    return job, err
}

func main() {
    worker := kueue.NewWorker()

    consumer := kueue.NewConsumer("topic", "channel", 5, newTopicJob)
    worker.AddConsumer(consumer)

    worker.Startup(nsqlookupAddr)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MultiPublish

func MultiPublish(topic string, delay time.Duration, vs ...interface{}) (err error)

Publish multiple messages to the specified topic with a given delay.

func Publish

func Publish(topic string, delay time.Duration, v interface{}) (err error)

Publish a message to the specified topic with a given delay.

func SetupProducers

func SetupProducers(nsqdAddr string, poolSize int) (err error)

Setup producers with the given nsqdAddr and poolSize.

Types

type Consumer

type Consumer struct {
	Topic       string
	Channel     string
	Concurrency int
	Config      *nsq.Config
	// contains filtered or unexported fields
}

Consumer is a wrapper for go-nsq Consumer.

func NewConsumer

func NewConsumer(topic, channel string, concurrency int, jobGenerator JobGenerator) (c *Consumer)

Create a new Consumer with a given topic, channel, concurrency and JobGenerator.

func (*Consumer) ConnectToNSQLookupd

func (c *Consumer) ConnectToNSQLookupd(lookupdAddr string) (err error)

Create the go-nsq Consumer, adds handler and connects to nsq.

type Job

type Job interface {
	Perform() error
}

Job is the message processing interface.

type JobGenerator

type JobGenerator func(*Payload) (Job, error)

JobGenerator is a convenience type that processes the payload, and returns a stateful Job and error.

type Payload

type Payload struct {
	Body    []byte
	Delayed bool
	Message *nsq.Message
}

Payload holds the message body and original go-nsq Message.

type Worker

type Worker struct {
	StopTimeout time.Duration
	// contains filtered or unexported fields
}

Worker start and stop multiple consumers.

func NewWorker

func NewWorker() *Worker

Create new worker.

func (*Worker) AddConsumer

func (m *Worker) AddConsumer(c *Consumer)

Add a consumer to this worker.

func (*Worker) Startup

func (m *Worker) Startup(lookupdAddr string) (err error)

Startup this worker, and waiting for a signal to shutdown.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL