Documentation ¶
Index ¶
- Constants
- func NewQueueFramework(q ali_mns.AliMNSQueue, c cl.QueueConfig, h QueueEventHandlerInterface) *queueFramework
- type DefaultEventHandler
- func (d *DefaultEventHandler) AfterChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, ...)
- func (d *DefaultEventHandler) AfterLaunch(_ QueueFramework)
- func (d *DefaultEventHandler) BeforeChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse)
- func (d *DefaultEventHandler) BeforeLaunch(qf QueueFramework)
- func (d *DefaultEventHandler) ConsumeMessage(_ []byte, _ *ali_mns.MessageReceiveResponse) error
- func (d *DefaultEventHandler) OnChangeVisibilityFailed(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, ...)
- func (d *DefaultEventHandler) OnConsumeFailed(_ error, _ []byte, _ *ali_mns.MessageReceiveResponse)
- func (d *DefaultEventHandler) OnError(err error, queue ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse, ...)
- func (d *DefaultEventHandler) OnParseMessageBodyFailed(_ error, _ *ali_mns.MessageReceiveResponse)
- func (d *DefaultEventHandler) OnRecoverProcessing(_ QueueFramework)
- func (d *DefaultEventHandler) OnWaitingMessage(qf QueueFramework)
- func (d *DefaultEventHandler) OnWaitingProcessing(qf QueueFramework)
- func (d *DefaultEventHandler) ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)
- type QueueEventHandlerInterface
- type QueueFramework
- type Statistic
- func (ss *Statistic) Fetch(paramName string) uint64
- func (ss *Statistic) HandleError(count ...uint64)
- func (ss *Statistic) HandleSuccess(count ...uint64)
- func (ss *Statistic) Loop(count ...uint64)
- func (ss *Statistic) MessageReceived(count ...uint64)
- func (ss *Statistic) Monitor() bool
- func (ss *Statistic) MonitorLog() string
- func (ss *Statistic) Performance() string
- func (ss *Statistic) QueueError(count ...uint64)
- func (ss *Statistic) Start()
- func (ss Statistic) String() string
- func (ss *Statistic) Wait(count ...uint64)
Constants ¶
const MinLogDuration = 1 // seconds
MinLogDuration is the minimum log period, in seconds, for the top-QPS monitor.
Variables ¶
This section is empty.
Functions ¶
func NewQueueFramework ¶
func NewQueueFramework(q ali_mns.AliMNSQueue, c cl.QueueConfig, h QueueEventHandlerInterface) *queueFramework
NewQueueFramework creates and returns a queue framework that uses the provided event handler and is configured by the provided QueueConfig.
Types ¶
type DefaultEventHandler ¶
type DefaultEventHandler struct{}
DefaultEventHandler provides default implementations of the QueueEventHandlerInterface APIs. Users can embed this struct and override only the APIs they need.
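The embed-and-override pattern described above can be sketched as follows. This is a minimal illustration, not the package's code: `Response`, `Handler`, `DefaultHandler`, and `RecordingHandler` are hypothetical stand-ins for `*ali_mns.MessageReceiveResponse`, a two-method subset of `QueueEventHandlerInterface`, `DefaultEventHandler`, and a user type, so that the example compiles without the MNS client.

```go
package main

import "fmt"

// Response is a hypothetical stand-in for *ali_mns.MessageReceiveResponse.
type Response struct {
	MessageId string
}

// Handler is a hypothetical two-method subset of QueueEventHandlerInterface.
type Handler interface {
	ConsumeMessage(body []byte, resp *Response) error
	OnConsumeFailed(err error, body []byte, resp *Response)
}

// DefaultHandler mimics DefaultEventHandler: every method is a no-op.
type DefaultHandler struct{}

func (d *DefaultHandler) ConsumeMessage(_ []byte, _ *Response) error     { return nil }
func (d *DefaultHandler) OnConsumeFailed(_ error, _ []byte, _ *Response) {}

// RecordingHandler embeds DefaultHandler and overrides only ConsumeMessage.
// OnConsumeFailed is promoted from the embedded struct unchanged.
type RecordingHandler struct {
	DefaultHandler
	Seen []string
}

// ConsumeMessage shadows the embedded no-op and records each body.
func (r *RecordingHandler) ConsumeMessage(body []byte, _ *Response) error {
	r.Seen = append(r.Seen, string(body))
	return nil
}

func main() {
	r := &RecordingHandler{}
	var h Handler = r // *RecordingHandler satisfies the whole interface
	_ = h.ConsumeMessage([]byte("hello"), &Response{MessageId: "1"})
	h.OnConsumeFailed(nil, nil, nil) // inherited no-op
	fmt.Println(r.Seen)
}
```

With the real package, the same shape would presumably embed `DefaultEventHandler` and pass a pointer to the user type as the handler argument of `NewQueueFramework`.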
func (*DefaultEventHandler) AfterChangeVisibility ¶
func (d *DefaultEventHandler) AfterChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, _ *ali_mns.MessageVisibilityChangeResponse)
AfterChangeVisibility does nothing by default.
func (*DefaultEventHandler) AfterLaunch ¶
func (d *DefaultEventHandler) AfterLaunch(_ QueueFramework)
AfterLaunch does nothing by default.
func (*DefaultEventHandler) BeforeChangeVisibility ¶
func (d *DefaultEventHandler) BeforeChangeVisibility(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse)
BeforeChangeVisibility does nothing by default.
func (*DefaultEventHandler) BeforeLaunch ¶
func (d *DefaultEventHandler) BeforeLaunch(qf QueueFramework)
BeforeLaunch registers the system INT, TERM, and STOP signals to stop the queue, and the HUP signal to update the performance log.
func (*DefaultEventHandler) ConsumeMessage ¶
func (d *DefaultEventHandler) ConsumeMessage(_ []byte, _ *ali_mns.MessageReceiveResponse) error
ConsumeMessage does nothing by default.
func (*DefaultEventHandler) OnChangeVisibilityFailed ¶
func (d *DefaultEventHandler) OnChangeVisibilityFailed(_ ali_mns.AliMNSQueue, _ *ali_mns.MessageReceiveResponse, _ *ali_mns.MessageVisibilityChangeResponse)
OnChangeVisibilityFailed does nothing by default.
func (*DefaultEventHandler) OnConsumeFailed ¶
func (d *DefaultEventHandler) OnConsumeFailed(_ error, _ []byte, _ *ali_mns.MessageReceiveResponse)
OnConsumeFailed does nothing by default.
func (*DefaultEventHandler) OnError ¶
func (d *DefaultEventHandler) OnError(err error, queue ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse, vret *ali_mns.MessageVisibilityChangeResponse, qf QueueFramework)
OnError records the error in the statistic and in the logger. The message is deleted if its dequeue count is over the MaxDequeueCount config value.
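The deletion rule can be pictured with a small predicate. This is only a sketch: `shouldDelete` is a hypothetical helper, and reading "over" as a strict greater-than comparison is an assumption, since the source does not say whether a count equal to the limit also triggers deletion.

```go
package main

import "fmt"

// shouldDelete reports whether a message has been dequeued more often than
// the configured limit. The strict ">" is an assumption about what "over
// MaxDequeueCount" means; the real handler may compare differently.
func shouldDelete(dequeueCount, maxDequeueCount int64) bool {
	return dequeueCount > maxDequeueCount
}

func main() {
	const maxDequeueCount = 5 // hypothetical config value
	for _, n := range []int64{4, 5, 6} {
		fmt.Printf("dequeued %d times: delete=%v\n", n, shouldDelete(n, maxDequeueCount))
	}
}
```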
func (*DefaultEventHandler) OnParseMessageBodyFailed ¶
func (d *DefaultEventHandler) OnParseMessageBodyFailed(_ error, _ *ali_mns.MessageReceiveResponse)
OnParseMessageBodyFailed does nothing by default.
func (*DefaultEventHandler) OnRecoverProcessing ¶
func (d *DefaultEventHandler) OnRecoverProcessing(_ QueueFramework)
OnRecoverProcessing only logs a queue-recovered message by default.
func (*DefaultEventHandler) OnWaitingMessage ¶
func (d *DefaultEventHandler) OnWaitingMessage(qf QueueFramework)
OnWaitingMessage only updates the statistic logs by default.
func (*DefaultEventHandler) OnWaitingProcessing ¶
func (d *DefaultEventHandler) OnWaitingProcessing(qf QueueFramework)
OnWaitingProcessing only logs a waiting message by default.
func (*DefaultEventHandler) ParseMessageBody ¶
func (d *DefaultEventHandler) ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)
ParseMessageBody base64-decodes the response string by default.
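A default parser of this kind can be approximated with the standard `encoding/base64` package. In this sketch, `Response` and its `MessageBody` string field are assumptions standing in for `*ali_mns.MessageReceiveResponse`, and the choice of the standard (padded) alphabet is also an assumption.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// Response is a hypothetical stand-in for *ali_mns.MessageReceiveResponse.
// The MessageBody field name and its string type are assumptions.
type Response struct {
	MessageBody string
}

// parseMessageBody decodes the base64 body and returns the raw bytes,
// or an error when the body is not valid base64.
func parseMessageBody(resp *Response) ([]byte, error) {
	return base64.StdEncoding.DecodeString(resp.MessageBody)
}

func main() {
	body, err := parseMessageBody(&Response{MessageBody: "aGVsbG8="})
	fmt.Println(string(body), err)

	_, err = parseMessageBody(&Response{MessageBody: "not base64!"})
	fmt.Println(err != nil)
}
```

A handler whose producers send plain text or another encoding would override `ParseMessageBody` rather than rely on this default.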
type QueueEventHandlerInterface ¶
type QueueEventHandlerInterface interface {
// BeforeLaunch function is invoked when framework Launch function starts.
// qf QueueFramework is the framework
BeforeLaunch(qf QueueFramework)
// AfterLaunch function is invoked when framework Launch function starts.
// qf QueueFramework is the framework
AfterLaunch(qf QueueFramework)
// OnWaitingMessage is invoked when the queue framework starts to wait for a queue message.
// Users can log the queue status or do other work outside the normal handling flow.
OnWaitingMessage(qf QueueFramework)
// ParseMessageBody decodes the message body and is invoked when message is received.
// The decoded message will be passed to ConsumeMessage interface as the first parameter.
ParseMessageBody(resp *ali_mns.MessageReceiveResponse) ([]byte, error)
// OnParseMessageBodyFailed is invoked if ParseMessageBody returns a non-nil error.
// Users can log and deal with the error and response body in this function.
OnParseMessageBodyFailed(err error, resp *ali_mns.MessageReceiveResponse)
// ConsumeMessage is the entry for user business logic. The decoded body and response struct are provided.
ConsumeMessage(body []byte, resp *ali_mns.MessageReceiveResponse) error
// OnConsumeFailed is invoked if ConsumeMessage returns a non-nil error.
OnConsumeFailed(err error, body []byte, resp *ali_mns.MessageReceiveResponse)
// BeforeChangeVisibility is invoked before the queue framework changes message visibility.
BeforeChangeVisibility(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse)
// AfterChangeVisibility is invoked after the queue framework changes message visibility.
AfterChangeVisibility(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse,
vr *ali_mns.MessageVisibilityChangeResponse)
// OnChangeVisibilityFailed is invoked if the queue framework can't change message visibility.
OnChangeVisibilityFailed(q ali_mns.AliMNSQueue, resp *ali_mns.MessageReceiveResponse,
vr *ali_mns.MessageVisibilityChangeResponse)
// OnWaitingProcessing is invoked whenever too many messages are being processed and the queue
// has to wait for a few seconds. If the queue needs to wait longer than
// config.OverloadBreakSeconds, it stops itself.
OnWaitingProcessing(qf QueueFramework)
// OnRecoverProcessing is invoked when the queue is recovered from waiting for processing.
OnRecoverProcessing(qf QueueFramework)
// OnError is invoked whenever an error happens.
OnError(err error, q ali_mns.AliMNSQueue,
rr *ali_mns.MessageReceiveResponse, vr *ali_mns.MessageVisibilityChangeResponse,
qf QueueFramework)
}
QueueEventHandlerInterface defines the interface for a queue event handler. Users should either implement all of its API functions or embed the DefaultEventHandler struct.
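The interface comments imply an ordering between the parse and consume callbacks. The sketch below shows one plausible per-message flow under that reading; it is not the framework's actual loop. `Response`, `Handler`, `dispatch`, and `recorder` are all hypothetical names, and the visibility, overload, and `OnError` callbacks are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// Response is a hypothetical stand-in for *ali_mns.MessageReceiveResponse.
type Response struct{ MessageBody string }

// Handler is a hypothetical four-method subset of QueueEventHandlerInterface.
type Handler interface {
	ParseMessageBody(resp *Response) ([]byte, error)
	OnParseMessageBodyFailed(err error, resp *Response)
	ConsumeMessage(body []byte, resp *Response) error
	OnConsumeFailed(err error, body []byte, resp *Response)
}

// dispatch runs one message through the handler: parse first, then consume,
// calling the matching failure hook when a step returns a non-nil error.
// It reports whether the message was consumed successfully.
func dispatch(h Handler, resp *Response) bool {
	body, err := h.ParseMessageBody(resp)
	if err != nil {
		h.OnParseMessageBodyFailed(err, resp)
		return false
	}
	if err := h.ConsumeMessage(body, resp); err != nil {
		h.OnConsumeFailed(err, body, resp)
		return false
	}
	return true
}

// recorder is a test handler that records which callbacks ran.
type recorder struct{ calls []string }

func (r *recorder) ParseMessageBody(resp *Response) ([]byte, error) {
	r.calls = append(r.calls, "parse")
	if resp.MessageBody == "" {
		return nil, errors.New("empty body")
	}
	return []byte(resp.MessageBody), nil
}

func (r *recorder) OnParseMessageBodyFailed(error, *Response) {
	r.calls = append(r.calls, "parse-failed")
}

func (r *recorder) ConsumeMessage(body []byte, _ *Response) error {
	r.calls = append(r.calls, "consume")
	if string(body) == "bad" {
		return errors.New("cannot handle message")
	}
	return nil
}

func (r *recorder) OnConsumeFailed(error, []byte, *Response) {
	r.calls = append(r.calls, "consume-failed")
}

func main() {
	for _, body := range []string{"ok", "bad", ""} {
		r := &recorder{}
		ok := dispatch(r, &Response{MessageBody: body})
		fmt.Printf("%q: ok=%v calls=%v\n", body, ok, r.calls)
	}
}
```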
type QueueFramework ¶
type QueueFramework interface {
RegisterBreakQueueOsSingal(sigs ...os.Signal)
GetConfig() cl.QueueConfig
GetStatistic() *Statistic
SetQueue(q ali_mns.AliMNSQueue)
HasValidQueue() bool
SetEventHandler(h QueueEventHandlerInterface)
HasEventHandler() bool
Launch()
Stop()
WaitProcessingSeconds(pop bool) int
}
QueueFramework is the interface that should be satisfied for any modified queue framework.
type Statistic ¶
type Statistic struct {
// contains filtered or unexported fields
}
Statistic is a struct that holds queue performance monitoring data.
func (*Statistic) Fetch ¶
func (ss *Statistic) Fetch(paramName string) uint64
Fetch returns the statistic value named by paramName. The function is thread-safe.
func (*Statistic) HandleError ¶
func (ss *Statistic) HandleError(count ...uint64)
HandleError adds count to the statistic value for messages whose handling ran into a problem. If count is not provided, it defaults to 1.
func (*Statistic) HandleSuccess ¶
func (ss *Statistic) HandleSuccess(count ...uint64)
HandleSuccess adds count to the statistic value for successfully handled messages. If count is not provided, it defaults to 1.
func (*Statistic) Loop ¶
func (ss *Statistic) Loop(count ...uint64)
Loop adds count to the statistic value for loops that fetched no messages. If count is not provided, it defaults to 1.
func (*Statistic) MessageReceived ¶
func (ss *Statistic) MessageReceived(count ...uint64)
MessageReceived adds count to the statistic value for received messages. If count is not provided, it defaults to 1.
func (*Statistic) Monitor ¶
func (ss *Statistic) Monitor() bool
Monitor updates the periodMonitor objects, which log queue-handling performance over a period.
func (*Statistic) MonitorLog ¶
func (ss *Statistic) MonitorLog() string
MonitorLog returns the period monitor log as a string.
func (*Statistic) Performance ¶
func (ss *Statistic) Performance() string
Performance returns the performance log as a string.
func (*Statistic) QueueError ¶
func (ss *Statistic) QueueError(count ...uint64)
QueueError adds count to the statistic value for queue problems. If count is not provided, it defaults to 1.
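The "optional count, default 1" convention shared by the counter methods can be sketched with a variadic parameter and `sync/atomic`. The `counter` type and its methods are hypothetical; the source only states that `Fetch` is thread-safe, so the use of atomics here is one way to get that property, not a description of how `Statistic` is implemented.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a hypothetical stand-in for a single Statistic value.
type counter struct{ n uint64 }

// Add increases the counter by count[0] when a count is given, else by 1.
// Any arguments after the first are ignored in this sketch.
func (c *counter) Add(count ...uint64) {
	delta := uint64(1)
	if len(count) > 0 {
		delta = count[0]
	}
	atomic.AddUint64(&c.n, delta)
}

// Fetch returns the current value; safe to call from any goroutine.
func (c *counter) Fetch() uint64 { return atomic.LoadUint64(&c.n) }

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Add() // no count given, so this adds 1
		}()
	}
	wg.Wait()
	c.Add(5)
	fmt.Println(c.Fetch()) // prints 105
}
```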