eventsourcing

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2025 License: MIT Imports: 6 Imported by: 1

README

Go Event Sourcing Library

Go Report Card

This is a robust and flexible Go library designed for implementing the Event Sourcing pattern in your applications, leveraging the power of Go's generics. Event Sourcing is a design pattern that enables you to capture changes to an application state as a sequence of events, rather than just storing the current state.

The use of generics in this library provides several advantages:

  1. Type Safety: Generics allow for compile-time type checking, reducing the likelihood of runtime errors and increasing overall code safety.
  2. Code Reusability: With generics, you can write functions or types that are abstracted over types, meaning they can be reused with different types.
  3. Improved Performance: Generics can lead to performance improvements as they eliminate the need for type assertions and reflection.
  4. Clearer and Cleaner Code: Generics result in code that's easier to read, understand, and maintain.

The library includes key components such as Aggregates, Events, and an Event Store, all fundamental to the Event Sourcing architecture. By facilitating an efficient method of handling data changes, this library serves as a valuable tool for maintaining and auditing the complete history of data changes over time in your applications.

Features

This Go Event Sourcing Library offers a broad range of features to make implementing Event Sourcing in your Go applications as straightforward and efficient as possible. Here's a rundown of the key features this library provides:

  1. Generics-Based Architecture: The library leverages Go's generics capabilities to provide type-safe and reusable components, resulting in clearer and cleaner code, improved performance, and reduced likelihood of runtime errors.

  2. Aggregate Management: It provides robust support for managing aggregates, entities with unique IDs whose state evolves over time through a sequence of events. Aggregates also have invariants that represent business rules to maintain consistency.

  3. Event Handling: The library offers comprehensive tools for creating and handling events, which are fundamental units of state change in the Event Sourcing pattern. It includes functionality to apply events to aggregates, track event metadata, and even marshal events to JSON.

  4. Event Store Interface: It provides a standard interface for an Event Store, the storage system for events, enabling the saving, loading, and retrieving of an aggregate's history. This interface can be implemented with various storage systems according to your application's needs.

  5. Version Management: It includes functionality for managing the version of an aggregate, which is incremented each time an event is applied. This feature aids in tracking the evolution of the aggregate over time.

These features come together to provide a solid foundation for implementing the Event Sourcing pattern in your Go applications, facilitating efficient data change handling and thorough auditing of changes over time.

Certainly! Here's a completed section that references the to-do list example as a practical illustration of how to use the event sourcing library:

Usage

To use this library, you would define your own aggregate states and event states that satisfy the AggregateState and EventState interfaces, respectively. Then, you can create events and apply them to aggregates, and store the aggregates in an event store.

Usage Examples

For a hands-on illustration of how to apply this library, check out our to-do list example. This repository demonstrates a simple yet effective implementation of event sourcing in a practical application: managing a to-do list. You'll see how aggregate and event states are defined and utilized within a Go application, providing a clear blueprint for incorporating event sourcing into your projects.

In this example, you will learn how to:

  • Define aggregate states for your to-do items and event states for actions such as creating a to-do or adding a task.
  • Create and apply events to these aggregates, effectively demonstrating the event sourcing process.
  • Utilize the Command Query Responsibility Segregation (CQRS) pattern to separate the read and write operations of your application, enhancing its architecture and scalability.

This example serves as a practical guide to understanding and implementing event sourcing with our library. By exploring the repository, you can gain insights into structuring your application for event sourcing and managing state through events in a real-world scenario.

Main Components

Aggregate

Aggregate represents an entity in your system that is identified by a unique ID. It encapsulates state that evolves over time through a sequence of events.

type Aggregate[S AggregateState] struct {...}
Event

Event represents something that has occurred within your system. In the context of event sourcing, events are the source of truth and they determine the current state of an aggregate. Each event is associated with an aggregate and contains data that describes the state change.

type Event[S AggregateState] struct {...}
Event Store

EventStore is the storage interface for events. It provides methods to save, load, and retrieve the history of an aggregate.

type EventStore[S AggregateState] interface {...}

Key Interfaces

AggregateState

AggregateState interface represents the state of an aggregate. It defines the Type method to return the type of the aggregate, and the Zero method to return a zero-valued instance of the aggregate state.

type AggregateState interface {...}
EventState

