spine

v0.1.0
Published: Aug 27, 2023 License: MIT Imports: 4 Imported by: 2

README

Spine

See the godoc.

Documentation

Overview

Package spine provides a simple abstraction for processing a Kafka stream as an at-least-once "sink" of any kind. Messages are split into "chunks", each of which is processed by a user-provided blocking function that performs some side effect. The Kafka stream is consumed in "batches", with each batch consisting of one or more chunks of messages. The chunks within a batch are passed to the user-provided side effect function in parallel, in goroutines. After each batch is successfully processed, the Kafka consumer commits the offset and consumes another batch.
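To make the batch and chunk relationship concrete, here is a minimal sketch of how a batch might be split into chunks. The chunk helper is hypothetical and not part of the spine API, and plain strings stand in for Kafka messages so the sketch runs without the Kafka client.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// Hypothetical helper for illustration; spine's own splitting logic
// is not shown in its documentation.
func chunk(items []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	// A batch of 5 stand-in messages with a chunk size of 2
	// yields chunks of 2, 2, and 1 messages.
	batch := []string{"m1", "m2", "m3", "m4", "m5"}
	for i, c := range chunk(batch, 2) {
		fmt.Println(i, c)
	}
}
```

With BatchSize 12 and ChunkSize 4, as in the SideEffect example, the same approach would give three chunks of four messages each.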

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerInterface

type ConsumerInterface interface {
	ReadMessage(time.Duration) (*kafka.Message, error)
	Commit() ([]kafka.TopicPartition, error)
}

type KafkaConsumer

type KafkaConsumer struct {
	Consumer  ConsumerInterface
	Timeout   time.Duration
	BatchSize int
	ChunkSize int
}

KafkaConsumer is the primary type exposed by this package. A KafkaConsumer consumes messages in batches of size BatchSize and commits after each batch. It splits the batch into chunks of size ChunkSize for processing by a user-provided SideEffectFn. All chunks in a batch are processed in parallel.

func NewKafkaConsumer

func NewKafkaConsumer(topic string, brokers string, group string, timeout time.Duration, batchSize int, chunkSize int) KafkaConsumer

NewKafkaConsumer is a convenience function to create the kafka.Consumer and return a new KafkaConsumer struct for later use.

func (KafkaConsumer) SideEffect

func (consumer KafkaConsumer) SideEffect(fn SideEffectFn, checkError func(error), errs chan error)

SideEffect is a blocking call that calls the given function for each chunk of data in the batch, in parallel, and commits after the batch is processed. All Kafka errors are emitted on the error channel. Errors returned by the side effect function are passed to the checkError function. Once checkError has returned for each error, SideEffect commits.

Example
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "foo",
})
if err != nil {
	log.Fatal(err)
}

timeout := time.Minute

c := KafkaConsumer{
	Consumer:  consumer,
	Timeout:   timeout,
	BatchSize: 12,
	ChunkSize: 4,
}

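// Create the error channel and monitor it in a goroutine,
// so that Kafka errors are logged as they are emitted.
errs := make(chan error)
go func() {
	for err := range errs {
		log.Println(err)
	}
}()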

// this is your side effect function,
// does some work.
fn := func(msgs []*kafka.Message) error {
	for _, m := range msgs {
		fmt.Print(m)
	}
	return nil
}

checkError := func(err error) {
	// Kill your process on error to avoid committing
	log.Fatal(err)
}

// Perform work!
// commits after every "batch" is processed
for {
	c.SideEffect(fn, checkError, errs)
}

type SideEffectFn

type SideEffectFn func([]*kafka.Message) error

type TestConsumer

type TestConsumer struct {
	Messages    []*kafka.Message
	Commits     int
	CommitError bool
}

func (*TestConsumer) Commit

func (c *TestConsumer) Commit() ([]kafka.TopicPartition, error)

func (*TestConsumer) ReadMessage

func (c *TestConsumer) ReadMessage(d time.Duration) (*kafka.Message, error)
