nsqAdapter

Version: v0.0.0-...-5c807be
Published: May 11, 2015 License: MIT Imports: 11 Imported by: 3

README

nsq-adapter

A small adapter library that makes using the awesome nsq-service easier.

It provides a thin wrapper around bitly's nsq-services to simplify the use of nsq's asynchronous message queues in midsized projects.

Basically, you create a new NsqAdapter struct and call the following methods:

  • Subscribe() to subscribe to a specific topic and handle incoming messages
  • Publish() to send a message to a specific topic
  • SendRequest() to send a request to a specific topic and wait for a response
  • RespondTo() to send a response message to a request

Please take a look at the example and the samples in the testing directory for an idea about the usage of the package.

usage

package main

import (
	"fmt"

	nsqAdapter "github.com/tikiatua/nsq-adapter"
)

const (
	nsqlookupdAddress string = "192.168.99.100:4161"
)

func main() {

	// create a new nsqadapter with the name of our service
	queue := nsqAdapter.New("name-of-my-service", nsqlookupdAddress)

	// initialize the ability to handle responses.
	// this will create a topic called [name-of-service]-responses
	// that will be used to receive responses to requests issued
	// from this service
	queue.InitializeResponseHandling()

	// create a channel that will receive messages from
	// a topic we would like to subscribe to
	messageChan := make(chan nsqAdapter.Message)

	// subscribe to any topic and channel
	// (note: multiple subscribers to the same channel can be used for load balancing)
	queue.Subscribe("please-do-something-with-this", "requests", messageChan)

	// handle all incoming messages of our subscription
	for {
		// wait for incoming messages
		message := <-messageChan

		// handle the message (the payload is raw JSON, so convert it for printing)
		fmt.Println("RECEIVED:", string(message.Payload))

		// send a response if the message is a request
		if message.MessageType == nsqAdapter.MessageTypeRequest {
			queue.RespondTo(message, "this is an answer from name-of-my-service")
		}
	}
}

You can find an example with a test client, a web server and a simple repository service in the testing directory.

Documentation

Overview

The package nsqAdapter provides a thin wrapper around bitly's nsq-services to simplify usage of nsq's asynchronous message queues for midsized projects.

Basically, you create a new NsqAdapter struct and call the following methods:

  • Subscribe() to subscribe to a specific topic and handle incoming messages
  • Publish() to send a message to a specific topic
  • SendRequest() to send a request to a specific topic and wait for a response
  • RespondTo() to send a response message to a request

Please take a look at the example and the samples in the testing directory for an idea about the usage of the package.

Example
package main

import "fmt"

const (
	nsqlookupdAddress string = "192.168.99.100:4161"
)

func main() {

	// create a new nsqadapter with the name of our service
	queue := New("name-of-my-service", nsqlookupdAddress)

	// initialize the ability to handle responses.
	// this will create a topic called [name-of-service]-responses
	// that will be used to receive responses to requests issued
	// from this service
	queue.InitializeResponseHandling()

	// create a channel that will receive message from
	// a topic we would like to subscribe to
	messageChan := make(chan Message)

	// subscribe to any topic and channel
	// (note: multiple subscribers to the same channel can be used for load balancing)
	queue.Subscribe("please-do-something-with-this", "requests", messageChan)

	// handle all incoming messages of our subscription
	for {
		// wait for incoming messages
		message := <-messageChan

		// handle the messages
		fmt.Println("RECEIVED:", string(message.Payload))

		// send a response if the message is a request
		if message.MessageType == MessageTypeRequest {
			queue.RespondTo(message, "this is an answer from name-of-my-service")
		}
	}
}

Constants

const (
	MessageTypeRequest   string = "request"
	MessageTypeResponse  string = "response"
	MessageTypeBroadcast string = "broadcast"
)
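The three message types lend themselves to a simple switch when a service has to decide how to treat an incoming message. The following is a minimal sketch, not code from the package: the constants are copied locally so the program compiles on its own, and describe is a hypothetical helper introduced only for illustration.

```go
package main

import "fmt"

// Local copies of the documented constants so the sketch is self-contained.
const (
	MessageTypeRequest   string = "request"
	MessageTypeResponse  string = "response"
	MessageTypeBroadcast string = "broadcast"
)

// describe is a hypothetical helper that maps a message type to the
// action a service might take for it.
func describe(messageType string) string {
	switch messageType {
	case MessageTypeRequest:
		return "reply expected"
	case MessageTypeResponse:
		return "reply received"
	case MessageTypeBroadcast:
		return "no reply expected"
	default:
		return "unknown message type"
	}
}

func main() {
	types := []string{MessageTypeRequest, MessageTypeResponse, MessageTypeBroadcast, "other"}
	for _, t := range types {
		fmt.Printf("%s: %s\n", t, describe(t))
	}
}
```

Keeping a default branch means that an unexpected type is reported instead of being silently ignored.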

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	Id        string          // a unique id for the message
	From      string          // the originating service
	To        string          // the topic that the message is posted to
	StartTime string          // the time the processing was started
	EndTime   string          // the time the process has ended
	Payload   json.RawMessage // the payload of the data

	MessageType string  // the type of message (i.e. broadcast, request, response)
	Request     Request // specific information for requests
}

Message is a struct to hold all messages sent over nsq

type NsqAdapter

type NsqAdapter struct {
	Name string // the name of this service
	// contains filtered or unexported fields
}

NsqAdapter is our main struct for external packages to interact with

func New

func New(serviceName string, nsqlookupHttpAddress string) *NsqAdapter

New will create a new nsq-adapter that connects to an nsqlookupd-service at the given address and uses the default configuration for connections

func NewWithCustomConfig

func NewWithCustomConfig(serviceName string, nsqlookupHttpAddress string, config *nsq.Config) *NsqAdapter

NewWithCustomConfig will create a new nsq-adapter with a custom nsq-configuration

func (*NsqAdapter) ForwardRequest

func (queue *NsqAdapter) ForwardRequest(topic string, message *Message) error

ForwardRequest will forward the given message to a different topic. The message payload can be appended to or overwritten

func (*NsqAdapter) Handle

func (queue *NsqAdapter) Handle(topic string, channel string, handleFunction func(message Message))

Handle will start handling all incoming messages with the given function in a separate goroutine
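The pattern described for Handle (a function called for every message, running in its own goroutine) can be illustrated without nsq at all. The following is a sketch of the general technique, not the package's implementation: handle is a hypothetical stand-in that reads from a plain Go channel, and Message is reduced to a single string field.

```go
package main

import "fmt"

// Message is a simplified stand-in for the documented struct.
type Message struct {
	Payload string
}

// handle starts a goroutine that calls handleFunction for every message
// received on the channel. The returned channel is closed once the
// message channel has been closed and drained.
func handle(messages <-chan Message, handleFunction func(message Message)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for message := range messages {
			handleFunction(message)
		}
	}()
	return done
}

func main() {
	messages := make(chan Message)
	var received []string

	done := handle(messages, func(message Message) {
		received = append(received, message.Payload)
	})

	messages <- Message{Payload: "first"}
	messages <- Message{Payload: "second"}
	close(messages)

	// wait until the handler goroutine has processed everything
	<-done
	fmt.Println(received)
}
```

Returning a done channel is a design choice of this sketch. The documented Handle returns nothing, and the package offers Process to block while handlers run.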

func (*NsqAdapter) InitializeResponseHandling

func (queue *NsqAdapter) InitializeResponseHandling() error

InitializeResponseHandling will register a consumer on a topic called [NAME]-responses

func (*NsqAdapter) NewMessage

func (queue *NsqAdapter) NewMessage(topic string, messageType string, payload interface{}) *Message

NewMessage will create a new message struct to send to nsq

func (*NsqAdapter) Process

func (queue *NsqAdapter) Process()

Process will simply block execution so the requests to the specified topics can be processed

func (*NsqAdapter) Publish

func (queue *NsqAdapter) Publish(topic string, payload interface{}) error

Publish will wrap the given payload into a custom message and send it off to nsq (without waiting for a response)

func (*NsqAdapter) PublishMessage

func (queue *NsqAdapter) PublishMessage(topic string, message *Message) error

PublishMessage will publish a message struct to nsq and forget about it

func (*NsqAdapter) RespondTo

func (queue *NsqAdapter) RespondTo(message Message, responsePayload interface{}) error

RespondTo will send a response to a message sent by a client via SendRequest

func (*NsqAdapter) SendRequest

func (queue *NsqAdapter) SendRequest(topic string, payload interface{}, timeout time.Duration) (*Message, error)

SendRequest will send the given message to the given topic and wait for a response

func (*NsqAdapter) Subscribe

func (queue *NsqAdapter) Subscribe(topic string, channel string, messageChannel chan<- Message) error

Subscribe will start listening on the given topic for messages

type Request

type Request struct {
	Until     string // until which moment does the request have to be fulfilled
	RespondTo string // the topic to respond to
	OriginId  string // the id of the originating response
}

Request is a struct to hold additional information for a request message
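One plausible reading of these fields is that a responder addresses its reply to the topic in RespondTo and records the id of the request it answers in OriginId. The sketch below illustrates that reading only. It is an assumption, not taken from the package source, the types are reduced local copies, and buildResponse is a hypothetical helper.

```go
package main

import "fmt"

// Request is a local copy of the documented struct.
type Request struct {
	Until     string
	RespondTo string
	OriginId  string
}

// Message is a reduced local copy of the documented struct.
type Message struct {
	Id          string
	From        string
	To          string
	MessageType string
	Request     Request
}

// buildResponse is a hypothetical helper. It assumes that a response is
// sent to the topic named in Request.RespondTo and that OriginId carries
// the id of the request being answered.
func buildResponse(serviceName string, request Message) Message {
	return Message{
		From:        serviceName,
		To:          request.Request.RespondTo,
		MessageType: "response",
		Request:     Request{OriginId: request.Id},
	}
}

func main() {
	request := Message{
		Id:          "42",
		From:        "client",
		To:          "please-do-something-with-this",
		MessageType: "request",
		Request:     Request{RespondTo: "client-responses"},
	}

	response := buildResponse("name-of-my-service", request)
	fmt.Println(response.To, response.Request.OriginId)
}
```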

Directories

Path            Synopsis
testing/simple  command