EventState is an interface representing an event state that can be applied to an aggregate. It has Type method to return the type of the event and Apply method to apply the event to an aggregate.

type EventState[S AggregateState] interface {...}
Identifiable

Identifiable is an interface representing objects that can be identified by an Aggregate ID. It's used to ensure that the state of an event can provide an Aggregate ID when the event is applied to an aggregate.

type Identifiable interface {...}
InceptionRecorder

InceptionRecorder is an interface representing objects that can record the inception date of an aggregate. It's used to ensure that the state of an event can record the inception date of an aggregate when the event is applied to an aggregate.

type InceptionRecorder interface {...}
ModificationRecorder

ModificationRecorder is an interface representing objects that can record the modification date of an aggregate. It's used to ensure that the state of an event can record the modification date of an aggregate when the event is applied to an aggregate.

type ModificationRecorder interface {...}
StorageNamer Interface

The v0.2.0 release introduces a significant enhancement to the Go Event Sourcing Library: the StorageNamer interface. This interface provides a customizable way for aggregates to define their corresponding storage entities, such as database tables or collections. This feature is especially beneficial as your project scales up, offering a more adaptable approach to storage naming conventions.

Definition:
type StorageNamer interface {
    StorageName() string
}
Usage:
  • Implement the StorageNamer interface in your aggregate state if you require a custom storage name different from the default aggregate state type name.
  • If an aggregate state does not implement StorageNamer, the library defaults to using the aggregate state's type as the storage identifier.
Example:

Suppose you have an aggregate state UserState. By default, the storage name would be UserState. If you need a different storage name, say user_table, implement the StorageNamer interface as shown below:

type UserState struct {...}

func (u *UserState) StorageName() string {
    return "user_table"
}

func (u *UserState) Type() string {
    return "user"
}

This implementation allows UserState to explicitly specify user_table as its storage name.

Benefits of Using StorageNamer
  • Explicit Naming: Offers a clear and explicit way to define storage names, enhancing readability and maintainability of your code.
  • Flexibility: Adapts to various storage backends and user requirements, making the library more versatile.
  • Backward Compatibility: Ensures existing codebases remain functional without modifications.
Implementation Considerations
  • This interface is optional; you can choose to implement it based on your project's requirements.
  • Ensure consistency in the naming conventions used across your project for clarity.

PostgreSQL Event Store Implementation

The Go Event Sourcing Library includes a PostgreSQL implementation of the Event Store interface. This provides the ability to persist and query your events in a PostgreSQL database, giving you the ability to leverage robust and scalable SQL database capabilities for your Event Sourcing needs.

The PostgreSQL Event Store implementation is located in the pgeventstore package, and includes the following key functionalities:

  1. Database Setup and Initialization: The library must be initialized with the necessary environment variables or config for your database connection and schema name, and establishes a connection to the PostgreSQL server. If necessary, it will also create the schema and corresponding tables for the specified aggregates in your database.

  2. Event Marshaling: The library provides a mechanism to marshal your events into a format that can be stored in PostgreSQL. It takes care of null checks and type conversions, ensuring your data is safe and reliable.

  3. Event Store Interface Implementation: This includes the following operations:

    • Load: Retrieves the latest aggregate state based on the aggregate ID and version number.
    • History: Returns the history of changes to an aggregate, based on the aggregate ID, starting from a specific version.
    • Save: Persists the changes to an aggregate into the PostgreSQL database.

The implementation also includes functionalities like creating the necessary tables and indexes in your PostgreSQL database and handling batch inserts to optimize the storage of events.

The PostgreSQL Event Store implementation is designed to be highly scalable, capable of handling large volumes of events and providing fast query performance. It leverages PostgreSQL's capabilities for handling JSON data types, making it an ideal choice for Event Sourcing applications.

For the detailed code and usage, see the pgeventstore/README.md. Be sure to initialize the library properly for your PostgreSQL server's URL and schema before running your application.

Remember to carefully handle your database connections and transactions to ensure data consistency and reliability. The library supports transaction management, allowing you to handle operations atomically and safely.

Note: Always ensure that your PostgreSQL server is properly configured and optimized for your workload. Performance can vary based on server specifications, data volume, query complexity, and other factors.

Configuration

