events

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2026 License: MIT Imports: 11 Imported by: 1

README

go-events

CI

AMQP/Kafka event toolkit for Go services: a domain event model, an outbox-style event bus, a RabbitMQ consumer/publisher, an RPC client/server and a Kafka producer.

Install

go get github.com/tech-nimble/go-events

Subscribe to events

consumer := rabbitmq.NewConsumer("amqp://guest:guest@rabbitmq:5672/")

consumer.
	WithConsumerOptions(&rabbitmq.ConsumerOptions{
		Consumer:         "consumer-name",
		QoSPrefetchCount: 5,
	}).
	WithQueueOptions(&rabbitmq.QueueOptions{
		Name:    "consumer-queue-name",
		Durable: true,
	}).
	WithBindOptions(&rabbitmq.BindOptions{
		Key:      "routing-key",
		Exchange: "exchange-name",
	})

_ = consumer.AddHandler("order.created", func(ctx context.Context, msg amqp.Delivery) {
	// handle message
})

if err := consumer.ListenAndServe(); err != nil {
	log.Fatal().Err(err).Msg("consumer failed")
}
defer consumer.Close()

Routing

Messages are matched to handlers by a Router. HeaderRouter (default) routes by the type header; RouterKeyRouter routes by the AMQP routing key. Both support strict, regexp or custom matchers.

Packages

Package Purpose
(root) Event model, EventBus, Router, DB outbox repository.
rabbitmq Connection, consumer, publisher and RPC client/server.
kafka Kafka producer.
initializers/amqp_v2 Env-based bootstrap helpers for RabbitMQ, Kafka and the event bus.

License

MIT © Nimble Tech

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Command

type Command interface {
	GetBody() ([]byte, error)
	GetExchangeName() string
	GetCommandName() string
	GetHeaders() map[string]any
	SetHeaders(map[string]any)
}

type DBRepository

type DBRepository struct {
	*repositories.Repositories
	// contains filtered or unexported fields
}

func NewDBRepository

func NewDBRepository(rep *repositories.Repositories, tableName string) *DBRepository

func (*DBRepository) Create

func (e *DBRepository) Create(ctx context.Context, entity *Event) error

func (*DBRepository) Update

func (e *DBRepository) Update(ctx context.Context, entity *Event) error

type DeliveryMode

type DeliveryMode uint8
const (
	Transient DeliveryMode = 1 + iota
	Persistent
)

type Event

type Event struct {
	ID         string
	EntityID   string
	EntityName string
	Published  bool
	Payload    any
	Headers    map[string]any
	Exchange   string
	RoutingKey string
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

func NewEvent

func NewEvent(payload any, headers map[string]any, entityID, entityName, routingKey, exchange string) *Event

func (*Event) GetBody

func (e *Event) GetBody() ([]byte, error)

func (*Event) GetDeliveryMode

func (e *Event) GetDeliveryMode() DeliveryMode

func (*Event) GetHeaders

func (e *Event) GetHeaders() amqp.Table

func (*Event) GetRoutingKey

func (e *Event) GetRoutingKey() string

func (*Event) Sent

func (e *Event) Sent()

func (*Event) SetHeaders

func (e *Event) SetHeaders(headers map[string]any)

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

func NewEventBus

func NewEventBus(client Publisher, repository Repository) *EventBus

func (*EventBus) Send

func (e *EventBus) Send(ctx context.Context, event *Event) error

type HeaderRouter

type HeaderRouter struct {
	// contains filtered or unexported fields
}

HeaderRouter routes by a message header. By default it inspects the "type" header.

func NewHeaderRouter

func NewHeaderRouter(options ...HeaderRouterOption) *HeaderRouter

NewHeaderRouter builds a header-based router.

func (*HeaderRouter) Match

func (r *HeaderRouter) Match(msg amqp.Delivery, route string) bool

Match reports whether the message matches the given route.

type HeaderRouterOption

type HeaderRouterOption interface {
	// contains filtered or unexported methods
}

HeaderRouterOption configures a HeaderRouter.

func WithHeaderRouterKey

func WithHeaderRouterKey(key string) HeaderRouterOption

WithHeaderRouterKey sets the header key used for routing.

func WithHeaderRouterMatcher

func WithHeaderRouterMatcher(matcher Matcher) HeaderRouterOption

WithHeaderRouterMatcher sets a custom match function.

func WithHeaderRouterRegexpMatcher

func WithHeaderRouterRegexpMatcher() HeaderRouterOption

WithHeaderRouterRegexpMatcher matches routes using regular expressions.

type Matcher

type Matcher func(msgRoute string, handlerRoute string) bool

type Model

type Model struct {
	ID         string
	EntityID   string
	EntityName string
	Published  bool
	Payload    pgtype.JSONB
	Headers    pgtype.JSONB
	Exchange   string
	RoutingKey string
	CreatedAt  pgtype.Timestamp
	UpdatedAt  sql.NullTime
}

func NewModel

func NewModel(entity *Event) (*Model, error)

func (*Model) GetEntity

func (e *Model) GetEntity() (*Event, error)

type Publisher

type Publisher interface {
	Publish(publishing Publishing) error
}

type Publishing

type Publishing interface {
	GetBody() ([]byte, error)
	GetHeaders() amqp.Table
	SetHeaders(map[string]any)
	GetDeliveryMode() DeliveryMode
	GetRoutingKey() string
}

type Repository

type Repository interface {
	Create(ctx context.Context, event *Event) error
	Update(ctx context.Context, event *Event) error
}

type Response

type Response interface {
	GetBody() ([]byte, error)
	GetHeaders() map[string]any
	SetHeaders(map[string]any)
}

type Router

type Router interface {
	Match(msg amqp.Delivery, route string) bool
}

Router matches a message against a route.

type RouterKeyRouter

type RouterKeyRouter struct {
	// contains filtered or unexported fields
}

RouterKeyRouter routes by the AMQP routing key.

func NewRouterKeyRouter

func NewRouterKeyRouter(options ...RouterKeyRouterOption) *RouterKeyRouter

NewRouterKeyRouter builds a routing-key based router.

func (*RouterKeyRouter) Match

func (r *RouterKeyRouter) Match(msg amqp.Delivery, route string) bool

Match reports whether the message matches the given route.

type RouterKeyRouterOption

type RouterKeyRouterOption interface {
	// contains filtered or unexported methods
}

RouterKeyRouterOption configures a RouterKeyRouter.

func WithRouterKeyMatcher

func WithRouterKeyMatcher(matcher Matcher) RouterKeyRouterOption

WithRouterKeyMatcher sets a custom match function.

func WithRouterKeyRegexpMatcher

func WithRouterKeyRegexpMatcher() RouterKeyRouterOption

WithRouterKeyRegexpMatcher matches routes using regular expressions.

Directories

Path Synopsis
initializers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL