queue

package module
v0.28.0 Latest
Published: May 20, 2026 License: MIT Imports: 11 Imported by: 4

README

queue

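queue is a module of infrago.

Package role

  • Type: module
  • Purpose: queue module that handles the produce, consume, acknowledge, and retry flow.

Main features

  • Exposes a unified module interface to callers
  • Connects to concrete backends through a driver interface
  • Supports switching driver implementations through configuration

Quick start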

import _ "github.com/infrago/queue"

[queue]
driver = "default"

Driver interface list

The following interfaces are implemented by drivers (from the module's driver.go):

Driver
  • Connect(*Instance) (Connection, error)
Connection
  • Open() error
  • Close() error
  • Start() error
  • Stop() error
  • Register(name string) error
  • Publish(name string, data []byte) error
  • DeferredPublish(name string, data []byte, delay time.Duration) error
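
As a rough illustration of the interface list above, the sketch below implements Connection with one buffered channel per queue. The Connection type is a local copy of the documented interface so the example compiles without the module; memConn, newMemConn, and the error messages are hypothetical names, and this is not the module's own default driver.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Connection is a local copy of the documented interface, declared here
// only so that the sketch compiles without importing the module.
type Connection interface {
	Open() error
	Close() error
	Start() error
	Stop() error
	Register(name string) error
	Publish(name string, data []byte) error
	DeferredPublish(name string, data []byte, delay time.Duration) error
}

// memConn is a hypothetical in-memory connection: one buffered channel per queue.
type memConn struct {
	mu     sync.Mutex
	queues map[string]chan []byte
	buffer int
}

func newMemConn(buffer int) *memConn {
	return &memConn{queues: map[string]chan []byte{}, buffer: buffer}
}

// The lifecycle methods have nothing to do for an in-memory backend.
func (c *memConn) Open() error  { return nil }
func (c *memConn) Close() error { return nil }
func (c *memConn) Start() error { return nil }
func (c *memConn) Stop() error  { return nil }

// Register creates the channel for a queue name if it does not exist yet.
func (c *memConn) Register(name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.queues[name]; !ok {
		c.queues[name] = make(chan []byte, c.buffer)
	}
	return nil
}

// Publish enqueues data without blocking and fails when the buffer is full.
func (c *memConn) Publish(name string, data []byte) error {
	c.mu.Lock()
	q, ok := c.queues[name]
	c.mu.Unlock()
	if !ok {
		return errors.New("queue not registered: " + name)
	}
	select {
	case q <- data:
		return nil
	default:
		return errors.New("queue full: " + name)
	}
}

// DeferredPublish schedules a normal Publish after delay has elapsed.
func (c *memConn) DeferredPublish(name string, data []byte, delay time.Duration) error {
	time.AfterFunc(delay, func() { _ = c.Publish(name, data) })
	return nil
}

// Compile-time check that memConn satisfies the interface.
var _ Connection = (*memConn)(nil)

func main() {
	conn := newMemConn(1)
	_ = conn.Register("demo")
	fmt.Println(conn.Publish("demo", []byte("a")) == nil) // first message fits the buffer
	fmt.Println(conn.Publish("demo", []byte("b")) != nil) // buffer of 1 is now full
}
```

A real driver would also consume from these channels and hand messages back to the module; that part depends on Instance internals and is left out here.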

Global configuration (all configuration keys)

Configuration section: [queue]

  • driver
  • external
  • codec
  • prefix
  • weight
  • setting

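Notes

  • setting is generally used to pass driver-specific parameters through to the concrete driver
  • The default in-memory driver supports setting.buffer, which can also be overridden per Queue via Setting["buffer"]
  • The default in-memory driver supports setting.blocking_publish and setting.publish_timeout
  • Setting["dead"] / dead_letter / dlq on a Queue configures the queue that receives messages after their final failure
  • A panic in business handling is caught and treated as a retryable failure; Context.Context() lets handlers observe cancellation on stop
  • For multi-instance configuration, see the Config/configure handling logic in the module source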

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DeferredPublish

func DeferredPublish(name string, value Map, delay time.Duration) error

func DeferredPublishTo

func DeferredPublishTo(conn, name string, value Map, delay time.Duration) error

func Publish

func Publish(name string, values ...Map) error

func PublishTo

func PublishTo(conn, name string, values ...Map) error
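
Publish accepts Map values while Connection.Publish accepts []byte, so some encoding step has to sit between the two. The sketch below shows one way that step could look, assuming a JSON codec and assuming Map is a string-keyed map; publishAll and the send callback are hypothetical names and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Map is a stand-in for the module's Map type (assumed to be a string-keyed map).
type Map = map[string]any

// publishAll encodes each value as JSON and passes the bytes to send,
// stopping at the first error. send plays the role of Connection.Publish.
func publishAll(send func(name string, data []byte) error, name string, values ...Map) error {
	for _, value := range values {
		data, err := json.Marshal(value)
		if err != nil {
			return fmt.Errorf("encode %s: %w", name, err)
		}
		if err := send(name, data); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var sent []string
	send := func(name string, data []byte) error {
		sent = append(sent, name+" "+string(data))
		return nil
	}
	err := publishAll(send, "mail.send",
		Map{"to": "a@example.com"},
		Map{"to": "b@example.com"},
	)
	fmt.Println(err, len(sent)) // <nil> 2
	fmt.Println(sent[0])        // mail.send {"to":"a@example.com"}
}
```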

Types

type Config

type Config struct {
	Driver   string
	External bool
	Codec    string
	Weight   int
	Prefix   string
	Setting  Map
}

type Configs

type Configs map[string]Config

type Connection added in v0.7.0

type Connection interface {
	Open() error
	Close() error
	Start() error
	Stop() error

	Register(name string) error
	Publish(name string, data []byte) error
	DeferredPublish(name string, data []byte, delay time.Duration) error
}

type Context

type Context struct {
	*infra.Meta

	Name    string
	Config  *Queue
	Setting Map

	Value  Map
	Args   Map
	Locals Map

	Body Any
	// contains filtered or unexported fields
}

func (*Context) Attempts added in v0.7.0

func (ctx *Context) Attempts() int

func (*Context) Context added in v0.28.0

func (ctx *Context) Context() context.Context

func (*Context) Denied

func (ctx *Context) Denied(res Res)

func (*Context) Done added in v0.28.0

func (ctx *Context) Done() <-chan struct{}

func (*Context) Error added in v0.7.0

func (ctx *Context) Error(res Res)

func (*Context) Failed

func (ctx *Context) Failed(res Res)

func (*Context) Final added in v0.7.0

func (ctx *Context) Final() bool

func (*Context) Finish

func (ctx *Context) Finish()

func (*Context) Found

func (ctx *Context) Found()

func (*Context) Next

func (ctx *Context) Next()

func (*Context) Retry

func (ctx *Context) Retry(delays ...time.Duration)

type Declare

type Declare struct {
	Alias    []string `json:"alias"`
	Name     string   `json:"name"`
	Desc     string   `json:"desc"`
	Nullable bool     `json:"-"`
	Args     Vars     `json:"args"`
}

type Delay

type Delay = []time.Duration

type Driver

type Driver interface {
	Connect(*Instance) (Connection, error)
}

type Filter

type Filter struct {
	Name     string  `json:"name"`
	Desc     string  `json:"desc"`
	Serve    ctxFunc `json:"-"`
	Request  ctxFunc `json:"-"`
	Execute  ctxFunc `json:"-"`
	Response ctxFunc `json:"-"`
}

type Handler

type Handler struct {
	Name   string  `json:"name"`
	Desc   string  `json:"desc"`
	Found  ctxFunc `json:"-"`
	Error  ctxFunc `json:"-"`
	Failed ctxFunc `json:"-"`
	Denied ctxFunc `json:"-"`
}

type Instance

type Instance struct {
	Name    string
	Config  Config
	Setting base.Map
	// contains filtered or unexported fields
}

func (*Instance) Serve

func (inst *Instance) Serve(req Request) (resp Response)

func (*Instance) Submit

func (inst *Instance) Submit(next func())

type Module

type Module struct {
	// contains filtered or unexported fields
}

func (*Module) Close added in v0.7.0

func (m *Module) Close()

func (*Module) Config

func (m *Module) Config(global Map)

func (*Module) Open added in v0.7.0

func (m *Module) Open()

func (*Module) Register

func (m *Module) Register(name string, value Any)

func (*Module) RegisterConfig added in v0.7.0

func (m *Module) RegisterConfig(name string, cfg Config)

func (*Module) RegisterConfigs added in v0.7.0

func (m *Module) RegisterConfigs(configs Configs)

func (*Module) RegisterDeclare added in v0.7.0

func (m *Module) RegisterDeclare(name string, cfg Declare)

func (*Module) RegisterDriver added in v0.7.0

func (m *Module) RegisterDriver(name string, driver Driver)

func (*Module) RegisterFilter added in v0.7.0

func (m *Module) RegisterFilter(name string, cfg Filter)

func (*Module) RegisterHandler added in v0.7.0

func (m *Module) RegisterHandler(name string, cfg Handler)

func (*Module) RegisterQueue added in v0.7.0

func (m *Module) RegisterQueue(name string, cfg Queue)

func (*Module) RegisterQueues added in v0.7.0

func (m *Module) RegisterQueues(prefix string, queues Queues)

func (*Module) Setup added in v0.7.0

func (m *Module) Setup()

func (*Module) Start added in v0.7.0

func (m *Module) Start()

func (*Module) Stop added in v0.7.0

func (m *Module) Stop()

type Queue

type Queue struct {
	Alias    []string `json:"alias"`
	Name     string   `json:"name"`
	Desc     string   `json:"desc"`
	Nullable bool     `json:"-"`
	Args     Vars     `json:"args"`
	Setting  Map      `json:"setting"`

	Action  ctxFunc   `json:"-"`
	Actions []ctxFunc `json:"-"`

	Found  ctxFunc `json:"-"`
	Error  ctxFunc `json:"-"`
	Failed ctxFunc `json:"-"`
	Denied ctxFunc `json:"-"`

	Connect string `json:"connect"`
	Thread  int    `json:"thread"`
	Retry   Retry  `json:"retry"`
}

func (Queue) RegistryComponent added in v0.7.0

func (Queue) RegistryComponent() string

type Queues added in v0.7.0

type Queues map[string]Queue

func (Queues) RegistryComponent added in v0.7.0

func (Queues) RegistryComponent() string

type Request

type Request struct {
	Name      string
	Data      []byte
	Attempt   int
	Timestamp time.Time
}

type Response

type Response struct {
	Retry bool
	Delay time.Duration
}

type Retry

type Retry = []time.Duration
