gluon

v0.2.5
Published: Dec 18, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

README

🎇 Gluon

A composable message bus for Event-Driven systems written in Go.

Documentation

Constants

const CloudEventsSpecVersion = "1.0"

CloudEventsSpecVersion Is the CloudEvents specification version used by `Gluon` internals.

Variables

var ErrBusClosed = errors.New("gluon: The bus is closed")

ErrBusClosed The action cannot be performed because the Bus is closed.

var (
	// ErrMessageNotRegistered The message type was not found on the message registry
	ErrMessageNotRegistered = errors.New("gluon: The specified message type is not present on the message registry")
)
var ErrMissingSchemaDefinition = errors.New("gluon: Missing schema definition")

Functions

func Register

func Register(name string, driver Driver)

Register makes a message broker driver available for the Bus.

Register panics if it is called with a nil driver.
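The registration behaviour described above can be sketched with a small, self-contained registry. This is not Gluon's implementation: the reduced `Driver` interface, the `lookup` helper and the `localDriver` type are hypothetical stand-ins used only to illustrate a name-to-driver map that rejects nil drivers.

```go
package main

import (
	"fmt"
	"sync"
)

// Driver is a reduced, hypothetical stand-in for the documented Driver interface.
type Driver interface {
	Name() string
}

var (
	driversMu sync.RWMutex
	drivers   = map[string]Driver{}
)

// Register stores a driver under name and panics on a nil driver,
// which is the behaviour the documentation describes.
func Register(name string, driver Driver) {
	if driver == nil {
		panic("registry: Register driver is nil")
	}
	driversMu.Lock()
	defer driversMu.Unlock()
	drivers[name] = driver
}

// lookup returns the driver registered under name, if any (hypothetical helper).
func lookup(name string) (Driver, bool) {
	driversMu.RLock()
	defer driversMu.RUnlock()
	d, ok := drivers[name]
	return d, ok
}

// localDriver is a hypothetical driver used only for this sketch.
type localDriver struct{}

func (localDriver) Name() string { return "local" }

func main() {
	Register("local", localDriver{})
	d, ok := lookup("local")
	fmt.Println(d.Name(), ok)
}
```

Drivers written this way are typically registered once at program start, and the bus later resolves them by the name passed to its constructor.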

Types

type Bus

type Bus struct {
	BaseContext    context.Context
	Marshaler      Marshaler
	Factories      Factories
	SchemaRegistry SchemaRegistry
	Configuration  BusConfiguration
	Logger         *log.Logger
	Addresses      []string
	// contains filtered or unexported fields
}

Bus Is a facade component used to interact with foreign systems through streaming messaging mechanisms.

func NewBus

func NewBus(driver string, opts ...Option) *Bus

NewBus Allocate a new Bus with default configurations.

func (*Bus) GetSchemaMetadata

func (b *Bus) GetSchemaMetadata(schema interface{}) (*MessageMetadata, error)

GetSchemaMetadata retrieves metadata from the internal schema registry

func (*Bus) GetSchemaMetadataFromTopic

func (b *Bus) GetSchemaMetadataFromTopic(topic string) *MessageMetadata

GetSchemaMetadataFromTopic retrieves metadata from the internal schema registry using the topic name

func (*Bus) ListSubscribersFromTopic

func (b *Bus) ListSubscribersFromTopic(t string) []*Subscriber

ListSubscribersFromTopic Get the subscription task queue of a registered topic.

func (*Bus) ListenAndServe

func (b *Bus) ListenAndServe() error

ListenAndServe Bootstrap and start a Bus along with its internal components (subscribers).

func (*Bus) Publish

func (b *Bus) Publish(ctx context.Context, data interface{}) error

Publish Propagate a message to the ecosystem using the internal topic registry agent to generate the topic.

Note: To propagate correlation and causation IDs, use Subscription's context.

func (*Bus) PublishBulk

func (b *Bus) PublishBulk(ctx context.Context, data ...interface{}) error

PublishBulk Propagate multiple messages to the ecosystem.

func (*Bus) PublishRaw

func (b *Bus) PublishRaw(ctx context.Context, msg *TransportMessage) error

PublishRaw Propagate a raw `Gluon` internal message to the ecosystem.

func (*Bus) PublishWithSubject

func (b *Bus) PublishWithSubject(ctx context.Context, data interface{}, subject string) error

PublishWithSubject Propagate a message to the ecosystem using the internal topic registry agent to generate the topic.

This method also accepts a `subject` argument, which sets the CloudEvents attribute of the same name.

func (*Bus) PublishWithTopic

func (b *Bus) PublishWithTopic(ctx context.Context, topic string, data interface{}) error

PublishWithTopic Propagate a message to the ecosystem using the given topic name.

Note: To propagate correlation and causation IDs, use Subscription's context.

func (*Bus) PublishWithTopicAndSubject

func (b *Bus) PublishWithTopicAndSubject(ctx context.Context, topic, subject string, data interface{}) error

PublishWithTopicAndSubject Propagate a message to the ecosystem using the given topic name.

This method also accepts a `subject` argument, which sets the CloudEvents attribute of the same name.

Note: To propagate correlation and causation IDs, use Subscription's context.

func (*Bus) RegisterSchema

func (b *Bus) RegisterSchema(schema interface{}, opts ...SchemaRegistryOption)

RegisterSchema Link a message schema to specific metadata (MessageMetadata) and store it for further Bus operations.

func (*Bus) Shutdown

func (b *Bus) Shutdown(ctx context.Context) error

Shutdown Close a Bus and its internal resources gracefully.

func (*Bus) Subscribe

func (b *Bus) Subscribe(schema interface{}) *Subscriber

Subscribe Set a subscription task using schema metadata.

It will return nil if no schema was found in the local schema registry.

func (*Bus) SubscribeTopic

func (b *Bus) SubscribeTopic(topic string) *Subscriber

SubscribeTopic Set a subscription task using a raw topic name.

type BusConfiguration

type BusConfiguration struct {
	ConsumerGroup string
	MajorVersion  int
	// Driver Custom driver configuration(s)
	Driver interface{}
}

type Driver

type Driver interface {
	SetParentBus(b *Bus)
	SetInternalHandler(h InternalMessageHandler)
	Start(context.Context) error
	Shutdown(context.Context) error
	Subscribe(ctx context.Context, subscriber *Subscriber) error
	// Publish Propagate a low-level message (CloudEvent) to the message bus.
	Publish(ctx context.Context, message *TransportMessage) error
}

Driver Is the transportation vendor (e.g. Apache Kafka, AWS SNS/SQS, Redis Streams) which implements `Gluon` internal mechanisms.

type Factories

type Factories struct {
	IDFactory IDFactory
}

Factories Is a list of factories used by `Gluon` internals (e.g. ID factory) to successfully execute certain operations.

For example, when a publication is requested, Gluon internals build the message and use the specified ID factory, which encapsulates the underlying unique identification algorithm (e.g. UUID), to generate its ID.

type FactoryUUID

type FactoryUUID struct{}

func (FactoryUUID) NewID

func (f FactoryUUID) NewID() (string, error)

type Handler

type Handler interface {
	Handle(context.Context, *Message) error
}

Handler Is a component used to subscribe to a topic.

type HandlerFunc

type HandlerFunc func(context.Context, *Message) error

HandlerFunc Is an anonymous function used to subscribe to a topic.

type IDFactory

type IDFactory interface {
	NewID() (string, error)
}

IDFactory Is the kind of factory for generating unique identifications using a specific algorithm (e.g. UUID, Nano ID).

The default IDFactory is FactoryUUID (uses google/uuid package).
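As an illustration of the interface above, a custom factory only needs a `NewID` method. The sketch below mirrors the documented `IDFactory` interface locally; `RandomHexFactory` is a hypothetical type, not part of Gluon, and it uses random bytes from `crypto/rand` rather than a UUID library.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// IDFactory mirrors the interface documented above.
type IDFactory interface {
	NewID() (string, error)
}

// RandomHexFactory is a hypothetical IDFactory that returns 16 random bytes
// encoded as 32 hexadecimal characters.
type RandomHexFactory struct{}

func (RandomHexFactory) NewID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// Compile-time check that RandomHexFactory satisfies IDFactory.
var _ IDFactory = RandomHexFactory{}

func main() {
	id, err := RandomHexFactory{}.NewID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```

A factory with this shape could presumably be handed to the Bus through the WithIDFactory option listed under Option.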

type InternalMessageHandler

type InternalMessageHandler func(ctx context.Context, subscriber *Subscriber, message *TransportMessage) error

InternalMessageHandler is the message handler used by concrete drivers.

type LocalSchemaRegistry

type LocalSchemaRegistry struct {
	BasePath string
}

func (LocalSchemaRegistry) GetBaseLocation

func (l LocalSchemaRegistry) GetBaseLocation() string

func (LocalSchemaRegistry) GetSchemaDefinition

func (l LocalSchemaRegistry) GetSchemaDefinition(schemaName string, _ int) (string, error)

func (LocalSchemaRegistry) IsUsingLatestSchema

func (l LocalSchemaRegistry) IsUsingLatestSchema() bool

type Marshaler

type Marshaler interface {
	SetParentBus(b *Bus)
	GetContentType() string
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

Marshaler Is a composable component which encodes given data into binary form using a specific codec, and decodes it back.

The Marshaler composable component is used by `Gluon` internals to preserve a specific codec for Message(s) which are transported through stream pipelines.

The default Marshaler is MarshalerJSON.
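To make the contract concrete, here is a minimal sketch of a type that satisfies a local copy of the Marshaler interface using `encoding/json`. The empty `Bus` placeholder, `jsonMarshaler`, `orderPlaced` and `roundTrip` are hypothetical names for this example, and the `application/json` content type is an assumption rather than a value confirmed by the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Bus is an empty placeholder so the interface below compiles on its own.
type Bus struct{}

// Marshaler mirrors the interface documented above.
type Marshaler interface {
	SetParentBus(b *Bus)
	GetContentType() string
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// jsonMarshaler is a hypothetical JSON-backed implementation.
type jsonMarshaler struct{}

func (jsonMarshaler) SetParentBus(*Bus)      {} // this codec needs nothing from the bus
func (jsonMarshaler) GetContentType() string { return "application/json" } // assumed value
func (jsonMarshaler) Marshal(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}
func (jsonMarshaler) Unmarshal(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

// orderPlaced is a hypothetical message schema.
type orderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// roundTrip encodes a message and decodes it back through the same Marshaler.
func roundTrip(m Marshaler, in orderPlaced) (orderPlaced, error) {
	var out orderPlaced
	raw, err := m.Marshal(in)
	if err != nil {
		return out, err
	}
	err = m.Unmarshal(raw, &out)
	return out, err
}

func main() {
	out, err := roundTrip(jsonMarshaler{}, orderPlaced{OrderID: "42", Total: 1999})
	fmt.Println(out, err)
}
```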

type MarshalerAvro

type MarshalerAvro struct {
	// contains filtered or unexported fields
}

func NewMarshalerAvro

func NewMarshalerAvro() *MarshalerAvro

func (*MarshalerAvro) GetContentType

func (m *MarshalerAvro) GetContentType() string

func (*MarshalerAvro) Marshal

func (m *MarshalerAvro) Marshal(v interface{}) ([]byte, error)

func (*MarshalerAvro) SetParentBus

func (m *MarshalerAvro) SetParentBus(b *Bus)

func (*MarshalerAvro) Unmarshal

func (m *MarshalerAvro) Unmarshal(data []byte, v interface{}) error

type MarshalerJSON

type MarshalerJSON struct{}

func (MarshalerJSON) GetContentType

func (m MarshalerJSON) GetContentType() string

func (MarshalerJSON) Marshal

func (m MarshalerJSON) Marshal(v interface{}) ([]byte, error)

func (MarshalerJSON) SetParentBus

func (m MarshalerJSON) SetParentBus(b *Bus)

func (MarshalerJSON) Unmarshal

func (m MarshalerJSON) Unmarshal(data []byte, v interface{}) error

type Message

type Message struct {
	Headers map[string]interface{}
	Data    interface{}
}

func (Message) GetCausationID

func (m Message) GetCausationID() string

func (Message) GetConsumerGroup

func (m Message) GetConsumerGroup() string

func (Message) GetContentType

func (m Message) GetContentType() string

func (Message) GetCorrelationID

func (m Message) GetCorrelationID() string

func (Message) GetMessageID

func (m Message) GetMessageID() string

func (Message) GetMessageTime

func (m Message) GetMessageTime() time.Time

func (Message) GetMessageType

func (m Message) GetMessageType() string

func (Message) GetSchema

func (m Message) GetSchema() string

func (Message) GetSource

func (m Message) GetSource() string

func (Message) GetSpecVersion

func (m Message) GetSpecVersion() string

func (Message) GetSubject

func (m Message) GetSubject() string

func (Message) GetTraceContext

func (m Message) GetTraceContext() interface{}

type MessageMetadata

type MessageMetadata struct {
	Topic         string
	Source        string
	SchemaName    string
	SchemaVersion int
	// contains filtered or unexported fields
}

MessageMetadata Is a set of definitions to describe a specific message schema.

type MiddlewareHandlerFunc

type MiddlewareHandlerFunc func(next HandlerFunc) HandlerFunc

MiddlewareHandlerFunc Is an anonymous function used to add behaviour to a consumer process.

This pattern is also known as Chain of Responsibility (CoR).
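The wrapping pattern can be sketched as follows. The types are local copies of the documented signatures; `chain`, `trace` and `demo` are hypothetical helpers, and the order in which Gluon itself applies middleware is not stated here, so the outermost-first order below is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types.
type Message struct {
	Headers map[string]interface{}
	Data    interface{}
}

type HandlerFunc func(context.Context, *Message) error

type MiddlewareHandlerFunc func(next HandlerFunc) HandlerFunc

// chain wraps h so that the first middleware in mws runs outermost.
func chain(h HandlerFunc, mws ...MiddlewareHandlerFunc) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace returns a middleware that records when it is entered and left.
func trace(name string, log *[]string) MiddlewareHandlerFunc {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, msg *Message) error {
			*log = append(*log, name+":before")
			err := next(ctx, msg)
			*log = append(*log, name+":after")
			return err
		}
	}
}

// demo runs a handler wrapped by two middlewares and returns the call order.
func demo() []string {
	var log []string
	handler := func(ctx context.Context, msg *Message) error {
		log = append(log, "handler")
		return nil
	}
	h := chain(handler, trace("a", &log), trace("b", &log))
	if err := h(context.Background(), &Message{Data: "hello"}); err != nil {
		log = append(log, "error: "+err.Error())
	}
	return log
}

func main() {
	fmt.Println(demo())
}
```

Running it prints `[a:before b:before handler b:after a:after]`: each middleware decides what happens before and after calling `next`, which is what lets logging, retries or tracing be layered without touching the handler.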

type MiddlewarePublisherFunc

type MiddlewarePublisherFunc func(next PublisherFunc) PublisherFunc

MiddlewarePublisherFunc Is an anonymous function used to add behaviour to a publishing process.

This pattern is also known as Chain of Responsibility (CoR).

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option sets a specific configuration of a resource (e.g. bus).
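An interface whose only methods are unexported is a common way to implement functional options in Go. The sketch below shows how such options might be defined and applied; `busConfig`, `optionFunc`, `apply` and `newConfig` are hypothetical names, and the default major version is chosen for this example only.

```go
package main

import "fmt"

// busConfig is a hypothetical stand-in for the settings a Bus carries.
type busConfig struct {
	ConsumerGroup string
	MajorVersion  int
	Addresses     []string
}

// Option has only an unexported method, so other packages cannot define
// their own options (the method name is an assumption).
type Option interface {
	apply(*busConfig)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*busConfig)

func (f optionFunc) apply(c *busConfig) { f(c) }

func WithConsumerGroup(s string) Option {
	return optionFunc(func(c *busConfig) { c.ConsumerGroup = s })
}

func WithMajorVersion(v int) Option {
	return optionFunc(func(c *busConfig) { c.MajorVersion = v })
}

func WithCluster(addr ...string) Option {
	return optionFunc(func(c *busConfig) { c.Addresses = addr })
}

// newConfig starts from defaults and applies the options in order.
func newConfig(opts ...Option) busConfig {
	c := busConfig{MajorVersion: 1} // default used by this sketch only
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	c := newConfig(WithConsumerGroup("billing"), WithCluster("localhost:9092"))
	fmt.Printf("%+v\n", c)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.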

func WithBaseContext

func WithBaseContext(ctx context.Context) Option

WithBaseContext Set the Bus base context.

func WithCluster

func WithCluster(addr ...string) Option

WithCluster Set the addresses of one or more nodes for the Bus to use.

Not applicable when using a local Bus.

func WithConsumerGroup

func WithConsumerGroup(s string) Option

WithConsumerGroup Set a global consumer group, useful for microservices.

func WithConsumerMiddleware

func WithConsumerMiddleware(f ...MiddlewareHandlerFunc) Option

WithConsumerMiddleware Attach a chain of behaviour(s) for `Gluon` message consumption operations.

func WithDriverConfiguration

func WithDriverConfiguration(cfg interface{}) Option

WithDriverConfiguration Set a configuration for a specific driver.

func WithIDFactory

func WithIDFactory(f IDFactory) Option

WithIDFactory Set a unique identifier factory for `Gluon` operations.

func WithLogger

func WithLogger(l *log.Logger) Option

WithLogger Set a global logger to output Bus internal operations.

func WithMajorVersion

func WithMajorVersion(v int) Option

WithMajorVersion Set a global major version for message schemas.

func WithMarshaler

func WithMarshaler(m Marshaler) Option

WithMarshaler Set a marshaler strategy to encode/decode in-transit messages.

func WithPublisherMiddleware

func WithPublisherMiddleware(f ...MiddlewarePublisherFunc) Option

WithPublisherMiddleware Attach a chain of behaviour(s) for `Gluon` message production operations.

func WithSchemaRegistry

func WithSchemaRegistry(s SchemaRegistry) Option

WithSchemaRegistry Set a schema registry used by specific codecs (e.g. Apache Avro) to decode/encode in-transit messages.

type PublisherFunc

type PublisherFunc func(ctx context.Context, message *TransportMessage) error

PublisherFunc Is an anonymous function used by `Gluon` to propagate low-level messages.

type SchemaRegistry

type SchemaRegistry interface {
	GetBaseLocation() string
	IsUsingLatestSchema() bool
	GetSchemaDefinition(schemaName string, version int) (string, error)
}
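The interface above carries no description, so the following is only a sketch of one way to satisfy it: an in-memory registry keyed by schema name and version. `memoryRegistry`, `schemaKey`, `newMemoryRegistry`, `isNotFound`, the `memory://` location and the sample Avro definition are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// SchemaRegistry mirrors the interface documented above.
type SchemaRegistry interface {
	GetBaseLocation() string
	IsUsingLatestSchema() bool
	GetSchemaDefinition(schemaName string, version int) (string, error)
}

var errSchemaNotFound = errors.New("schema definition not found")

type schemaKey struct {
	name    string
	version int
}

// memoryRegistry is a hypothetical registry that keeps definitions in a map.
type memoryRegistry struct {
	definitions map[schemaKey]string
}

func (memoryRegistry) GetBaseLocation() string   { return "memory://" }
func (memoryRegistry) IsUsingLatestSchema() bool { return false }

func (r memoryRegistry) GetSchemaDefinition(name string, version int) (string, error) {
	def, ok := r.definitions[schemaKey{name, version}]
	if !ok {
		return "", fmt.Errorf("%s v%d: %w", name, version, errSchemaNotFound)
	}
	return def, nil
}

// newMemoryRegistry returns a registry preloaded with one sample definition.
func newMemoryRegistry() SchemaRegistry {
	return memoryRegistry{definitions: map[schemaKey]string{
		{"order.placed", 1}: `{"type":"record","name":"OrderPlaced","fields":[]}`,
	}}
}

// isNotFound reports whether err wraps errSchemaNotFound.
func isNotFound(err error) bool { return errors.Is(err, errSchemaNotFound) }

func main() {
	r := newMemoryRegistry()
	def, err := r.GetSchemaDefinition("order.placed", 1)
	fmt.Println(def, err)
	_, err = r.GetSchemaDefinition("order.placed", 2)
	fmt.Println(isNotFound(err))
}
```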

type SchemaRegistryOption

type SchemaRegistryOption interface {
	// contains filtered or unexported methods
}

SchemaRegistryOption sets a specific configuration for the internal schema registry.

func WithSchemaName

func WithSchemaName(s string) SchemaRegistryOption

WithSchemaName Set the name of the schema stored on the SchemaRegistry.

func WithSchemaVersion

func WithSchemaVersion(v int) SchemaRegistryOption

WithSchemaVersion Set a major version for a message schema.

func WithSource

func WithSource(s string) SchemaRegistryOption

WithSource Set a source to a message schema.

func WithTopic

func WithTopic(s string) SchemaRegistryOption

WithTopic Set a topic name to a message schema.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber Is an entry for the subscriber registry component.

It contains metadata for a specific consumer.

func (*Subscriber) DriverConfiguration

func (e *Subscriber) DriverConfiguration(cfg interface{}) *Subscriber

DriverConfiguration Set configuration for a specific driver.

func (Subscriber) GetDefaultHandler

func (e Subscriber) GetDefaultHandler() HandlerFunc

GetDefaultHandler Retrieve the Subscriber's default HandlerFunc.

It will return the defined HandlerFunc by default. If no HandlerFunc was specified, then the Handler.Handle function is returned.
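The fallback rule described above can be sketched with local types. The `subscriber` struct, its field names, `recorder` and `whoHandles` are hypothetical; only the selection order (HandlerFunc first, then Handler.Handle) comes from the description.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types, reduced to what the sketch needs.
type Message struct{ Data interface{} }

type Handler interface {
	Handle(context.Context, *Message) error
}

type HandlerFunc func(context.Context, *Message) error

// subscriber is a hypothetical stand-in holding both kinds of handler.
type subscriber struct {
	handler     Handler
	handlerFunc HandlerFunc
}

// defaultHandler prefers the HandlerFunc and falls back to Handler.Handle.
func (s subscriber) defaultHandler() HandlerFunc {
	if s.handlerFunc != nil {
		return s.handlerFunc
	}
	if s.handler != nil {
		return s.handler.Handle
	}
	return nil
}

// recorder is a Handler that notes that it processed the message.
type recorder struct{ handledBy *string }

func (r recorder) Handle(context.Context, *Message) error {
	*r.handledBy = "Handler"
	return nil
}

// whoHandles reports which component ends up processing a message.
func whoHandles(withFunc, withHandler bool) string {
	var handledBy string
	s := subscriber{}
	if withFunc {
		s.handlerFunc = func(context.Context, *Message) error {
			handledBy = "HandlerFunc"
			return nil
		}
	}
	if withHandler {
		s.handler = recorder{handledBy: &handledBy}
	}
	h := s.defaultHandler()
	if h == nil {
		return "none"
	}
	_ = h(context.Background(), &Message{})
	return handledBy
}

func main() {
	fmt.Println(whoHandles(true, true), whoHandles(false, true), whoHandles(false, false))
}
```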

func (*Subscriber) GetDriverConfiguration

func (e *Subscriber) GetDriverConfiguration() interface{}

GetDriverConfiguration Get the configuration of a specific driver.

func (Subscriber) GetGroup

func (e Subscriber) GetGroup() string

GetGroup Retrieve the Subscriber's consumer group.

func (Subscriber) GetHandler

func (e Subscriber) GetHandler() Handler

GetHandler Retrieve the Subscriber's Handler.

func (Subscriber) GetHandlerFunc

func (e Subscriber) GetHandlerFunc() HandlerFunc

GetHandlerFunc Retrieve the Subscriber's HandlerFunc.

func (Subscriber) GetTopic

func (e Subscriber) GetTopic() string

GetTopic Retrieve the Subscriber's topic name.

func (*Subscriber) Group

func (e *Subscriber) Group(g string) *Subscriber

Group Set a consumer group (if Driver allows them).

A consumer group is a mechanism used to consume messages in parallel by multiple workers as a single unit.

This kind of mechanism is very useful when consuming messages within a microservice environment, where each node of the microservice cluster receives a share of the messages (depending on the load balancing algorithm).

func (*Subscriber) Handler

func (e *Subscriber) Handler(h Handler) *Subscriber

Handler Set a Handler component.

In-transit messages will go straight to the Handler.Handle function.

func (*Subscriber) HandlerFunc

func (e *Subscriber) HandlerFunc(h HandlerFunc) *Subscriber

HandlerFunc Set a HandlerFunc component.

In-transit messages will go straight through to the function.

type TransportMessage

type TransportMessage struct {
	ID          string `json:"id"`
	Source      string `json:"source"`
	SpecVersion string `json:"specversion"`
	Type        string `json:"type"`
	Data        []byte `json:"data"`

	DataContentType string `json:"datacontenttype,omitempty"`
	DataSchema      string `json:"dataschema,omitempty"`
	Subject         string `json:"subject,omitempty"`
	Time            string `json:"time,omitempty"`

	CorrelationID string      `json:"gluon_correlation_id"`
	CausationID   string      `json:"gluon_causation_id"`
	TraceContext  interface{} `json:"gluon_trace_context"`

	// Internal fields
	Topic         string            `json:"-"`
	DriverHeaders map[string]string `json:"-"`
}

TransportMessage Is the basic unit of data transportation (also known as integration event).

Based on the CloudEvents specification (a CNCF project), the payload of this structure is designed to be compatible with most event-driven systems.

For better performance in downstream streaming pipelines, it is recommended to transport small pieces of data (<64 KB). In addition, some infrastructure vendors only accept messages up to a fixed size.

For more information about the fields that compose this structure, check https://github.com/cloudevents/spec/blob/master/spec.md
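To see what the struct tags above produce on the wire, the sketch below copies the struct and encodes a sample value with `encoding/json`. The `sample`, `encode` and `hasKey` helpers and all field values are hypothetical; whether Gluon's drivers serialize messages exactly this way is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TransportMessage is copied from the definition above.
type TransportMessage struct {
	ID          string `json:"id"`
	Source      string `json:"source"`
	SpecVersion string `json:"specversion"`
	Type        string `json:"type"`
	Data        []byte `json:"data"`

	DataContentType string `json:"datacontenttype,omitempty"`
	DataSchema      string `json:"dataschema,omitempty"`
	Subject         string `json:"subject,omitempty"`
	Time            string `json:"time,omitempty"`

	CorrelationID string      `json:"gluon_correlation_id"`
	CausationID   string      `json:"gluon_causation_id"`
	TraceContext  interface{} `json:"gluon_trace_context"`

	// Internal fields
	Topic         string            `json:"-"`
	DriverHeaders map[string]string `json:"-"`
}

// sample builds a message with illustrative values only.
func sample() TransportMessage {
	return TransportMessage{
		ID:          "1",
		Source:      "https://example.org/orders",
		SpecVersion: "1.0",
		Type:        "org.example.order.placed",
		Data:        []byte(`{"order_id":"42"}`),
		Topic:       "orders",
	}
}

// encode returns the JSON form of msg.
func encode(msg TransportMessage) (string, error) {
	raw, err := json.Marshal(msg)
	return string(raw), err
}

// hasKey reports whether the JSON object in doc has a top-level key.
func hasKey(doc, key string) bool {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal([]byte(doc), &fields); err != nil {
		return false
	}
	_, ok := fields[key]
	return ok
}

func main() {
	out, err := encode(sample())
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	fmt.Println(hasKey(out, "subject"), hasKey(out, "gluon_trace_context"))
}
```

Three details follow from the tags and from `encoding/json` itself: empty `omitempty` fields such as `subject` are left out, fields tagged `json:"-"` (`Topic`, `DriverHeaders`) never appear, and `Data`, being a `[]byte`, is written as a base64 string.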

Directories

Path Synopsis
examples/apache-kafka (command)
examples/simple-consumer (command)
examples/sns-sqs (command)