The PostgreSQL Event Store implementation requires certain config variables to be set for its proper functioning. These are:

  • EVENT_STORE_PG_URL: This should be the connection string for your PostgreSQL database. This must be a valid connection string that includes the username, password, host, port, and database name. For example, postgresql://user:pass@localhost:5432/mydatabase.

  • EVENT_STORE_SCHEMA: This variable represents the name of the schema under which your event tables will be created. If not provided, the implementation will default to using eventsourcing as the schema name.

  • EVENT_STORE_AGGREGATES: A comma-separated list of all aggregate tables to be created in your PostgreSQL database. These tables will hold your events.

For example, if you have a user and order aggregates, you can set EVENT_STORE_AGGREGATES=user,order.

You can also initialize using pgeventstore.EventStorageConfig struct:

config := pgeventstore.EventStorageConfig{
    PostgresURL: "postgres://user:pass@localhost:5432?sslmode=disable",
    Schema:      "eventsourcing",
    Aggregates:  "user,order",
}

if err := pgeventstore.Init(config); err != nil {
    return err
}

The library uses the godotenv package for loading environment variables from a .env file. You can also set these environment variables manually in your deployment environment.

For security reasons, make sure that your database connection string (EVENT_STORE_PG_URL) is kept secure and not exposed in your code or version control system.

Remember to restart your application after changing your environment variables to ensure the changes take effect.

Event Consumer Mechanism

Overview

The eventconsumer package provides a robust and easy-to-use mechanism for consuming events from an event-sourced system. It is designed to help you build projections, background processors, and other event-driven components that need to reliably process events with offset tracking and at-least-once delivery semantics.

The eventconsumer/pgeventconsumer package offers a PostgreSQL-backed implementation for storing consumer offsets, making it easy to persist and recover consumer state across restarts.

Packages
  • eventsourcing (root): core primitives (Aggregate, Event, EventStore, invariants, metadata, StorageNamer).
  • pgeventstore: PostgreSQL implementation of EventStore with Init bootstrap and Storage[S]() factory.
  • eventconsumer: subscribers for consuming events in order with at-least-once delivery.
    • AsyncSequentialSubscriber (non-transactional handlers)
    • FIFOSubscriber (transactional handlers)
  • eventconsumer/pgeventconsumer: PostgreSQL ConsumerStore and a convenience constructor for the async subscriber.
Why Use This Mechanism?
  • Reliable Offset Tracking: Ensures that each event is processed exactly once per consumer group, even in the face of failures.
  • Easy Integration: Simple interfaces for registering handlers and starting consumers.
  • Supports Projections: Ideal for building read models and projections that need to process all events in order.
  • Pluggable Storage: Use the provided PostgreSQL implementation or create your own by implementing the ConsumerStore interface.
How It Works
  • ConsumerStore: Abstracts the storage and retrieval of consumer offsets. The pgeventconsumer package provides a ready-to-use implementation for PostgreSQL.
  • Subscriber: Represents a consumer that processes events. You register handlers for event types, and the subscriber manages fetching and dispatching events to the appropriate handler.
  • Manager: Allows you to manage and run multiple subscribers concurrently.
Main Interfaces
// ConsumerStore is an interface for storing and retrieving consumer offsets.
type ConsumerStore interface {
    Save(ctx context.Context, transaction transactional.Transaction, name string, offsetAcked, offsetConsumed int) error
    Load(ctx context.Context, transaction transactional.Transaction, name string) (Consumer, error)
}

// Subscriber starts and reports status; concrete subscribers expose their own
// RegisterHandler variant matching the handler type they support (Handler or TxHandler).
type Subscriber[S eventsourcing.AggregateState] interface {
    Start(ctx context.Context) error
    Status() ConsumerStatus
}
PostgreSQL Implementation

The pgeventconsumer package provides a PostgresConsumerStore that persists consumer offsets in a PostgreSQL table. This allows consumers to resume processing from the last acknowledged event after a restart.

Usage Example
import (
    "context"

    "github.com/thefabric-io/eventsourcing"
    "github.com/thefabric-io/eventsourcing/eventconsumer"
    "github.com/thefabric-io/eventsourcing/eventconsumer/pgeventconsumer"
    "github.com/thefabric-io/eventsourcing/pgeventstore"
    "github.com/thefabric-io/transactional"
    "github.com/thefabric-io/transactional/pgtransactional"
    

    )

// Define your aggregate state and event handler
type MyAggregateState struct{}

