Documentation
¶
Index ¶
- Constants
- func DefaultProcessor(ctx context.Context, dependencies ProcessorDependencies, msg ConsumerMessage) error
- type Config
- type ConsumerMessage
- type EncoderDecoder
- type KafkaClient
- type KafkaGoClient
- type KafkaGoMessage
- func (m KafkaGoMessage) InfoEvent(event string) string
- func (m KafkaGoMessage) Key() string
- func (m KafkaGoMessage) Offset() int64
- func (m KafkaGoMessage) Partition() int32
- func (m KafkaGoMessage) Topic() string
- func (m KafkaGoMessage) Unmarshall(ctx context.Context, native interface{}) (e error)
- func (m KafkaGoMessage) Value() []byte
- type ProcessorDependencies
- type RetryTopicMessage
- type SaramaClient
- type SaramaMessage
- type TopicConfig
Constants ¶
const (
// ConsumerTypeGroup configures the consumer as part of a consumer group
ConsumerTypeGroup consumerType = "CONSUMER_GROUP"
// MessageFormatAvro specifies that messages in a topic are stored in avro format
MessageFormatAvro messageFormat = "MESSAGE_AVRO"
// MessageFormatJSON specifies that messages in a topic are stored in JSON format
MessageFormatJSON messageFormat = "MESSAGE_JSON"
// MessageFormatString specifies that messages in a topic are stored in string format
MessageFormatString messageFormat = "MESSAGE_STRING"
// ProducerTypeAsync configures a producer with an asynchronous response mechanism
ProducerTypeAsync producerType = "PRODUCER_ASYNC"
// ProducerTypeSync configures a producer with synchronous feedback
ProducerTypeSync producerType = "PRODUCER_SYNC"
)
const (
// BaseSarama can be used in kafkaclient.New to specify that
// the underlying library used will be Shopify's sarama (https://github.com/Shopify/sarama/)
BaseSarama baseLibrary = "SARAMA"
// BaseKafkaGO can be used in kafkaclient.New to specify that
// the underlying library used will be kafka-go (https://github.com/segmentio/kafka-go)
BaseKafkaGO baseLibrary = "KAFKAGO"
)
Variables ¶
This section is empty.
Functions ¶
func DefaultProcessor ¶
func DefaultProcessor(ctx context.Context, dependencies ProcessorDependencies, msg ConsumerMessage) error
Types ¶
type Config ¶
type Config struct {
KafkaVersion string
Brokers []string
Topics []TopicConfig
SchemaRegURL string
ConsumerType consumerType
ConsumerGroupID string
ProcDependencies ProcessorDependencies // injectable dependencies for message processors
ProducerType producerType
ReadFromOldest bool
TLS *tls.Config
Debug bool
}
Config holds specifics used to configure different parts of the kafka client
func NewConfig ¶
func NewConfig( ctx context.Context, version string, brokers []string, topics []TopicConfig, procDependencies ProcessorDependencies, schemaRegURL string, consType consumerType, groupID string, prodType producerType, readFromOldest bool, tls *tls.Config, debug bool) (c Config, e error)
NewConfig constructs and returns a Config struct
func (Config) ReadTopicNames ¶
ReadTopicNames constructs and returns a slice of all topic names
func (Config) TopicMap ¶
func (c Config) TopicMap() (m map[string]TopicConfig)
TopicMap constructs and returns a map of topic configuration, using each topic name as the map key
func (Config) WriteTopicNames ¶
WriteTopicNames constructs and returns a slice of all topic names
type ConsumerMessage ¶
type ConsumerMessage interface {
Unmarshall(ctx context.Context, native interface{}) (e error)
Topic() string
Key() string
Offset() int64
Partition() int32
Value() []byte
}
ConsumerMessage is an interface implemented by kafka consumer message types
type EncoderDecoder ¶
type EncoderDecoder interface {
// Encode encodes native golang as binary.
//
// topic: name of topic the message will be sent to
// native: the golang data structure to be encoded
Encode(ctx context.Context, topic string, native interface{}) (b []byte, e error)
// Decode decodes binary into native golang.
//
// topic: name of topic the message was received from
// b: the binary to be decoded
// target: pointer to data structure the binary data will be decoded into
Decode(ctx context.Context, topic string, b []byte, target interface{}) error
// GetSchemaID returns the topic schema ID, if applicable
GetSchemaID(ctx context.Context, topic string) (int, error)
}
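EncoderDecoder is an interface for encoding native golang data structures as binary for a topic, decoding binary received from a topic back into native golang, and looking up the topic schema ID where applicable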
type KafkaClient ¶
type KafkaClient interface {
// StartConsume starts the consumption of messages from the configured Kafka topics
StartConsume(ctx context.Context) error
// CancelConsume cancels the consumption of messages from configured topics
CancelConsume() error
// ProduceMessage adds messages to a specified topic
ProduceMessage(ctx context.Context, topic string, key string, msg interface{}) error
// contains filtered or unexported methods
}
KafkaClient is an interface describing the primary uses of this library
func New ¶
func New(base baseLibrary, config Config) (KafkaClient, error)
New constructs and returns a new KafkaClient implementation
type KafkaGoClient ¶
type KafkaGoClient struct {
// contains filtered or unexported fields
}
KafkaGoClient implements the KafkaClient interface
func (*KafkaGoClient) CancelConsume ¶
func (c *KafkaGoClient) CancelConsume() (e error)
CancelConsume calls the context's context.CancelFunc in order to stop the process of message consumption
func (*KafkaGoClient) ProduceMessage ¶
func (c *KafkaGoClient) ProduceMessage( ctx context.Context, topic string, key string, msg interface{}) (e error)
ProduceMessage creates/encodes a message and sends it to the specified topic
func (*KafkaGoClient) StartConsume ¶
func (c *KafkaGoClient) StartConsume(ctx context.Context) (e error)
StartConsume starts consuming configured kafka topic messages
type KafkaGoMessage ¶
type KafkaGoMessage struct {
// contains filtered or unexported fields
}
KafkaGoMessage holds kafka-go message contents as well as an EncoderDecoder used to unmarshall message data
func (KafkaGoMessage) InfoEvent ¶
func (m KafkaGoMessage) InfoEvent(event string) string
InfoEvent constructs and returns a loggable event relating to the message
func (KafkaGoMessage) Offset ¶
func (m KafkaGoMessage) Offset() int64
Offset returns the message offset
func (KafkaGoMessage) Partition ¶
func (m KafkaGoMessage) Partition() int32
Partition returns the message partition
func (KafkaGoMessage) Topic ¶
func (m KafkaGoMessage) Topic() string
Topic returns the message topic
func (KafkaGoMessage) Unmarshall ¶
func (m KafkaGoMessage) Unmarshall(ctx context.Context, native interface{}) (e error)
Unmarshall unmarshalls the message contents into the provided struct
func (KafkaGoMessage) Value ¶
func (m KafkaGoMessage) Value() []byte
Value returns the message byte value
type ProcessorDependencies ¶ added in v1.0.2
type ProcessorDependencies interface{}
type RetryTopicMessage ¶
type RetryTopicMessage struct {
OriginalTopic string `json:"original_topic" avro:"original_topic"`
OriginalPartition int32 `json:"original_partition" avro:"original_partition"`
OriginalOffset int64 `json:"original_offset" avro:"original_offset"`
OriginalMessage []byte `json:"original_message" avro:"original_message"`
Error string `json:"error" avro:"error"`
}
RetryTopicMessage is a native go representation of a message on a retry topic
func NewRetryTopicMessage ¶
func NewRetryTopicMessage( origTopic string, origPart int32, origOffset int64, origMsg []byte, e error) RetryTopicMessage
NewRetryTopicMessage constructs and returns a new RetryTopicMessage to be added to a retry topic
type SaramaClient ¶
type SaramaClient struct {
// contains filtered or unexported fields
}
SaramaClient implements the KafkaClient interface
func (*SaramaClient) CancelConsume ¶
func (c *SaramaClient) CancelConsume() (e error)
CancelConsume calls the context's context.CancelFunc in order to stop the process of message consumption
func (*SaramaClient) ProduceMessage ¶
func (c *SaramaClient) ProduceMessage( ctx context.Context, topic string, key string, msg interface{}) (e error)
ProduceMessage creates/encodes a message and sends it to the specified topic
func (*SaramaClient) StartConsume ¶
func (c *SaramaClient) StartConsume(ctx context.Context) (e error)
StartConsume starts consuming configured kafka topic messages
type SaramaMessage ¶
type SaramaMessage struct {
// contains filtered or unexported fields
}
SaramaMessage holds sarama message contents as well as an EncoderDecoder used to unmarshall message data
func (SaramaMessage) Offset ¶
func (m SaramaMessage) Offset() int64
Offset returns the message offset
func (SaramaMessage) Partition ¶
func (m SaramaMessage) Partition() int32
Partition returns the message partition
func (SaramaMessage) Unmarshall ¶
func (m SaramaMessage) Unmarshall(ctx context.Context, native interface{}) (e error)
Unmarshall unmarshalls the message contents into the provided struct
func (SaramaMessage) Value ¶
func (m SaramaMessage) Value() []byte
Value returns the message byte value
type TopicConfig ¶
type TopicConfig struct {
Name string
MessageFormat messageFormat
// Set DoConsume to true if this topic should be consumed from
DoConsume bool
// Set DoProduce to true if you will need to produce messages to this topic
DoProduce bool
DelayProcessingMins time.Duration
// FailedProcessingTopic is the retry topic to which a message
// should be handed off in the case of a failure to process the message
FailedProcessingTopic string
// Schema is an optional string representation of the topic schema
Schema string
SchemaVersion int
MessageProcessor func(context.Context, ProcessorDependencies, ConsumerMessage) error
// contains filtered or unexported fields
}
TopicConfig is a struct that holds data regarding an existing Kafka topic that can be consumed from or written to
Source Files
¶
- config.go
- encoder_decoder.go
- encoder_decoder_avro.go
- encoder_decoder_json.go
- encoder_decoder_string.go
- kafkaclient.go
- kafkago_client.go
- kafkago_consumer.go
- kafkago_message.go
- kafkago_producer.go
- log.go
- message.go
- processor.go
- sarama_client.go
- sarama_consumer.go
- sarama_encoder.go
- sarama_message.go
- sarama_producer.go
- schema_reg.go
- schema_reg_mock.go