Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CacheState ¶
type CacheState int
type Client ¶
type Client struct {
State State
Exchanger *Exchanger
Consumer *kafka.Consumer
ConsumerTopic string
Producer *kafka.Producer
ProducerTopic string
}
Client is the type used to interact with a Kafka cluster
func NewClient ¶
func NewClient(config ClientConfig, requestTopic, responseTopic string, opts ...ClientOption) (*Client, error)
NewClient creates a new client object with the specified configuration and options
func (*Client) ExchangeLoop ¶
ExchangeLoop starts a loop that indefinitely waits for and handles messages on the Kafka cluster
func (*Client) ExchangeMessage ¶
ExchangeMessage waits for a message and sends a message to the Kafka cluster using the client's configured Exchanger
func (*Client) ReadMessage ¶
ReadMessage waits for a message from the Kafka cluster to be received on the current request topic
func (*Client) SendMessage ¶
SendMessage sends a message to the Kafka cluster to be received on the current request topic
type ClientConfig ¶
type ClientConfig struct {
BootstrapServers string
SecurityProtocol string
SaslMechanisms string
SaslUsername string
SaslPassword string
}
ClientConfig defines the parameters to be used by the client
type ClientHandler ¶
ClientHandler is a function that takes a specified object and creates a response
type ClientOption ¶
type ClientOption func(client *Client)
ClientOption defines possible options that can be used to customize the client
func WithExchanger ¶
func WithExchanger(e *Exchanger) ClientOption
WithExchanger customizes the client to use a specified Exchanger
func WithNewExchanger ¶
func WithNewExchanger(h ClientHandler, reqSchema, respSchema interface{}) ClientOption
WithNewExchanger customizes the client to use a newly specified Exchanger
type Error ¶
type Error struct {
Message error `json:"message"`
}
Error is the type used for sending errors through the Kafka client
type Exchanger ¶
type Exchanger struct {
Client *Client
Handler ClientHandler
RequestSchema interface{}
ResponseSchema interface{}
}
Exchanger represents a request-response exchanger, handling requests and responses with the specified schemas
func NewExchanger ¶
func NewExchanger(c *Client, h ClientHandler, reqSchema, respSchema interface{}) *Exchanger
NewExchanger creates a new message exchanger for a client with a custom handler for messages
type KafkakeConfig ¶
type MessageState ¶
type MessageState struct {
// contains filtered or unexported fields
}
type Processor ¶
type Processor struct {
ProcessorConfig
State State
RequestTopic string
ResponseTopic string
Consumer *kafka.Consumer
Producers map[int32]*kafka.Producer
Handler Handler
MessageStates map[string]*MessageState
}
func NewProcessor ¶
func NewProcessor(config ProcessorConfig, requestTopic, responseTopic string, handler Handler) (*Processor, error)