Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DLStorageInterface ¶
type DLStorageInterface interface {
// Store 存储队列中无法处理的消息内容
Store(queue string, data []byte) error
// Fetch 取出可重试的消息内容
Fetch(queue string) (map[string][]byte, error)
// Remove 根据标识移除内容
Remove(id string) error
}
DLStorageInterface 死信存储接口
type DriverInterface ¶
type DriverInterface interface {
// CreateQueue 创建队列
// name 队列名称, 确保唯一
// delay 队列消息延迟时长, 指定时长后方可被消费者获取
CreateQueue(name string, delay time.Duration) error
// CreateTopic 创建主题
// name 主题名称, 确保唯一
CreateTopic(name string) error
// Subscribe 订阅主题
// topic 订阅的主题名称
// queue 消息流转队列名称
// routeKey 路由键, 匹配的消息才会被路由到队列
// 请格外注意, 关于routeKey的使用具体实现会有不同
Subscribe(topic, queue, routeKey string) error
// UnSubscribe 取消订阅, 参数同Subscribe
UnSubscribe(topic, queue, routeKey string) error
// SendToQueue 发送消息至队列
// queue 发送目标队列名称
// content 发送消息字节内容
// delay 消息延迟时长, 优先级高于CreateQueue时指定的延迟
SendToQueue(queue string, content []byte, delay time.Duration) error
// SendToTopic 发送消息至主题
// topic 发送目标主题名称
// content 发送消息字节内容
// routeKey 路由键, 仅路由到匹配的订阅队列
SendToTopic(topic string, content []byte, routeKey string) error
// ReceiveMessage 监听队列获取消息
// ctx 上下文, 用于中断监听
// queue 接受消息的队列名称
// errChan 异常错误传输通道
// handler 消息回调处理函数
ReceiveMessage(ctx context.Context, queue string, errChan chan error, handler func([]byte) bool)
}
DriverInterface 驱动接口
type Handler ¶
type Handler struct {
sync.Once
Context context.Context
// Queue 处理队列的名称
Queue string
// Delay 消息处理延迟时长
Delay time.Duration
// Subscribe 订阅配置
Subscribe Subscribe
// Driver 驱动实例
Driver DriverInterface
// Logger 异常日志
Logger LoggerInterface
// DLStorage 死信存储
// 无法处理的消息最终流转到这里
DLStorage DLStorageInterface
// Idempotent 幂等判断实现
// 防止消息被重复处理保证数据一致性
// 若幂等性判断自身异常则可能导致判断失效
// 因此再严格一致的场景下配置EnsureFn进行二次确认
Idempotent IdempotentInterface
// HandleFunc 消息处理回调函数
// 若返回值为true则表示处理成功, 将删除该消息
// 若返回值为false则表示处理失败, 消息将延迟重试
HandleFunc func(msg *Message) (done bool)
// EnsureFunc 幂等性的二次确认
// 请一定要注意布尔返回值的代表含义
// 若返回值为true表示未处理, 即允许处理
// 若返回值为false表示已处理, 即不允许处理
// 若使用场景不严格要求数据一致的可以不用配置
EnsureFunc func(msg *Message) (allow bool)
// RetryDelay 重试延迟机制
// 返回值为重试间隔, 若 < 0 则代表不进行重试
RetryDelay func(attempts int) time.Duration
// contains filtered or unexported fields
}
Handler 消息处理器
type HandlerOpt ¶
type HandlerOpt func(h *Handler)
func HandlerDelay ¶
func HandlerDelay(delay time.Duration) HandlerOpt
type IdempotentInterface ¶
type IdempotentInterface interface {
// Acquire 获取key的操作权
// 若返回值为true表示获取成功, 允许操作
// 若返回值为false表示获取失败, 不允许操作
Acquire(key string) (bool, error)
// Release 释放key的操作权
Release(key string) error
}
IdempotentInterface 幂等性接口
type LoggerInterface ¶
type LoggerInterface interface {
Errorf(format string, args ...interface{})
}
type Message ¶
type Message struct {
// BizUID 消息唯一标识
// 无特殊业务含义, 通常用于幂等性处理防止重复消费
BizUID string `json:"b,omitempty"`
// Payload 原始消息内容
Payload []byte `json:"p,omitempty"`
// Retried 记录消息重试次数
Retried int `json:"r,omitempty"`
// RouteKey 路由键
RouteKey string `json:"k,omitempty"`
}
Message 消息结构体
func MessageAutoId ¶
MessageAutoId 实例化消息
func MessageWithId ¶
MessageWithId 实例化消息
type Sender ¶
type Sender struct {
sync.Once
// Topic 发送主题
Topic string
// Driver 驱动实例
Driver DriverInterface
// Logger 异常日志
Logger LoggerInterface
// TxOptions 事务配置
TxOptions *TxOptions
// contains filtered or unexported fields
}
Sender 发送器
type TXStorageInterface ¶
type TXStorageInterface interface {
// Store 将消息预存
// id 返回存储后的唯一标识
Store(data []byte) (id string, err error)
// Fetch 根据标识取出消息
Fetch(id string) (data []byte, err error)
// Remove 根据标识移除消息
Remove(id string) error
}
TXStorageInterface 预发存储接口
type TxOptions ¶
type TxOptions struct {
Context context.Context
// Timeout 事务处理时长
// 启用事务的消息不会立即发布给消费者
// 当本地事务回调执行返回true才会正式发布
// 详见事务流程图 ./tx_flow.png
Timeout time.Duration
// EnsureFunc 事务完成确认
// 请一定要注意布尔返回值的代表含义
// 若返回值为true则表示事务已处理, 发布消息
// 若返回值为false则表示事务未处理, 撤销消息
EnsureFunc func(msg *Message) (done bool)
// RetryDelay 重试延迟机制
// 返回值为重试间隔, 若 < 0 则代表不进行重试
RetryDelay func(attempts int) time.Duration
// TxStorage 事务消息存储
TxStorage TXStorageInterface
// contains filtered or unexported fields
}
TxOptions 事务配置
Source Files
¶
Click to show internal directories.
Click to hide internal directories.