// Implement eventconsumer.Handler (async, no transaction) for your event type
type MyHandler struct{}
func (h *MyHandler) HandleEvent(ctx context.Context, ev *eventsourcing.Event[*MyAggregateState]) error {
    // Process the event
    return nil
}

func main() {
    ctx := context.Background()
	
    tx, err := pgtransactional.InitSQLXTransactionalConnection(ctx, os.Getenv("TRANSACTIONAL_DATABASE_URL"))
    if err != nil {
        log.Fatalln(err)
    }
	
    eventStore := pgeventstore.Storage[*MyAggregateState]()
    consumerStore := pgeventconsumer.PostgresConsumerStore()

    params := eventconsumer.NewAsyncSequentialParams[*MyAggregateState](
        tx,
        "my-consumer-group",
        consumerStore,
        eventStore,
    )
    subscriber := eventconsumer.NewAsyncSequentialSubscriber[*MyAggregateState](
        ctx,
        params,
        eventconsumer.WithBatchSize[*MyAggregateState](100),
        eventconsumer.WithWaitTimes[*MyAggregateState](1*time.Second, 100*time.Millisecond),
        eventconsumer.WithAckBatch[*MyAggregateState](10, 500*time.Millisecond),
    )

    // Register handlers
    subscriber.RegisterHandler("my-event", &MyHandler{})

    manager := eventconsumer.NewManager[*MyAggregateState]()
    manager.AddProcessors(subscriber)
    manager.Run(ctx)
}
AsyncSequentialSubscriber (Non-Blocking, Sequential)

The AsyncSequentialSubscriber processes events strictly in order while keeping your handler work outside of any database transaction. It is ideal for long-running side effects (HTTP calls, e-mails, ML inference) and high-contention systems where you want to minimize time spent under DB locks.

  • Fetches the next page of events after the last ACKed offset inside a short transaction, then closes that transaction before handing events to handlers.
  • Processes events one-by-one, in order, using registered handlers. If a handler is missing for a type, the event is auto-acknowledged so the cursor can advance.
  • Acknowledges progress via a dedicated ack loop that batches contiguous offsets and commits them atomically. The cursor never skips a missing offset by default (no-skip semantics).
  • Handles retention gaps and non-1 starting offsets: when the first visible offset is ahead of the expected next, the subscriber safely fast‑forwards the starting point to the first seen offset and continues contiguously from there.
  • Built-in back-pressure: bounded in-memory queues; non-blocking enqueue with jittered, context-aware backoff; adaptive post-fetch sleep based on queue fill.
  • Clean shutdown: closes channels, drains queues, and flushes outstanding acks deterministically.
  • Optional per-event handler timeout to bound shutdown latency (default disabled).

Configuration highlights (constructor params):

  • batchSize: number of events to fetch per poll (also influences queue sizes)
  • waitTime / waitTimeIfEvents: polling cadence when idle vs after processing
  • ackBatchSize / ackBatchTimeout: batch size and max delay for ack flushing (set timeout to 0 to flush every ack)
  • SetHandlerTimeout(d): optional per‑event timeout; set to 0 to disable

When to use

  • Handlers perform IO or heavy work and must not hold DB transactions
  • You need strict in-order processing with at-least-once delivery
  • You want resilient shutdown and strong back-pressure under bursty loads
FIFO Consumer (Transactional, Sequential)

The FIFO (First-In-First-Out) consumer processes events strictly in order while keeping a database transaction open for the duration of each batch and each handler call. This is a good fit when handler logic must be performed atomically within the same DB transaction as the offset update (e.g., transactional projections).

  • Opens a transaction per polling cycle (Serializable by default), loads the consumer, and fetches events after the last ACKed offset for the registered types.
  • Processes events sequentially inside that transaction and calls your TxHandler with the live tx.
  • On success, updates both offset_acked and offset_consumed (to the last processed offset of the batch) in the same transaction and commits.
  • On error/panic, the transaction is rolled back and nothing is persisted; events will be retried on the next loop.
  • Sleeps waitTime when no events were fetched, otherwise waitTimeIfEvents before the next poll.
  • Register handlers with RegisterHandlerTx(eventType, TxHandler) and implement TxHandler.

