consumer

package module
v0.0.0-...-28e3633 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

README

使用方法

使用命令行传入配置参数:

  • -csmr (uint):消费者组的消费者个数
  • -topic (string):Kafka中的Topic
  • -ipaddr (string):Kafka服务的IPv4地址
  • -port (string):Kafka服务的端口

执行 ./bin目录下的 csmr 文件

Documentation

Index

Constants

View Source
// CommonMSGSerialVersionUID is a fixed int64 identifier associated with the
// common message format.
// NOTE(review): the name suggests it mirrors a Java-style serialVersionUID of
// the upstream message class — confirm against the producer side.
const CommonMSGSerialVersionUID int64 = 2611556444074013268

Variables

This section is empty.

Functions

func CreateNewConsumer

func CreateNewConsumer(addressSet []string, consumerGroupTopic map[string]ConsumeTopic, consumerConfig *sarama.Config) *consumer

创建消费者对象。addressSet:Kafka地址集合;consumerGroupTopic:消费者组信息;consumerConfig:消费者配置信息,如果为空就采用默认的配置。

func PrintJSONMessage

func PrintJSONMessage(msg *sarama.ConsumerMessage) error

func PrintProtoMessage

func PrintProtoMessage(msg *sarama.ConsumerMessage) error

func PrintSimpleMSG

func PrintSimpleMSG(msg *sarama.ConsumerMessage) error

func PrintStringMessage

func PrintStringMessage(msg *sarama.ConsumerMessage) error

func String2Stamp

func String2Stamp(t *string) *int64

func TypeConvert

func TypeConvert(tableName, columnName, value string, sqlType int32, mysqlType string) interface{}

Types

type CommonMessage

// CommonMessage holds the fields of one database change message: where the
// change happened (Database, Table), what kind of change it was (Type, IsDdl,
// SQL), when it happened (Es, Ts), and the affected row data (Data, Old).
type CommonMessage struct {
	Database string                   // Database or schema name.
	Table    string                   // Table name.
	PkNames  []string                 // List of primary-key column names.
	IsDdl    bool                     // Whether this is a DDL operation.
	Type     string                   // Operation type: INSERT/UPDATE/DELETE.
	Es       int64                    // Binlog execution time (original note also says "execution duration" — TODO confirm which, and the unit).
	Ts       int64                    // Timestamp at which the DML message was built, i.e. the sync time (unit not shown here — TODO confirm).
	SQL      string                   // The executed SQL statement; empty for DML.
	Data     []map[string]interface{} // List of row data.
	Old      []map[string]interface{} // List of old row data, used for updates; its entries correspond one-to-one with Data.
}

func Convert

func Convert(msg *Message) ([]*CommonMessage, error)

func ReadingFlatMSG

func ReadingFlatMSG(data []byte) *CommonMessage

使用fastjson解析flatmessage。

func (*CommonMessage) Clear

func (c *CommonMessage) Clear()

func (*CommonMessage) GetCreatedTime

func (c *CommonMessage) GetCreatedTime() []time.Time

func (*CommonMessage) GetId

func (c *CommonMessage) GetId() int

func (*CommonMessage) GetUpdatedTime

func (c *CommonMessage) GetUpdatedTime() []time.Time

func (*CommonMessage) String

func (c *CommonMessage) String() string

type ConsumeTopic

// ConsumeTopic describes one consumer group: how many consumers it contains,
// which topics it listens to, and the handler invoked for consumed messages.
type ConsumeTopic struct {
	// Number of consumers contained in one consumer group.
	ConsumeNum int
	// Topics the consumer group listens to.
	Topics []string
	// Callback handler; must be implemented by the caller.
	Callback sarama.ConsumerGroupHandler
}

type Message

// Message carries a batch of entries identified by ID, either as parsed
// protocol entries (Entries) or as raw byte slices (RawEntries).
// NOTE(review): presumably Raw is true when the entries are kept undecoded in
// RawEntries rather than parsed into Entries — confirm against Deserializer.
type Message struct {
	// serialVersionUID uint64
	ID         int64
	Entries    []*protocal_message.Entry
	Raw        bool
	RawEntries [][]byte
}

func Deserializer

func Deserializer(data []byte, lazyParseEntry bool) *Message

根据.proto文件的定义,层层解析

func NewMessage

func NewMessage(id int64, entries []*protocal_message.Entry) *Message

func NewRawMessage

func NewRawMessage(id int64, entries [][]byte) *Message

func (*Message) AddEntry

func (m *Message) AddEntry(entry *protocal_message.Entry)

func (*Message) AddRawEntry

func (m *Message) AddRawEntry(rawEntry []byte)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL