Documentation ¶
Overview ¶
Package spine provides a simple abstraction for processing a Kafka stream as an at-least-once "sink" of any kind. Messages are split into "chunks", each of which is processed by a user-provided blocking function that performs some side effect. The Kafka stream is consumed in "batches", with each batch consisting of one or more chunks of messages. The chunks within a batch are passed to the side effect function in parallel, each in its own goroutine. After each batch is successfully processed, the Kafka consumer commits the offset and consumes another batch.
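The batch and chunk model described above can be illustrated with a minimal, standalone sketch. It is not the package's implementation: splitChunks and processBatch are hypothetical helpers, and plain strings stand in for Kafka messages. It only shows one plausible way to split a batch into chunks and run a blocking function on each chunk in its own goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// splitChunks (hypothetical helper) divides msgs into consecutive
// slices of at most size elements. The last chunk may be shorter.
func splitChunks(msgs []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var chunks [][]string
	for start := 0; start < len(msgs); start += size {
		end := start + size
		if end > len(msgs) {
			end = len(msgs)
		}
		chunks = append(chunks, msgs[start:end])
	}
	return chunks
}

// processBatch (hypothetical helper) runs fn on every chunk in its own
// goroutine, waits for all of them, and returns the non-nil errors.
func processBatch(chunks [][]string, fn func([]string) error) []error {
	results := make([]error, len(chunks))
	var wg sync.WaitGroup
	for i, chunk := range chunks {
		wg.Add(1)
		go func(i int, chunk []string) {
			defer wg.Done()
			// Each goroutine writes to its own index, so no lock is needed.
			results[i] = fn(chunk)
		}(i, chunk)
	}
	wg.Wait()

	var errs []error
	for _, err := range results {
		if err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	batch := []string{"a", "b", "c", "d", "e"}
	chunks := splitChunks(batch, 2)
	errs := processBatch(chunks, func(chunk []string) error {
		fmt.Println("processing", chunk) // order varies between runs
		return nil
	})
	fmt.Println(len(chunks), "chunks,", len(errs), "errors")
}
```

In this sketch a commit would happen only after processBatch returns, which is what gives the at-least-once behaviour: if the process dies mid-batch, the whole batch is consumed again.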
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerInterface ¶
type KafkaConsumer ¶
type KafkaConsumer struct {
	Consumer  ConsumerInterface
	Timeout   time.Duration
	BatchSize int
	ChunkSize int
}
KafkaConsumer is the primary type exposed by this package. A KafkaConsumer consumes messages in batches of size BatchSize and commits after each batch. It splits the batch into chunks of size ChunkSize for processing by a user-provided SideEffectFn. All chunks in a batch are processed in parallel.
func NewKafkaConsumer ¶
func NewKafkaConsumer(topic string, brokers string, group string, timeout time.Duration, batchSize int, chunkSize int) KafkaConsumer
NewKafkaConsumer is a convenience function to create the kafka.Consumer and return a new KafkaConsumer struct for later use.
func (KafkaConsumer) SideEffect ¶
func (consumer KafkaConsumer) SideEffect(fn SideEffectFn, checkError func(error), errs chan error)
SideEffect is a blocking call that calls the given function for each chunk of data in the batch, in parallel, and commits after the batch is processed. All Kafka errors are emitted on the error channel. Every error returned by the side effect function is passed to the given check error function. Once the check error function has returned for each error, SideEffect commits.
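The consequence of this error flow is easy to miss: if the check error function returns normally, the commit still happens. The sketch below illustrates that under stated assumptions. runBatch is a hypothetical stand-in for SideEffect, commit is a hypothetical callback standing in for the Kafka offset commit, and chunks are processed sequentially here only to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// errSideEffect is an illustrative error returned by the fake side effect.
var errSideEffect = errors.New("side effect failed")

// runBatch (hypothetical) calls fn for each chunk, hands every non-nil
// error to checkError, and then calls commit exactly once.
func runBatch(chunks [][]string, fn func([]string) error, checkError func(error), commit func()) {
	for _, chunk := range chunks {
		if err := fn(chunk); err != nil {
			checkError(err)
		}
	}
	// Reached only if checkError returned for every error.
	commit()
}

func main() {
	var seen []error
	committed := false

	chunks := [][]string{{"a", "b"}, {"bad"}, {"c"}}
	fn := func(chunk []string) error {
		if chunk[0] == "bad" {
			return errSideEffect
		}
		return nil
	}

	runBatch(
		chunks,
		fn,
		func(err error) { seen = append(seen, err) }, // records the error and returns
		func() { committed = true },
	)

	fmt.Println(len(seen), committed) // prints "1 true"
}
```

Because the recording checkError returns, the batch is committed even though one chunk failed. A check error function that must prevent the commit has to stop the process, for example with log.Fatal, or block until the problem is resolved.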
Example ¶
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "foo",
})
if err != nil {
	log.Fatal(err)
}

c := KafkaConsumer{
	Consumer:  consumer,
	Timeout:   time.Minute,
	BatchSize: 12,
	ChunkSize: 4,
}

// Create the error channel and monitor it in a goroutine.
errs := make(chan error)
go func() {
	for err := range errs {
		log.Println("kafka error:", err)
	}
}()

// This is your side effect function; it does some work
// on one chunk of messages.
fn := func(msgs []*kafka.Message) error {
	for _, m := range msgs {
		fmt.Print(m)
	}
	return nil
}

// Kill the process on error to avoid committing.
checkError := func(err error) {
	log.Fatal(err)
}

// Perform work! Offsets are committed after every batch is processed.
for {
	c.SideEffect(fn, checkError, errs)
}
type SideEffectFn ¶
type TestConsumer ¶
func (*TestConsumer) Commit ¶
func (c *TestConsumer) Commit() ([]kafka.TopicPartition, error)