When to use

  • You require strict transactional consistency between handler writes and offset updates.
  • Handlers are short-lived and safe to run under an open DB transaction.
  • You prefer simpler flow without in-memory buffering/back-pressure.
AsyncSequentialSubscriber vs FIFO consumer
  • Transaction scope:

    • AsyncSequential: handler runs outside any DB transaction; only short transactions are used to fetch events and to persist acks.
    • FIFO: handler runs inside a transaction that remains open for the duration of the handler call; offsets are persisted at the end of the batch.
  • Ordering and acks:

    • Both are sequential and preserve event order per consumer-group.
    • AsyncSequential auto-acks unhandled types and uses a dedicated, batched ack loop with contiguous-ack logic and gap bootstrap (handles streams starting at 0 or after retention).
    • FIFO fetches only types with registered handlers and updates offsets once per batch inside the processing transaction.
  • Back-pressure and shutdown:

    • AsyncSequential provides bounded queues, non-blocking enqueue with backoff, adaptive sleeps, and deterministic channel-based shutdown.
    • FIFO performs straightforward loop processing inside a transaction and sleeps between polls; it does not buffer work.
  • When to pick which:

    • Choose AsyncSequential when handlers can be slow or you want minimal lock time in the DB.
    • Choose FIFO when handlers must participate in the same DB transaction as the fetch (e.g., strictly transactional projections) and handler latency is short.
FIFO usage example
// Implement eventconsumer.TxHandler for your event type
type MyTxHandler struct{}
func (h *MyTxHandler) HandleEvent(ctx context.Context, tx transactional.Transaction, ev *eventsourcing.Event[*MyAggregateState]) error {
    // Perform writes in the same DB transaction
    return nil
}

fifo := eventconsumer.NewFIFOSubscriber[*MyAggregateState](
    ctx,
    tx,
    eventconsumer.NewFIFOSubscriberParams[*MyAggregateState]{
        Name:             "my-consumer-group",
        ConsumerStore:    pgeventconsumer.PostgresConsumerStore(),
        EventStore:       pgeventstore.Storage[*MyAggregateState](),
        BatchSize:        100,
        WaitTime:         1 * time.Second,
        WaitTimeIfEvents: 100 * time.Millisecond,
    },
)
fifo.RegisterHandlerTx("my-event", &MyTxHandler{})

manager := eventconsumer.NewManager[*MyAggregateState]()
manager.AddProcessors(fifo)
manager.Run(ctx)
Handler interfaces (v0.x breaking change)
// Async (no transaction)
type Handler[S eventsourcing.AggregateState] interface {
    HandleEvent(ctx context.Context, ev *eventsourcing.Event[S]) error
}

// FIFO (transaction available)
type TxHandler[S eventsourcing.AggregateState] interface {
    HandleEvent(ctx context.Context, tx transactional.Transaction, ev *eventsourcing.Event[S]) error
}

// Adapter to reuse a TxHandler with the async subscriber
func FromTxHandler[S eventsourcing.AggregateState](h TxHandler[S]) Handler[S]

Registering:

  • Async: subscriber.RegisterHandler("type", myAsyncHandler) or FromTxHandler(myTxHandler)
  • FIFO: fifo.RegisterHandlerTx("type", myTxHandler)
Breaking changes (latest)
  • Split handler interfaces: use Handler[S] for async processing and TxHandler[S] for transactional processing. If you previously had a single handler signature, implement the appropriate interface or wrap TxHandler with FromTxHandler when using the async subscriber.
  • AsyncSequentialSubscriber constructor now uses a parameter object and functional options. Migrate to NewAsyncSequentialParams(...) + NewAsyncSequentialSubscriber(ctx, params, ...) or the convenience constructor in eventconsumer/pgeventconsumer.
  • Introduced StorageNamer to customize table names; defaults remain backward compatible.
  • Added global offset and Events API on pgeventstore for cross‑aggregate streaming; consumers should prefer offsets over per‑aggregate versions when building projections.
Technical Details
  • The FIFO consumer fetches events in batches, processes them using registered handlers, and updates the consumer offset in the store.
  • If no events are available, it waits for a configurable duration before polling again.
  • The PostgreSQL consumer store uses an upsert strategy to persist offsets, ensuring idempotency and reliability.

Contributing

Feel free to submit a pull request if you find any bugs or you want to make improvements to the library. For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License.


