Documentation
¶
Index ¶
- Constants
- Variables
- func ConnStr(DBName string, Host string, Port int, Username string, Password string) string
- func PGConnStringFromEnv() string
- type Batcher
- type DBItem
- type DefaultMessageHandler
- func (d *DefaultMessageHandler) HandleBeginMessage(idx int, msg *pglogrepl.BeginMessage) (err error)
- func (d *DefaultMessageHandler) HandleCommitMessage(idx int, msg *pglogrepl.CommitMessage) (err error)
- func (d *DefaultMessageHandler) HandleDeleteMessage(idx int, msg *pglogrepl.DeleteMessage, reln *pglogrepl.RelationMessage, ...) (err error)
- func (d *DefaultMessageHandler) HandleInsertMessage(idx int, msg *pglogrepl.InsertMessage, reln *pglogrepl.RelationMessage, ...) (err error)
- func (d *DefaultMessageHandler) HandleRefresh(key any, value any) (err error)
- func (d *DefaultMessageHandler) HandleRelationMessage(idx int, msg *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
- func (d *DefaultMessageHandler) HandleUpdateMessage(idx int, msg *pglogrepl.UpdateMessage, reln *pglogrepl.RelationMessage, ...) (err error)
- type EchoMessageHandler
- func (d *EchoMessageHandler) HandleBeginMessage(idx int, msg *pglogrepl.BeginMessage) (err error)
- func (d *EchoMessageHandler) HandleCommitMessage(idx int, msg *pglogrepl.CommitMessage) (err error)
- func (d *EchoMessageHandler) HandleDeleteMessage(idx int, msg *pglogrepl.DeleteMessage, reln *pglogrepl.RelationMessage, ...) (err error)
- func (d *EchoMessageHandler) HandleInsertMessage(idx int, msg *pglogrepl.InsertMessage, reln *pglogrepl.RelationMessage, ...) (err error)
- func (d *EchoMessageHandler) HandleRefresh(key any, value any) (err error)
- func (d *EchoMessageHandler) HandleRelationMessage(idx int, msg *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
- func (d *EchoMessageHandler) HandleUpdateMessage(idx int, msg *pglogrepl.UpdateMessage, reln *pglogrepl.RelationMessage, ...) (err error)
- type EventsProcessor
- type InMemSelection
- type MessageHandler
- type PGColumnInfo
- type PGMSG
- type PGTableInfo
- type StringMap
- type Syncer
- func (p *Syncer) DB() *sql.DB
- func (p *Syncer) Forward(nummsgs int) error
- func (p *Syncer) GetMessages(numMessages int, consume bool, out []PGMSG) (msgs []PGMSG, err error)
- func (d *Syncer) GetRelation(relationId uint32) *pglogrepl.RelationMessage
- func (p *Syncer) GetTableInfo(relationID uint32) *PGTableInfo
- func (d *Syncer) IsRunning() bool
- func (d *Syncer) MarkDeleted(doctype string, docid string)
- func (d *Syncer) MarkUpdated(doctype string, docid string, doc StringMap)
- func (s *Syncer) MessageToMap(msg *pglogrepl.TupleData, reln *pglogrepl.RelationMessage) (pkey string, out map[string]any, errors map[string]error)
- func (d *Syncer) Refresh(selectQuery string) bool
- func (p *Syncer) RefreshTableInfo(relationID uint32, namespace string, table_name string) (tableInfo *PGTableInfo, err error)
- func (d *Syncer) Start()
- func (d *Syncer) Stop()
- func (d *Syncer) UpdateRelation(reln *pglogrepl.RelationMessage) *PGTableInfo
- type SyncerOpt
Constants ¶
const DEFAULT_DBSYNC_CTRL_NAMESPACE = "dbsync_ctrl"
const DEFAULT_DBSYNC_PUBNAME = "dbsync_mypub"
const DEFAULT_DBSYNC_REPLSLOT = "dbsync_replslot"
const DEFAULT_DBSYNC_WM_TABLENAME = "dbsync_wmtable"
const DEFAULT_POSTGRES_DB = "postgres"
const DEFAULT_POSTGRES_HOST = "localhost"
const DEFAULT_POSTGRES_PASSWORD = "docker"
const DEFAULT_POSTGRES_PORT = "5432"
const DEFAULT_POSTGRES_USER = "postgres"
const PG_TIMESTAMP_FORMAT = "2006-01-02 15:04:05.999999+00"
Variables ¶
var ErrStopProcessingMessages = errors.New("message processing halted")
Functions ¶
func PGConnStringFromEnv ¶
func PGConnStringFromEnv() string
Gets the postgres connection string from environment variables.
Types ¶
type DefaultMessageHandler ¶
type DefaultMessageHandler struct {
}
A default implementation of the message handler with no-op methods
func (*DefaultMessageHandler) HandleBeginMessage ¶
func (d *DefaultMessageHandler) HandleBeginMessage(idx int, msg *pglogrepl.BeginMessage) (err error)
func (*DefaultMessageHandler) HandleCommitMessage ¶
func (d *DefaultMessageHandler) HandleCommitMessage(idx int, msg *pglogrepl.CommitMessage) (err error)
func (*DefaultMessageHandler) HandleDeleteMessage ¶
func (d *DefaultMessageHandler) HandleDeleteMessage(idx int, msg *pglogrepl.DeleteMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
func (*DefaultMessageHandler) HandleInsertMessage ¶
func (d *DefaultMessageHandler) HandleInsertMessage(idx int, msg *pglogrepl.InsertMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
func (*DefaultMessageHandler) HandleRefresh ¶
func (d *DefaultMessageHandler) HandleRefresh(key any, value any) (err error)
func (*DefaultMessageHandler) HandleRelationMessage ¶
func (d *DefaultMessageHandler) HandleRelationMessage(idx int, msg *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
func (*DefaultMessageHandler) HandleUpdateMessage ¶
func (d *DefaultMessageHandler) HandleUpdateMessage(idx int, msg *pglogrepl.UpdateMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
type EchoMessageHandler ¶
type EchoMessageHandler struct {
DefaultMessageHandler
}
A message handler that prints events to stdout
func (*EchoMessageHandler) HandleBeginMessage ¶
func (d *EchoMessageHandler) HandleBeginMessage(idx int, msg *pglogrepl.BeginMessage) (err error)
func (*EchoMessageHandler) HandleCommitMessage ¶
func (d *EchoMessageHandler) HandleCommitMessage(idx int, msg *pglogrepl.CommitMessage) (err error)
func (*EchoMessageHandler) HandleDeleteMessage ¶
func (d *EchoMessageHandler) HandleDeleteMessage(idx int, msg *pglogrepl.DeleteMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
func (*EchoMessageHandler) HandleInsertMessage ¶
func (d *EchoMessageHandler) HandleInsertMessage(idx int, msg *pglogrepl.InsertMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
func (*EchoMessageHandler) HandleRefresh ¶
func (d *EchoMessageHandler) HandleRefresh(key any, value any) (err error)
func (*EchoMessageHandler) HandleRelationMessage ¶
func (d *EchoMessageHandler) HandleRelationMessage(idx int, msg *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
func (*EchoMessageHandler) HandleUpdateMessage ¶
func (d *EchoMessageHandler) HandleUpdateMessage(idx int, msg *pglogrepl.UpdateMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) (err error)
type EventsProcessor ¶
Type of the callback function to handle messages read (peeked) from replication slot before the offset is forwarded. The callback accepts the list of messages (at the head of the replication slot) and returns two outputs:
- numProcessed - The number of messages processed. The poller then forwards the slot's offset by this amount (instead of how many it originally peeked).
- stop - A bool variable that signals to the poller if it should stop processing any further messages from the replication slot.
type InMemSelection ¶
type InMemSelection struct {
// contains filtered or unexported fields
}
func NewInMemSelection ¶
func NewInMemSelection() *InMemSelection
Creates a new in memory selection with a random ID
func (*InMemSelection) Clear ¶
func (i *InMemSelection) Clear() bool
Clears all items from this selection to release any storage needed
func (*InMemSelection) GetItem ¶
func (i *InMemSelection) GetItem(key any) (value any, exists bool)
Get the value of an item in this selection given its key.
func (*InMemSelection) ID ¶
func (i *InMemSelection) ID() string
Returns the Unique ID of this selection
func (*InMemSelection) Items ¶
func (i *InMemSelection) Items() map[any]DBItem
Gets all items currently remaining in the selection. TODO - Should we allow iteration on this in case we are ok to have a *really* large dataset as part of this?
func (*InMemSelection) RemoveItem ¶
func (i *InMemSelection) RemoveItem(key any) (out any)
Removes an item from this collection
type MessageHandler ¶
type MessageHandler interface {
HandleRefresh(key any, value any) error
HandleBeginMessage(idx int, msg *pglogrepl.BeginMessage) error
HandleCommitMessage(idx int, msg *pglogrepl.CommitMessage) error
HandleRelationMessage(idx int, msg *pglogrepl.RelationMessage, tableInfo *PGTableInfo) error
HandleUpdateMessage(idx int, msg *pglogrepl.UpdateMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) error
HandleDeleteMessage(idx int, msg *pglogrepl.DeleteMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) error
HandleInsertMessage(idx int, msg *pglogrepl.InsertMessage, reln *pglogrepl.RelationMessage, tableInfo *PGTableInfo) error
}
Handles different kinds of messages from the replication slot
type PGColumnInfo ¶
type PGColumnInfo struct {
DBName string
Namespace string
TableName string
ColumnName string
ColumnType string
OrdinalPosition int
}
func (*PGColumnInfo) DecodeBytes ¶
func (c *PGColumnInfo) DecodeBytes(input []byte) (out interface{}, err error)
func (*PGColumnInfo) DecodeText ¶
func (c *PGColumnInfo) DecodeText(input []byte) (out interface{}, err error)
type PGTableInfo ¶
type PGTableInfo struct {
RelationID uint32
DBName string
Namespace string
TableName string
ColInfo map[string]*PGColumnInfo
}
func (*PGTableInfo) GetRecordID ¶
func (t *PGTableInfo) GetRecordID(msg *pglogrepl.TupleData, reln *pglogrepl.RelationMessage) string
type Syncer ¶
type Syncer struct {
MessageHandler MessageHandler
Batcher Batcher
// The poller peeks for messages in the replication slot periodically. This determines delay between peeks.
DelayBetweenPeekRetries time.Duration
// The delay between peeks is not fixed. Instead if no messages are found a backoff is applied
// so this delay increases until there are more messages. This factor determines how much the
// delay should be increased by when no messages are found on the slot.
TimerDelayBackoffFactor float32
// The delay between empty peeks is capped at this amount.
MaxDelayBetweenEmptyPeeks time.Duration
// Maximum number of messages to read and process at a time.
MaxMessagesToRead int
// contains filtered or unexported fields
}
Our main Syncer type keeps track of a postgres replication slot that is being consumed and synced
func NewSyncer ¶
Creates a new Syncer instance from parameters obtained from environment variables. The environment variables looked up are:
- POSTGRES_NAME - Name of the Postgres DB to setup replication on
- POSTGRES_HOST - Host where the DB is executing
- POSTGRES_PORT - Port on which the DB is served
- POSTGRES_USER - Admin username to connect to the postgres db to setup replication on
- POSTGRES_PASSWORD - Password of the admin user
- DBSYNC_CTRL_NAMESPACE - Name of the control namespace where dbsync will create its auxiliary tables
- DBSYNC_PUBNAME - Name of the publication tracked by dbsync
- DBSYNC_REPLSLOT - Name of the replication slot dbsync will track
- DBSYNC_WM_TABLENAME - Name of the table dbsync will use to create/track watermarks on
func (*Syncer) Forward ¶
Forwards the message offset on the replication slot. Typically GetMessages is called to peek N messages. Then, after those messages are processed, the offset is forwarded to ensure at-least-once processing of messages.
func (*Syncer) GetMessages ¶
Returns numMessages number of events at the front of the replication slot (queue). If consume parameter is set, then the offset is automatically forwarded, otherwise repeated calls to this method will simply return "peeked" messages.
func (*Syncer) GetRelation ¶
func (d *Syncer) GetRelation(relationId uint32) *pglogrepl.RelationMessage
func (*Syncer) GetTableInfo ¶
func (p *Syncer) GetTableInfo(relationID uint32) *PGTableInfo
Returns the info about a table given its relation ID
func (*Syncer) MarkDeleted ¶
func (*Syncer) MarkUpdated ¶
func (*Syncer) MessageToMap ¶
func (*Syncer) RefreshTableInfo ¶
func (p *Syncer) RefreshTableInfo(relationID uint32, namespace string, table_name string) (tableInfo *PGTableInfo, err error)
Queries the DB for the latest schema of a given relation and stores it
func (*Syncer) Stop ¶
func (d *Syncer) Stop()
Signals to the poller to stop consuming messages from the replication slot
func (*Syncer) UpdateRelation ¶
func (d *Syncer) UpdateRelation(reln *pglogrepl.RelationMessage) *PGTableInfo
type SyncerOpt ¶
func ForTables ¶
Sets the table names to automatically create a publication for if the user does not want to create the publication manually
func WithCtrlNamespace ¶
Sets name of the control namespace used by dbsync internally
func WithPGConnStr ¶
Sets the connection string for the postgres database to be synced
func WithPublication ¶
Sets the publication name tracked by dbsync
func WithReplicationSlot ¶
Sets the replication slot tracked by dbsync
func WithWMTable ¶
Sets name of the watermark table to be used