Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrTopicRequired is returned when topic is not passed as parameter. ErrTopicRequired = errors.New("topic is mandatory") // ErrHandlerRequired is returned when handler is not passed as parameter. ErrHandlerRequired = errors.New("handler is mandatory") // ErrChannelRequired is returned when channel is not passed as parameter in bus.ListenerConfig. ErrChannelRequired = errors.New("channel is mandatory") )
var ( ErrNoEventListeners = errors.New("At least one eventListener is required") ErrServiceAlreadyRunning = errors.New("Service is already running") )
var ErrTimeoutOccurred error = errors.New("Timeout reached")
Functions ¶
func On ¶
func On(lc ListenerConfig) (err error)
On listen to a message from a specific topic using nsq consumer, returns an error if topic and channel not passed or if an error occurred while creating nsq consumer.
Types ¶
type Emitter ¶
type Emitter interface {
Emit(topic string, payload interface{}) error
EmitAsync(topic string, payload interface{}) error
EmitBulkAsync(topic string, payload []interface{}) error
Request(topic string, payload interface{}, handler HandlerFunc) error
EmitAndWaitForResultWithTimeout(topic string, payload interface{}, timeoutDuration time.Duration) (interface{}, error)
}
Emitter exposes a interface for emitting and listening for events.
func NewEmitter ¶
func NewEmitter(ec EmitterConfig) (emitter Emitter, err error)
NewEmitter returns a new eventEmitter configured with the variables from the config parameter, or returning an non-nil err if an error occurred while creating nsq producer.
type EmitterConfig ¶
type EmitterConfig struct {
Address string
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
LocalAddr net.Addr
LookupdPollInterval time.Duration
LookupdPollJitter float64
MaxRequeueDelay time.Duration
DefaultRequeueDelay time.Duration
BackoffStrategy nsq.BackoffStrategy
MaxBackoffDuration time.Duration
BackoffMultiplier time.Duration
MaxAttempts uint16
LowRdyIdleTimeout time.Duration
RDYRedistributeInterval time.Duration
ClientID string
Hostname string
UserAgent string
HeartbeatInterval time.Duration
SampleRate int32
TLSV1 bool
TLSConfig *tls.Config
Deflate bool
DeflateLevel int
Snappy bool
OutputBufferSize int64
OutputBufferTimeout time.Duration
MaxInFlight int
MsgTimeout time.Duration
AuthSecret string
}
EmitterConfig carries the different variables to tune a newly started emitter, it exposes the same configuration available from official nsq go client.
type EventListener ¶
func NewConsumer ¶
func NewConsumer(topic, channel string, worker HandlerFunc) *EventListener
func (*EventListener) AddMiddleware ¶
func (el *EventListener) AddMiddleware(fn Middleware)
func (*EventListener) SetConcurrency ¶
func (el *EventListener) SetConcurrency(concurrency int)
func (*EventListener) SetMaxAttempts ¶
func (el *EventListener) SetMaxAttempts(attempts uint16)
func (*EventListener) SetMaxInFlight ¶
func (el *EventListener) SetMaxInFlight(maxInFlight int)
type HandlerFunc ¶
type ListenerConfig ¶
type ListenerConfig struct {
Topic string
Channel string
Lookup []string
HandlerFunc HandlerFunc
HandlerConcurrency int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
LocalAddr net.Addr
LookupdPollInterval time.Duration
LookupdPollJitter float64
MaxRequeueDelay time.Duration
DefaultRequeueDelay time.Duration
BackoffStrategy nsq.BackoffStrategy
MaxBackoffDuration time.Duration
BackoffMultiplier time.Duration
MaxAttempts uint16
LowRdyIdleTimeout time.Duration
RDYRedistributeInterval time.Duration
ClientID string
Hostname string
UserAgent string
HeartbeatInterval time.Duration
SampleRate int32
TLSV1 bool
TLSConfig *tls.Config
Deflate bool
DeflateLevel int
Snappy bool
OutputBufferSize int64
OutputBufferTimeout time.Duration
MaxInFlight int
MsgTimeout time.Duration
AuthSecret string
}
ListenerConfig carries the different variables to tune a newly started consumer, it exposes the same configuration available from official nsq go client.
type Message ¶
type Message struct {
*nsq.Message
ReplyTo string
Payload []byte
Context context.Context `json:"-"`
}
Message carries nsq.Message fields and methods and adds extra fields for handling messages internally.
func NewMessage ¶
NewMessage returns a new bus.Message.
func (*Message) DecodePayload ¶
DecodePayload deserializes data (as []byte) and creates a new struct passed by parameter.
type Middleware ¶
type Middleware func(HandlerFunc) HandlerFunc
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService() *Service
func (*Service) AddListener ¶
func (s *Service) AddListener(el *EventListener)