This Go library provides a solid foundation for working with Event Sourcing in Go. It is designed with flexibility in mind to allow it to be adapted to various application needs. We hope this library proves to be a valuable tool in your software development toolkit.

Documentation

Index

Constants

View Source
const LastVersion = 0

Variables

View Source
var (
	// ErrAggregateNotFound is returned when an aggregate is not found.
	ErrAggregateNotFound = errors.New("aggregate not found")
)

Functions

func StorageName added in v0.2.0

func StorageName(s AggregateState) string

Types

type Aggregate

type Aggregate[S AggregateState] struct {
	Invariants []func(*Aggregate[S]) error
	// contains filtered or unexported fields
}

Aggregate represents an aggregate in event sourcing.

func InitAggregate

func InitAggregate[S AggregateState](id string, version int, state S) *Aggregate[S]

InitAggregate initializes an aggregate with the provided id, version, and state.

func InitZeroAggregate

func InitZeroAggregate[S AggregateState](state S) *Aggregate[S]

InitZeroAggregate initializes an aggregate with the zero state.

func (*Aggregate[S]) Changes

func (a *Aggregate[S]) Changes() []*Event[S]

Changes returns all changes/events associated with the aggregate.

func (*Aggregate[S]) Check

func (a *Aggregate[S]) Check() error

Check checks all invariants of the aggregate and returns the first error encountered. Check returns nil if all Invariants passed without failing.

func (*Aggregate[S]) CheckAll

func (a *Aggregate[S]) CheckAll() []error

CheckAll checks all invariants of the aggregate and returns errors for every failed Invariants. CheckAll returns nil if all Invariants passed without failing.

func (*Aggregate[S]) Clone

func (a *Aggregate[S]) Clone() *Aggregate[S]

Clone creates a deep copy of the Aggregate.

func (*Aggregate[S]) ID

func (a *Aggregate[S]) ID() string

ID returns the ID of the aggregate.

func (*Aggregate[S]) MarshalJSON

func (a *Aggregate[S]) MarshalJSON() ([]byte, error)

MarshalJSON marshals the aggregate into JSON format.

func (*Aggregate[S]) Metadata

func (a *Aggregate[S]) Metadata() Metadata

Metadata returns the metadata of the aggregate.

func (*Aggregate[S]) Must

func (a *Aggregate[S]) Must(ii ...func(*Aggregate[S]) error)

Must replaces the current invariants with the provided ones.

func (*Aggregate[S]) MustAlso

func (a *Aggregate[S]) MustAlso(ii ...func(*Aggregate[S]) error)

MustAlso appends given invariants to the existing list of invariants.

func (*Aggregate[S]) State

func (a *Aggregate[S]) State() S

State returns the current state of the aggregate.

func (*Aggregate[S]) Type

func (a *Aggregate[S]) Type() string

Type returns the type of the aggregate.

func (*Aggregate[S]) Version

func (a *Aggregate[S]) Version() int

Version returns the version of the aggregate.

type AggregateState

type AggregateState interface {
	Type() string         // Returns the type of the aggregate state.
	Zero() AggregateState // Returns a zero value of the aggregate state.
}

AggregateState is an interface defining methods that should be implemented by any state that is part of an aggregate.

type AggregateStateEventMapper

type AggregateStateEventMapper[S AggregateState] interface {
	EventsMap() map[string]EventState[S]
}

type AggregateVersion

type AggregateVersion int

type Event

type Event[S AggregateState] struct {
	// contains filtered or unexported fields
}

Event represents an event in event sourcing.

func InitEvent

func InitEvent[S AggregateState](id string, occurredAt time.Time, state EventState[S], aggregate *Aggregate[S], data []byte, offset int, metadata []byte) (*Event[S], error)

InitEvent initializes an Event with provided id, occurredAt, state, aggregate, data, and metadata.

func NewEvent

func NewEvent[S AggregateState](event EventState[S], metadata map[string]any) *Event[S]

NewEvent creates a new event with the provided state and metadata.

func (*Event[S]) Aggregate

func (e *Event[S]) Aggregate() *Aggregate[S]

Aggregate returns the aggregate associated with the event.

func (*Event[S]) Apply

func (e *Event[S]) Apply(aggregate *Aggregate[S])

