eventbus

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: MIT Imports: 8 Imported by: 1

README

RabbitMQ 事件总线组件

组件介绍

这是一个基于 RabbitMQ 的事件总线实现,提供了简单的发布/订阅功能,使用 Watermill 库作为底层 AMQP 协议实现。

主要特性
  • 接口抽象:定义了通用的 EventBus 接口,便于替换不同的消息中间件实现
  • 发布订阅:支持消息的发布和订阅功能
  • 元数据支持:在消息中携带额外的元数据信息
  • 自动确认:订阅者处理完消息后自动发送 ACK 确认
核心组件
  • EventBus:事件总线接口定义
  • RabbitMQEventBus:RabbitMQ 实现的具体结构体
  • NewRabbitMQEventBus:创建事件总线实例的工厂函数

Event Bus

基于 RabbitMQ 的事件总线实现,使用 Watermill 库提供 AMQP 支持。

功能特性

  • 消息发布/订阅模式
  • 支持消息元数据
  • 自动消息确认机制
  • 接口化设计,易于扩展

安装

bash
go get github.com/your-repo/eventbus

使用示例

go
package main

import (
    "context"
    "eventbus"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    // 创建事件总线实例
    eb, err := eventbus.NewRabbitMQEventBus("amqp://user:password@localhost:5672/")
    if err != nil {
        panic(err)
    }

    // 订阅主题
    err = eb.Subscribe(context.Background(), "test-topic", func(msg *message.Message) {
        println("Received:", string(msg.Payload))
    })
    if err != nil {
        panic(err)
    }

    // 发布消息
    err = eb.Publish(context.Background(), "test-topic", []byte("Hello World"), nil)
    if err != nil {
        panic(err)
    }
}

API 接口

  • Publish(ctx, topic, payload, metadata) - 发布消息到指定主题
  • Subscribe(ctx, topic, handler) - 订阅指定主题的消息

依赖

  • github.com/ThreeDotsLabs/watermill
  • github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp

许可证

MIT

Documentation

Index

Constants

View Source
const MetadataPublishedAtUnixMS = "published_at_unix_ms"

Variables

This section is empty.

Functions

func EventMetadataToMap added in v0.2.0

func EventMetadataToMap(em *EventMetadata) map[string]string

EventMetadataToMap 将 EventMetadata 转换为 map[string]string

Types

type EventBus

type EventBus interface {
	Publish(ctx context.Context, topic string, payload []byte, metadata map[string]string) error
	Subscribe(ctx context.Context, topic string, handler EventHandler) error
}

EventBus 接口定义

func NewRabbitMQEventBus

func NewRabbitMQEventBus(amqpUri string, prefetchCount int) (EventBus, error)

NewRabbitMQEventBus 初始化事件总线 prefetchCount 只控制 RabbitMQ QoS PrefetchCount,旧构造函数保持单消息处理。

func NewRabbitMQEventBusWithConsumerConcurrency added in v0.2.1

func NewRabbitMQEventBusWithConsumerConcurrency(amqpUri string, prefetchCount int, consumerConcurrency int) (EventBus, error)

type EventHandler

type EventHandler func(eventID string, payload []byte, metadata map[string]string) error

type EventMetadata added in v0.2.0

type EventMetadata struct {
	//EventID      string  `json:"event_id"` // UUID
	EventType   string `json:"event_type"`
	TenantID    uint32 `json:"tenant_id"`
	UserID      uint32 `json:"user_id"`
	TaskID      uint32 `json:"task_id"` // 执行任务WorkTaskID
	WorkTaskID  uint32 `json:"work_task_id"`
	CompanyID   uint32 `json:"company_id"`
	CompanyName string `json:"company_name"`
}

EventMetadata 事件元数据,结果入库用

func MapToEventMetadata added in v0.2.0

func MapToEventMetadata(m map[string]string) (*EventMetadata, error)

type RabbitMQEventBus

type RabbitMQEventBus struct {
	// contains filtered or unexported fields
}

RabbitMQEventBus 实现

func (*RabbitMQEventBus) Publish

func (e *RabbitMQEventBus) Publish(ctx context.Context, topic string, payload []byte, metadata map[string]string) error

Publish 发布事件

func (*RabbitMQEventBus) Subscribe

func (e *RabbitMQEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) error

Subscribe 订阅事件,并发消费,并发度由 consumerConcurrency 控制。

type TaskPayload added in v0.2.0

type TaskPayload struct {
	OssPath string      `json:"oss_path"`
	Options interface{} `json:"options,omitempty"`
}

TaskPayload 任务信息,启动任务用

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL