Documentation
¶
Index ¶
- func DecodeRecordFromNative(src interface{}, dst interface{}) error
- type CachedSchemaRegistryClient
- func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
- func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (*goavro.Codec, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (*goavro.Codec, error)
- func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (bool, schemaregistry.Schema, error)
- func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, codec *goavro.Codec) (int, error)
- func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
- func (cached *CachedSchemaRegistryClient) Versions(subject string) ([]int, error)
- type Consumer
- func (ac *Consumer) Close()
- func (ac *Consumer) CommitMessage(msg ConsumerMessage) ([]kafka.TopicPartition, error)
- func (ac *Consumer) EnsureTopics() error
- func (ac *Consumer) Messages(stopChan chan struct{}) (chan ConsumerMessage, chan kafka.Event)
- func (ac *Consumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
- type ConsumerMessage
- type NativeDecoder
- type OptionalDay
- type OptionalInt
- type OptionalString
- type Producer
- type ProducerConfig
- type SchemaRegistryClient
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DecodeRecordFromNative ¶ added in v0.10.0
func DecodeRecordFromNative(src interface{}, dst interface{}) error
Types ¶
type CachedSchemaRegistryClient ¶
type CachedSchemaRegistryClient struct {
SchemaRegistryClient *schemaregistry.Client
// contains filtered or unexported fields
}
CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance
func NewCachedSchemaRegistryClient ¶
func NewCachedSchemaRegistryClient(baseURL string, options ...schemaregistry.Option) (*CachedSchemaRegistryClient, error)
func (*CachedSchemaRegistryClient) DeleteSubject ¶
func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
DeleteSubject deletes the subject, should only be used in development
func (*CachedSchemaRegistryClient) GetLatestSchema ¶
func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
GetLatestSchema returns the highest version schema for a subject
func (*CachedSchemaRegistryClient) GetSchemaByID ¶
func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (*goavro.Codec, error)
GetSchemaByID will return and cache the codec with the given id
func (*CachedSchemaRegistryClient) GetSchemaBySubject ¶
func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (*goavro.Codec, error)
GetSchemaBySubject returns the codec for a specific version of a subject
func (*CachedSchemaRegistryClient) IsSchemaRegistered ¶
func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (bool, schemaregistry.Schema, error)
IsSchemaRegistered checks if a specific codec is already registered to a subject
func (*CachedSchemaRegistryClient) RegisterNewSchema ¶
func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, codec *goavro.Codec) (int, error)
RegisterNewSchema will return and cache the id with the given codec
func (*CachedSchemaRegistryClient) Subjects ¶
func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
Subjects returns a list of subjects
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(topics []string, consumer *kafka.Consumer, schemaRegistryClient SchemaRegistryClient) (*Consumer, error)
NewConsumer is a basic consumer to interact with schema registry, avro and kafka
Example ¶
package main
import (
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
kafkaavro "github.com/mycujoo/go-kafka-avro"
)
func main() {
kafkaConsumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:29092",
"security.protocol": "ssl",
"socket.keepalive.enable": true,
"enable.auto.commit": false,
"ssl.key.location": "/path/to/service.key",
"ssl.certificate.location": "/path/to/service.cert",
"ssl.ca.location": "/path/to/ca.pem",
"group.id": "some-group-id",
"session.timeout.ms": 6000,
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
})
if err != nil {
log.Fatal(err)
}
cachedSchemaRegistry, err := kafkaavro.NewCachedSchemaRegistryClient("http://localhost:8081")
if err != nil {
log.Fatal(err)
}
kafkaavro.NewConsumer([]string{"topic1"}, kafkaConsumer, cachedSchemaRegistry)
}
Output:
func (*Consumer) CommitMessage ¶
func (ac *Consumer) CommitMessage(msg ConsumerMessage) ([]kafka.TopicPartition, error)
func (*Consumer) EnsureTopics ¶ added in v0.11.0
EnsureTopics returns error if one of the consumed topics was not found on the server.
func (*Consumer) Messages ¶
func (ac *Consumer) Messages(stopChan chan struct{}) (chan ConsumerMessage, chan kafka.Event)
Messages returns the ConsumerMessage channel (that contains decoded messages) and other events channel for events like kafka.PartitionEOF, kafka.Stats
func (*Consumer) SubscribeTopics ¶ added in v0.12.0
func (ac *Consumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
type ConsumerMessage ¶
type NativeDecoder ¶ added in v0.10.0
type NativeDecoder interface {
FromNative(interface{}) error
}
type OptionalDay ¶ added in v0.10.0
func NewOptionalDay ¶ added in v0.10.0
func NewOptionalDay(t time.Time, valid bool) OptionalDay
func (*OptionalDay) FromNative ¶ added in v0.10.0
func (od *OptionalDay) FromNative(data interface{}) error
func (OptionalDay) MarshalJSON ¶ added in v0.10.0
func (od OptionalDay) MarshalJSON() ([]byte, error)
type OptionalInt ¶ added in v0.10.0
func NewOptionalInt ¶ added in v0.10.0
func NewOptionalInt(i int64, valid bool) OptionalInt
func (*OptionalInt) FromNative ¶ added in v0.10.0
func (i *OptionalInt) FromNative(data interface{}) error
type OptionalString ¶ added in v0.10.0
func NewOptionalString ¶ added in v0.10.0
func NewOptionalString(s string, valid bool) OptionalString
func (*OptionalString) FromNative ¶ added in v0.10.0
func (s *OptionalString) FromNative(data interface{}) error
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(cfg ProducerConfig) (*Producer, error)
NewProducer is a producer that publishes messages to kafka topic using avro serialization format
type ProducerConfig ¶ added in v1.2.0
type ProducerConfig struct {
// Name of the topic where messages will be produced
TopicName string
// Avro schema for message key
KeySchema string
// Avro schema for message value
ValueSchema string
// Low level kafka producer used to produce messages
Producer kafkaProducer
// Schema registry client used for messages validation and schema management
SchemaRegistryClient SchemaRegistryClient
// BackOffConfig is used for setting backoff strategy for retry logic
BackOffConfig backoff.BackOff
}