Apply applies the Event to an Aggregate. In the context of event sourcing, state changes are recorded as a sequence of events. When an event is applied to an aggregate, it changes the state of the aggregate. Here's a detailed breakdown of the steps:

  • It first increments the version of the Aggregate using the incrementVersion method. This helps in tracking the evolution of the aggregate. Each event applied to the aggregate increases its version number.

  • It then appends the Event itself to the changes slice of the Aggregate. This allows the aggregate to keep track of all events that have been applied to it.

  • It checks whether the state of the Event implements the Identifiable interface. If it does, it sets the ID of the Aggregate to the ID provided by the Event state's AggregateID method.

  • It applies the state of the Event to the Aggregate using the Apply method of the EventState. This is where the actual state change occurs.

  • Finally, it sets the Event's aggregate field to a clone of the updated Aggregate. This is crucial for keeping an accurate record of the state of the Aggregate at the time when the event was applied.

By enabling the event sourcing pattern in the provided Go codebase, this method allows the state of an Aggregate to evolve over time through the application of Events.

func (*Event[S]) ID

func (e *Event[S]) ID() string

ID returns the ID of the event.

func (*Event[S]) MarshalJSON

func (e *Event[S]) MarshalJSON() ([]byte, error)

MarshalJSON marshals the event into JSON format.

func (*Event[S]) Metadata

func (e *Event[S]) Metadata() Metadata

Metadata returns the metadata associated with the event.

func (*Event[S]) OccurredAt

func (e *Event[S]) OccurredAt() time.Time

OccurredAt returns the time when the event occurred.

func (*Event[S]) Offset added in v0.4.0

func (e *Event[S]) Offset() int

Offset returns the offset of the event.

func (*Event[S]) State

func (e *Event[S]) State() EventState[S]

State returns the state of the event.

func (*Event[S]) Type

func (e *Event[S]) Type() string

Type returns the type of the event.

type EventState

type EventState[S AggregateState] interface {
	Type() string
	Apply(aggregate *Aggregate[S])
}

func ParseEvent

func ParseEvent[S AggregateState](t string, eventsMap map[string]EventState[S]) (EventState[S], error)

type EventStore

type EventStore[S AggregateState] interface {
	Save(ctx context.Context, tx Transaction, aggregate ...*Aggregate[S]) (err error)
	Load(ctx context.Context, tx Transaction, aggregateID string, version AggregateVersion) (*Aggregate[S], error)
	History(ctx context.Context, tx Transaction, aggregateID string, fromVersion int, limit int) ([]*Event[S], error)
	Events(ctx context.Context, tx Transaction, params EventsParams) ([]*Event[S], error)
}

EventStore represents a storage for events. It provides methods to save, load, and retrieve the history of an aggregate.

type EventsParams added in v0.4.0

type EventsParams struct {
	AfterOffset int
	TypesOnly   []string
	Limit       int
}

type HistoryParams

type HistoryParams struct {
	AggregateID      string
	AggregateVersion AggregateVersion
}

type Identifiable

type Identifiable interface {
	// AggregateID returns the ID of the aggregate that is associated with the identifiable object.
	AggregateID() string
}

Identifiable is an interface representing objects that can be identified by an Aggregate ID. In the context of this event sourcing system, it's used to ensure that the state of an event can provide an Aggregate ID when the event is applied to an aggregate.

type InceptionRecorder

type InceptionRecorder[S AggregateState] interface {
	// RecordInception marks the time at which the entity was brought into existence.
	RecordInception(inceptionTime *time.Time, aggregate *Aggregate[S])
}

InceptionRecorder represents an interface for entities that are capable of recording the time at which they were brought into existence.

type LoadParams

type LoadParams[S AggregateState] struct {
	AggregateID      string
	Aggregate        S
	AggregateVersion AggregateVersion
}

type Metadata

type Metadata map[string]any

func (Metadata) Merge

func (m Metadata) Merge(m2 map[string]any) Metadata

type ModificationRecorder

type ModificationRecorder[S AggregateState] interface {
	// RecordModification marks the time at which the entity was recently modified.
	RecordModification(modificationTime *time.Time, aggregate *Aggregate[S])
}

ModificationRecorder represents an interface for entities that are capable of recording the time at which they were recently modified.

type StorageNamer added in v0.2.0

type StorageNamer interface {
	StorageName() string
}

type Transaction

type Transaction interface {
	Commit() error
	Rollback() error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL