eventbus

Version: v0.0.0-...-4670758
Published: Jul 20, 2025 License: CC0-1.0 Imports: 9 Imported by: 4

README

eventbus

eventbus is a naive implementation of a type-safe, thread-safe event bus that sends events optimistically and without allocations.

Initially I used github.com/asaskevich/EventBus, but it was buggy (deadlocks) and, on top of that, quite inconvenient. So I quickly threw together a bit of code to implement an event bus from scratch, this time with type safety, no deadlocks, and more flexibility.

Project priorities:

  • Safety
  • Flexibility
  • User-friendliness (e.g. cancel and clean up a subscription simply by cancelling its context).

Non-priorities:

  • Performance fine-tuning.

Quick start

Make an event type:

type MyCustomEvent struct {
    // ...fields...
}

Init:

bus := eventbus.New()

Subscribe:

ctx, cancelFn := context.WithCancel(ctx)
sub := eventbus.Subscribe[MyCustomEvent](
    ctx,
    bus,
    eventbus.OptionQueueSize(10),
)
for ev := range sub.EventChan() {
    // ...do something with `ev`, which is already of type `MyCustomEvent`...
}

Send an event:

eventbus.SendEvent(ctx, bus, MyCustomEvent{ /*...field values...*/ })

(the for range above will receive the event)

To cancel the subscription:

cancelFn()

or

sub.Finish(context.Background())
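
The `for range` loop in the quick start blocks the calling goroutine, so in practice it usually runs in a goroutine of its own. The sketch below shows one way to write such a consumer so that it exits both when the channel is closed and when the context is cancelled. It uses a plain channel as a stand-in for `sub.EventChan()`; `consume` and `runDemo` are hypothetical helpers, and whether the package closes the event channel on cancellation is not stated here, which is why the loop also watches `ctx.Done()`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// consume collects events until the channel is closed or ctx is cancelled,
// then returns everything it received.
func consume[E any](ctx context.Context, events chan E) []E {
	var got []E
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return got // channel closed: nothing more will arrive
			}
			got = append(got, ev)
		case <-ctx.Done():
			return got // subscription context cancelled
		}
	}
}

// runDemo feeds two events through a stand-in channel and waits for the
// consumer goroutine to finish.
func runDemo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	events := make(chan string, 2) // stand-in for sub.EventChan()

	var (
		wg  sync.WaitGroup
		got []string
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		got = consume(ctx, events)
	}()

	events <- "a"
	events <- "b"
	close(events)
	wg.Wait() // after this, reading got is safe

	return got
}

func main() {
	fmt.Println(runDemo()) // prints [a b]
}
```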

If you need a custom topic instead of using the event type as the topic:

// Both type parameters are given explicitly: the topic type, then the event type.
sub := eventbus.SubscribeWithCustomTopic[string, MyCustomEvent](
    ctx,
    bus,
    "my-custom-topic",
    eventbus.OptionQueueSize(10),
)

and:

eventbus.SendEventWithCustomTopic(ctx, bus, "my-custom-topic", MyCustomEvent{ /*...field values...*/ })
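
To make the difference between "event type as topic" and "custom topic" concrete, here is a toy, stdlib-only sketch of the idea. It is not the package's implementation: `toyBus`, `subscribe`, `send` and `topicOf` are hypothetical names, there is no locking, and a full queue simply drops the event. The point is only that the topic is a map key, and that the default key can be derived from the event type itself.

```go
package main

import (
	"fmt"
	"reflect"
)

// toyBus maps a topic to its subscriber channels. Channels are stored as
// `any` because each subscriber may carry a different event type.
// Topics must be hashable values (strings, reflect.Type, ...).
type toyBus struct {
	subs map[any][]any
}

// topicOf returns the default topic for event type E: the type itself.
func topicOf[E any]() reflect.Type {
	return reflect.TypeOf((*E)(nil)).Elem()
}

// subscribe registers a buffered channel of E under the given topic.
func subscribe[E any](b *toyBus, topic any, queueSize int) chan E {
	ch := make(chan E, queueSize)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// send delivers ev to every subscriber of topic whose channel carries E,
// without blocking, and returns how many subscribers received it.
func send[E any](b *toyBus, topic any, ev E) (sent int) {
	for _, s := range b.subs[topic] {
		ch, ok := s.(chan E)
		if !ok {
			continue // subscriber expects a different event type
		}
		select {
		case ch <- ev:
			sent++
		default: // queue full: drop in this toy version
		}
	}
	return sent
}

type userCreated struct{ Name string }

func main() {
	b := &toyBus{subs: map[any][]any{}}

	byType := subscribe[userCreated](b, topicOf[userCreated](), 1)
	byName := subscribe[userCreated](b, "audit", 1)

	send(b, topicOf[userCreated](), userCreated{Name: "alice"})
	send(b, "audit", userCreated{Name: "bob"})

	fmt.Println((<-byType).Name, (<-byName).Name) // prints alice bob
}
```

Keeping the event type as a type parameter is what lets the receiving side read values of the right type without a type assertion at the call site.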

Logging

For example, if you use logrus:

import (
    "github.com/sirupsen/logrus"
    beltlogger "github.com/facebookincubator/go-belt/tool/logger"
    beltlogrus "github.com/facebookincubator/go-belt/tool/logger/implementation/logrus"
)

...

    eventbus.LoggingEnabled = true
    myLogrusLogger.SetLevel(logrus.TraceLevel)
    ctx = beltlogger.CtxWithLogger(ctx, beltlogrus.New(myLogrusLogger))

    ...

    eventbus.SendEventWithCustomTopic(ctx, bus, "my-custom-topic", MyCustomEvent{ /*...field values...*/ })

But other loggers are also supported.

Benchmark

goos: linux
goarch: amd64
pkg: github.com/xaionaro-go/eventbus
cpu: AMD Ryzen 9 5900X 12-Core Processor
BenchmarkSendEvent/subCount0-24         	324881984	        74.42 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount1-24         	95566108	       254.4 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount2-24         	65961566	       360.1 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount4-24         	42975062	       561.3 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount8-24         	25660252	       915.3 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount16-24        	13101732	      1818 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount32-24        	 6860270	      3493 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount64-24        	 3585793	      6704 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount128-24       	 1778506	     13490 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount256-24       	  846909	     26608 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount512-24       	  419862	     56979 ns/op	       0 B/op	       0 allocs/op
BenchmarkSendEvent/subCount1024-24      	  193911	    127469 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/xaionaro-go/eventbus	286.868s

You can remove logging, replace chanLocker with a plain sync.Mutex, and apply other trivial optimizations to make it at least 2-3 times faster (e.g. in the single-subscriber case). We deliberately do not do this: usability matters more here than performance.
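
For context on that trade-off: a sync.Mutex cannot give up waiting when a context is cancelled, while a channel-based lock can. The sketch below shows one common way to build such a lock. It is an assumption about the general technique, not the package's actual chanLocker; `ctxLocker`, `newCtxLocker` and `runDemo` are hypothetical names, and unlike the documented `TryLock(ctx)` this version takes no context.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ctxLocker is a lock backed by a channel with capacity 1:
// a value in the channel means the lock is held.
type ctxLocker chan struct{}

func newCtxLocker() ctxLocker { return make(ctxLocker, 1) }

// Lock waits until the lock is acquired or ctx is done,
// and reports whether the lock was acquired.
func (l ctxLocker) Lock(ctx context.Context) bool {
	select {
	case l <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

// TryLock acquires the lock only if it is free right now.
func (l ctxLocker) TryLock() bool {
	select {
	case l <- struct{}{}:
		return true
	default:
		return false
	}
}

// Unlock releases the lock and panics if it is not held.
func (l ctxLocker) Unlock() {
	select {
	case <-l:
	default:
		panic("unlock of unlocked ctxLocker")
	}
}

// runDemo exercises the three operations and returns their results.
func runDemo() []bool {
	l := newCtxLocker()
	results := []bool{l.Lock(context.Background())} // free, so acquired

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	results = append(results, l.Lock(ctx)) // already held: gives up at the deadline

	l.Unlock()
	return append(results, l.TryLock()) // free again
}

func main() {
	fmt.Println(runDemo()) // prints [true false true]
}
```

The channel operations and the `select` are what make this slower than a mutex, which is consistent with the speed-up mentioned above.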

Examples of usage:

Documentation

Variables

var LoggingEnabled = false

Functions

func OnOverflowPileUpOrClose

func OnOverflowPileUpOrClose(
	pileSize uint,
	timeout time.Duration,
) onOverflowPileUpOrClose

func OptionOnOverflow

func OptionOnOverflow(v OnOverflow) optionOnOverflowT

func Unsubscribe

func Unsubscribe[E any](
	ctx context.Context,
	bus *EventBus,
	sub *Subscription[E, E],
) bool

func UnsubscribeWithCustomTopic

func UnsubscribeWithCustomTopic[T, E any](
	ctx context.Context,
	bus *EventBus,
	topic T,
	sub *Subscription[T, E],
) bool

Types

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

func New

func New() *EventBus

func (EventBus) Lock

func (l EventBus) Lock(ctx context.Context) bool

func (EventBus) TryLock

func (l EventBus) TryLock(ctx context.Context) bool

func (EventBus) Unlock

func (l EventBus) Unlock()

type OnOverflow

type OnOverflow interface {
	// contains filtered or unexported methods
}

OnOverflow defines the behavior when an event cannot be sent to a subscriber immediately because its queue is already full.

type OnOverflowClose

type OnOverflowClose struct{}

OnOverflowClose is an OnOverflow that closes the subscription.

type OnOverflowDrop

type OnOverflowDrop struct{}

OnOverflowDrop is an OnOverflow that drops the event.

type OnOverflowWait

type OnOverflowWait time.Duration

OnOverflowWait is an OnOverflow that waits for the given duration (indefinitely if the value is <= 0) and drops the event if it still could not be sent.

type OnOverflowWaitOrClose

type OnOverflowWaitOrClose time.Duration

OnOverflowWaitOrClose is an OnOverflow that waits for the given duration (indefinitely if the value is <= 0) and closes the subscription if the event still could not be sent.
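
The drop and wait behaviors described above correspond to two familiar channel idioms. The following is a minimal sketch of those idioms on a plain buffered channel, not the package's code: `sendOrDrop` and `sendOrWait` are hypothetical helpers, and closing the subscription (as OnOverflowClose and OnOverflowWaitOrClose do) is left out.

```go
package main

import (
	"fmt"
	"time"
)

// sendOrDrop enqueues ev only if the queue has room right now.
// It reports whether the event was queued.
func sendOrDrop[E any](queue chan E, ev E) bool {
	select {
	case queue <- ev:
		return true
	default:
		return false // queue full: the event is dropped
	}
}

// sendOrWait waits up to d for room in the queue and reports whether the
// event was queued. A duration <= 0 means wait without a time limit.
func sendOrWait[E any](queue chan E, ev E, d time.Duration) bool {
	if d <= 0 {
		queue <- ev // blocks until a reader makes room
		return true
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case queue <- ev:
		return true
	case <-timer.C:
		return false // still full after d: the event is dropped
	}
}

func main() {
	queue := make(chan int, 1)

	fmt.Println(sendOrDrop(queue, 1))                      // true: room in the queue
	fmt.Println(sendOrDrop(queue, 2))                      // false: queue is full
	fmt.Println(sendOrWait(queue, 3, 10*time.Millisecond)) // false: nobody reads in time

	<-queue // a reader frees one slot
	fmt.Println(sendOrWait(queue, 4, 10*time.Millisecond)) // true: room again
}
```

A caller that needs to know which of these outcomes occurred can inspect the counters in SendEventResult returned by SendEvent.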

type Option

type Option interface {
	// contains filtered or unexported methods
}

type OptionBeforeSubscribed

type OptionBeforeSubscribed[T, E any] SubscriptionCallback[T, E]

type OptionOnSubscribed

type OptionOnSubscribed[T, E any] SubscriptionCallback[T, E]

type OptionOnUnsubscribe

type OptionOnUnsubscribe[T, E any] SubscriptionCallback[T, E]

type OptionQueueSize

type OptionQueueSize uint

type Options

type Options []Option

func (Options) Config

func (s Options) Config() config

type SendEventResult

type SendEventResult struct {
	SentCountImmediate uint
	SentCountDeferred  uint
	PiledCount         uint
	DropCountImmediate uint
	DropCountDeferred  uint
}

func SendEvent

func SendEvent[E any](
	ctx context.Context,
	bus *EventBus,
	event E,
) (result SendEventResult)

func SendEventWithCustomTopic

func SendEventWithCustomTopic[T, E any](
	ctx context.Context,
	bus *EventBus,
	topic T,
	event E,
) (result SendEventResult)

type Subscription

type Subscription[T, E any] struct {
	// contains filtered or unexported fields
}

func Subscribe

func Subscribe[E any](
	ctx context.Context,
	bus *EventBus,
	opts ...Option,
) *Subscription[E, E]

func SubscribeWithCustomTopic

func SubscribeWithCustomTopic[T, E any](
	ctx context.Context,
	bus *EventBus,
	topic T,
	opts ...Option,
) (_ret *Subscription[T, E])

func (*Subscription[T, E]) Cancel

func (sub *Subscription[T, E]) Cancel()

func (*Subscription[T, E]) Done

func (sub *Subscription[T, E]) Done() <-chan struct{}

func (*Subscription[T, E]) EventChan

func (sub *Subscription[T, E]) EventChan() chan E

func (*Subscription[T, E]) Finish

func (sub *Subscription[T, E]) Finish(ctx context.Context) bool

func (*Subscription[T, E]) Ready

func (sub *Subscription[T, E]) Ready() <-chan struct{}

func (*Subscription[T, E]) SetReady

func (sub *Subscription[T, E]) SetReady()

type SubscriptionCallback

type SubscriptionCallback[T, E any] func(context.Context, *Subscription[T, E])
