dlqdump

package module
v1.0.4
Published: Jul 16, 2024 License: MIT Imports: 10 Imported by: 1

README

DLQ dump

A dumping DLQ implementation for the queue package. It allows leaked items to be dumped to some storage: disk, cloud, etc.

A DLQ is an auxiliary queue that receives items leaked from the main queue when the worker limit is reached (or an error occurs).

This package also describes the opposite interface, which restores items from the dump and returns them to the main queue (see the "Restoring" chapter).

Dumping

The dumping component is implemented as a queue (because a DLQ must implement the corresponding interface of the queue, see https://github.com/koykov/queue/blob/master/interface.go#L4) and is called Queue.

The queue is set up using a config.

The base param is Version. It is checked when a dump is read and determines whether the data is backward compatible.

Flush settings

The Capacity and FlushInterval params specify how often the dumping queue must flush collected data to the dump. Capacity is set in bytes; FlushInterval limits how long the queue may wait before flushing (counted from the moment the first item arrives in the DLQ).

NOTE: Capacity is specified in bytes because dumping assumes a storage that limits size (e.g. a cloud with a file size limit). Thus, the DLQ collects a limited amount of serialized data and flushes it when the limit is reached. This param is mandatory.

FlushInterval works similarly. The queue remembers the moment the first item arrived and, once FlushInterval has elapsed, flushes the data with the reason "interval reached". This param is needed for cases when items arrive in the DLQ rarely and cannot fill it up to the Capacity limit. Thanks to FlushInterval, items are not kept in the DLQ indefinitely and are flushed to storage even if the collected size is small.

As a result, the DLQ waits for incoming items and then flushes on whichever condition occurs first: the size of the collected data reaches Capacity, or FlushInterval elapses.

Note that closing the DLQ forces a flush regardless of both params. Then the DLQ closes.
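
The flush decision described above can be sketched as a small pure function. This is only an illustration: flushReason is a hypothetical helper, not part of the dlqdump API, and the "size reached" label is an assumption (the README names only the "interval reached" reason).

```go
package main

import (
	"fmt"
	"time"
)

// flushReason is a hypothetical helper, not part of the dlqdump API.
// It returns the reason to flush, or "" if the queue should keep collecting.
// elapsed is the time passed since the first item arrived.
func flushReason(size, capacity uint64, elapsed, interval time.Duration) string {
	switch {
	case size == 0:
		return "" // no items yet, so the interval timer has not started
	case size >= capacity:
		return "size reached" // assumed label, see the note above
	case elapsed >= interval:
		return "interval reached"
	}
	return ""
}

func main() {
	const capacity = 1024 // bytes
	interval := 30 * time.Second

	fmt.Printf("%q\n", flushReason(2048, capacity, time.Second, interval))
	fmt.Printf("%q\n", flushReason(100, capacity, 31*time.Second, interval))
	fmt.Printf("%q\n", flushReason(100, capacity, time.Second, interval))
}
```

The forced flush on close is not modelled here; it would bypass both checks.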

Serialization

Queue has two abstraction layers. The first is the Encoder param, a special component that must implement the Encoder interface. It takes an arbitrary item and tries to serialize it to the buffer dst. The serialized data is sent to storage afterward.

dlqdump has several built-in encoders: builtin and marshaller. The first can serialize string/bytes data or types that implement the Byter and Stringer interfaces. The second can serialize protobuf objects.
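
A custom encoder only needs the single Encode method. The sketch below is not one of the built-in encoders: JSONEncoder is a hypothetical type, the Encoder interface is copied locally so the example compiles on its own, and appending to dst (rather than overwriting it) is an assumption about the expected contract.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder mirrors the dlqdump Encoder interface so this sketch is self-contained.
type Encoder interface {
	Encode(dst []byte, x any) ([]byte, error)
}

// JSONEncoder is a hypothetical encoder, not shipped with dlqdump.
type JSONEncoder struct{}

// Encode appends the JSON form of x to dst and returns the extended slice.
// On error dst is returned unchanged.
func (JSONEncoder) Encode(dst []byte, x any) ([]byte, error) {
	b, err := json.Marshal(x)
	if err != nil {
		return dst, err
	}
	return append(dst, b...), nil
}

func main() {
	var enc Encoder = JSONEncoder{}
	buf, err := enc.Encode(nil, map[string]int{"id": 7})
	fmt.Println(string(buf), err) // {"id":7} <nil>
}
```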

Dump writing

This is the second abstraction layer. The Writer param must implement the Writer interface. This object writes a dump using the version and the serialized data. dlqdump has a built-in Writer implementation that writes dumps to disk.

You may write your own implementation to write dumps to the cloud, etc.
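
As a rough illustration of a custom writer, here is an in-memory one. MemWriter and its fields are hypothetical, and the Version, MemorySize and Writer declarations are local copies of the documented types so the sketch compiles without the package. A real writer would also persist the version with the data; this one only remembers the last version it saw.

```go
package main

import "fmt"

// Local copies of the documented dlqdump types.
type Version uint64
type MemorySize uint64

type Writer interface {
	Write(ver Version, p []byte) (int, error)
	Size() MemorySize
	Flush() error
}

// MemWriter is a hypothetical writer that keeps finished dumps in memory.
type MemWriter struct {
	ver   Version
	buf   []byte
	Dumps [][]byte // one element per flushed dump
}

// Write collects p in the pending buffer.
func (w *MemWriter) Write(ver Version, p []byte) (int, error) {
	w.ver = ver
	w.buf = append(w.buf, p...)
	return len(p), nil
}

// Size returns the number of bytes collected since the last flush.
func (w *MemWriter) Size() MemorySize { return MemorySize(len(w.buf)) }

// Flush moves the pending buffer into Dumps and resets it.
func (w *MemWriter) Flush() error {
	if len(w.buf) == 0 {
		return nil
	}
	w.Dumps = append(w.Dumps, w.buf)
	w.buf = nil
	return nil
}

func main() {
	w := &MemWriter{}
	var _ Writer = w // compile-time interface check
	w.Write(1, []byte("first"))
	w.Write(1, []byte("second"))
	fmt.Println(w.Size()) // 11
	w.Flush()
	fmt.Println(w.Size(), len(w.Dumps)) // 0 1
}
```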

Restoring

Writing dumps is only half of the task. Data from dumps should be used (restored and processed again). dlqdump contains the Restorer component, which is the opposite of Queue.

The main idea: the source queue leaks and, using the DLQ, sends items to the dumping queue. The dumping queue flushes the data to storage; the Restorer then checks the storage periodically and tries to send items back to the target queue (the original queue in the most common case, but you may specify any other queue). As a result, a loop is formed:

  • queue leaks the items
  • DLQ writes a dump
  • Restorer reads the items from dump
  • Restorer sends restored items back to the queue

In that case the storage serves as a big buffer, but not in RAM.

Restorer uses the same config struct but ignores queue-specific params (the queue similarly ignores Restorer params).

The base param is Version. It works the same way as in the queue config. If the versions in the config and in the dump differ, the dump is removed.

The target queue is set using the Queue param and must implement the queue interface.

Restoring settings

Restorer has three params:

  • CheckInterval - the interval between checks of dumps in storage
  • PostponeInterval - how long restoring must be postponed if the target queue's rate exceeds AllowRate
  • AllowRate - the maximum rate (items/capacity) of the target queue at which items may still be sent to it. Required to keep the Restorer from overflowing the target queue
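
How the three params interact can be sketched as follows. This is a simplified model, not the package's code: nextStep, rater and fixedRate are hypothetical names, and it assumes the target queue exposes a Rate method like the Queue type documented in this package. The fallback of PostponeInterval to CheckInterval follows the Config comments.

```go
package main

import (
	"fmt"
	"time"
)

// rater is the part of the target queue this sketch needs (assumed).
type rater interface {
	Rate() float32
}

// fixedRate is a stand-in queue that reports a constant fill rate.
type fixedRate float32

func (r fixedRate) Rate() float32 { return float32(r) }

// nextStep is a hypothetical helper. It reports whether restoring may proceed
// now and how long to wait before the next check.
func nextStep(q rater, allowRate float32, check, postpone time.Duration) (bool, time.Duration) {
	if postpone <= 0 {
		postpone = check // an omitted PostponeInterval falls back to CheckInterval
	}
	if q.Rate() > allowRate {
		return false, postpone // target queue is too full, postpone restoring
	}
	return true, check
}

func main() {
	ok, wait := nextStep(fixedRate(0.99), 0.95, time.Second, 5*time.Second)
	fmt.Println(ok, wait) // false 5s
	ok, wait = nextStep(fixedRate(0.40), 0.95, time.Second, 5*time.Second)
	fmt.Println(ok, wait) // true 1s
}
```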

Dump reading

Like Queue, Restorer has two abstraction layers, but in reverse order.

The first layer is represented by the Reader param, which must implement the Reader interface. This object reads the version and serialized data from the dump until an EOF error is encountered.

dlqdump has a built-in implementation that reads dumps from disk. As usual, you may write your own implementation for the required storage.
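
A custom reader might look like the sketch below. SliceReader is a hypothetical type that serves entries from memory, Version and Reader are local copies of the documented declarations, and overwriting dst on each call (instead of appending to it) is an assumption about the contract.

```go
package main

import (
	"fmt"
	"io"
)

// Local copies of the documented dlqdump types.
type Version uint64

type Reader interface {
	Read(dst []byte) (Version, []byte, error)
}

// SliceReader is a hypothetical reader that serves entries from memory.
type SliceReader struct {
	Ver     Version
	Entries [][]byte
	pos     int
}

// Read copies the next entry into dst and returns io.EOF when none are left.
func (r *SliceReader) Read(dst []byte) (Version, []byte, error) {
	if r.pos >= len(r.Entries) {
		return r.Ver, dst, io.EOF
	}
	dst = append(dst[:0], r.Entries[r.pos]...)
	r.pos++
	return r.Ver, dst, nil
}

func main() {
	var r Reader = &SliceReader{Ver: 1, Entries: [][]byte{[]byte("a"), []byte("b")}}
	var buf []byte
	for {
		ver, b, err := r.Read(buf)
		if err == io.EOF {
			break // dump is exhausted
		}
		buf = b // reuse the buffer on the next call
		fmt.Println(ver, string(b))
	}
}
```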

Deserialization

Serialized data taken from the Reader is then sent to the Decoder, a special param that must implement the Decoder interface. This object deserializes the data or reports the error that occurred.

dlqdump has two built-in decoders: fallthrough and unmarshaller. The first one is used only for testing purposes. The second is the opposite of the marshaller encoder and can deserialize objects such as protobuf.

After successful deserialization the item is sent to the target queue.
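
A decoder for a concrete item type could be sketched like this. Event and EventDecoder are hypothetical names used only for illustration, and the Decoder interface is copied locally; the choice of JSON is likewise an assumption, not something the package prescribes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Decoder mirrors the dlqdump Decoder interface so this sketch is self-contained.
type Decoder interface {
	Decode(p []byte) (any, error)
}

// Event is a hypothetical item type stored in the dump as JSON.
type Event struct {
	ID int `json:"id"`
}

// EventDecoder is a hypothetical decoder, not shipped with dlqdump.
type EventDecoder struct{}

// Decode parses p as a JSON-encoded Event.
func (EventDecoder) Decode(p []byte) (any, error) {
	var e Event
	if err := json.Unmarshal(p, &e); err != nil {
		return nil, err
	}
	return e, nil
}

func main() {
	var dec Decoder = EventDecoder{}
	item, err := dec.Decode([]byte(`{"id":7}`))
	fmt.Println(item, err) // {7} <nil>
}
```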

Metrics

Similar to queue, dlqdump has the MetricsWriter param, which must implement the MetricsWriter interface.

There are two implementations of that interface: a log writer and a Prometheus writer.

The log writer is not suitable for production. The Prometheus implementation is fully tested and may be used in production.
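
For tests or quick experiments, the interface is small enough to implement by hand. The sketch below is a hypothetical in-memory implementation (CountingMetrics is not part of the package) with the MetricsWriter interface copied locally from the reference section.

```go
package main

import (
	"fmt"
	"sync"
)

// MetricsWriter mirrors the dlqdump interface so this sketch is self-contained.
type MetricsWriter interface {
	Dump(size int)
	Flush(reason string, size int)
	Restore(size int)
	Fail(reason string)
}

// CountingMetrics is a hypothetical implementation that sums bytes in memory.
type CountingMetrics struct {
	mu       sync.Mutex
	Dumped   int
	Flushed  int
	Restored int
	Fails    map[string]int // failure count per reason
}

func (m *CountingMetrics) Dump(size int) {
	m.mu.Lock()
	m.Dumped += size
	m.mu.Unlock()
}

func (m *CountingMetrics) Flush(_ string, size int) {
	m.mu.Lock()
	m.Flushed += size
	m.mu.Unlock()
}

func (m *CountingMetrics) Restore(size int) {
	m.mu.Lock()
	m.Restored += size
	m.mu.Unlock()
}

func (m *CountingMetrics) Fail(reason string) {
	m.mu.Lock()
	if m.Fails == nil {
		m.Fails = make(map[string]int)
	}
	m.Fails[reason]++
	m.mu.Unlock()
}

func main() {
	m := &CountingMetrics{}
	var _ MetricsWriter = m // compile-time interface check
	m.Dump(128)
	m.Flush("interval reached", 128)
	m.Restore(64)
	m.Fail("decode error")
	fmt.Println(m.Dumped, m.Flushed, m.Restored, m.Fails["decode error"]) // 128 128 64 1
}
```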

Logging

Restorer can log its internal processes using the Logger param (which must implement the Logger interface).

Demo stage

dlqdump uses the same demo stage for debugging and testing purposes.

Testing scenario of dlqdump: https://github.com/koykov/demo/blob/master/queue/request/demo100k_dump.http

References

Documentation

Constants

const (
	Byte     MemorySize = 1
	Kilobyte            = Byte * 1024
	Megabyte            = Kilobyte * 1024
	Gigabyte            = Megabyte * 1024
	Terabyte            = Gigabyte * 1024
)
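
Because Capacity is expressed in bytes, these constants are convenient for building the value. A minimal sketch, with the type and constants redeclared locally so it compiles on its own:

```go
package main

import "fmt"

// Local copy of the documented type and constants.
type MemorySize uint64

const (
	Byte     MemorySize = 1
	Kilobyte            = Byte * 1024
	Megabyte            = Kilobyte * 1024
)

func main() {
	// A 10 MB limit, e.g. for a storage that caps file size.
	capacity := 10 * Megabyte
	fmt.Println(uint64(capacity)) // 10485760
}
```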

Variables

var (
	ErrNoEncoder = errors.New("no encoder provided")
	ErrNoDecoder = errors.New("no decoder provided")
	ErrNoWriter  = errors.New("no writer provided")
	ErrNoReader  = errors.New("no reader provided")
	ErrNoQueue   = errors.New("no destination queue provided")
	ErrTimeout   = errors.New("operation too long")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	/*
		Common params.
	*/
	// Write version. Must be changed whenever the Encoder param changes.
	Version Version
	// Metrics writer handler.
	MetricsWriter MetricsWriter
	// Logger handler.
	Logger queue.Logger

	/*
		Queue params.
		Params of this group are ignored by Restorer.
	*/
	// Max queue capacity in bytes.
	// When the dumped data reaches this size, the queue flushes the data.
	// Mandatory param.
	Capacity MemorySize
	// How long to wait before flushing the data.
	// The timer starts when the first item arrives; the data is flushed when it expires.
	// If this param is omitted, defaultFlushInterval (30 seconds) is used.
	FlushInterval time.Duration
	// Encoder helper to convert item to bytes.
	// Mandatory param.
	Encoder Encoder
	// Writer helper to dump data to various destinations.
	// Mandatory param.
	Writer Writer

	/*
		Restorer params.
		Params of this group are ignored by Queue.
	*/
	// Interval between restore attempts.
	// If this param is omitted, defaultWaitInterval (1 second) is used.
	CheckInterval time.Duration
	// Wait duration if queue rate exceeds AllowRate.
	// If this param is omitted, CheckInterval is used instead.
	PostponeInterval time.Duration
	// Queue rate that allows restore.
	// If this param is omitted, defaultAllowRate (95%) is used.
	AllowRate float32
	// Helper to retrieve data from the dump.
	// Mandatory param.
	Reader Reader
	// Decoder helper to convert bytes to item.
	// Mandatory param.
	Decoder Decoder
	// Destination queue to restore dump.
	// Mandatory param.
	Queue queue.Interface
}

func (*Config) Copy

func (c *Config) Copy() *Config

Copy copies the config instance to protect the queue from param changes after start. This means that once the queue has started, modifications to the config have no effect.

type Decoder

type Decoder interface {
	Decode(p []byte) (any, error)
}

Decoder is the interface that wraps the basic Decode method.

Decode decodes a value from p. It returns the decoded value and any error encountered.

type DummyMetrics

type DummyMetrics struct{}

func (DummyMetrics) Dump

func (DummyMetrics) Dump(_ int)

func (DummyMetrics) Fail

func (DummyMetrics) Fail(_ string)

func (DummyMetrics) Flush

func (DummyMetrics) Flush(_ string, _ int)

func (DummyMetrics) Restore

func (DummyMetrics) Restore(_ int)

type Encoder

type Encoder interface {
	Encode(dst []byte, x any) ([]byte, error)
}

Encoder is the interface that wraps the basic Encode method.

Encode encodes x to dst. It returns dst and any error encountered.

type MemorySize

type MemorySize uint64

type MetricsWriter

type MetricsWriter interface {
	// Dump registers how many bytes were dumped to the queue.
	Dump(size int)
	// Flush registers how many bytes were flushed from the queue and for what reason.
	Flush(reason string, size int)
	// Restore registers how many bytes were restored to the queue.
	Restore(size int)
	// Fail registers the failure reason for the given queue.
	Fail(reason string)
}

MetricsWriter is the interface that wraps the basic metrics methods.

type Queue

type Queue struct {
	bitset.Bitset

	Err error
	// contains filtered or unexported fields
}

Queue represents dumping queue.

func NewQueue

func NewQueue(config *Config) (*Queue, error)

NewQueue makes a new dumping queue instance and initializes it according to the config params.

func (*Queue) Capacity

func (q *Queue) Capacity() int

Capacity returns maximum queue capacity.

func (*Queue) Close

func (q *Queue) Close() error

Close gracefully stops the queue.

func (*Queue) Enqueue

func (q *Queue) Enqueue(x any) (err error)

Enqueue puts x to the queue.

func (*Queue) Rate

func (q *Queue) Rate() float32

Rate returns size to capacity ratio.

func (*Queue) Size

func (q *Queue) Size() int

Size returns actual size in bytes of all queued items (since start or last flush).

type Reader

type Reader interface {
	Read(dst []byte) (Version, []byte, error)
}

Reader is the interface that wraps the basic Read method.

Read reads the next encoded entry from the dump into dst. It returns the version, dst containing the entry, and any error encountered.

type Restorer

type Restorer struct {
	Err error
	// contains filtered or unexported fields
}

Restorer represents dump restore handler. Restorer may be scheduled (see Config.CheckInterval).

func NewRestorer

func NewRestorer(config *Config) (*Restorer, error)

NewRestorer makes a new restorer instance and initializes it according to the config params.

func (*Restorer) Close

func (r *Restorer) Close() error

Close gracefully stops the restorer.

func (*Restorer) CloseWithTimeout

func (r *Restorer) CloseWithTimeout(timeout time.Duration) error

CloseWithTimeout stops the restorer with a timeout.

func (*Restorer) ForceClose

func (r *Restorer) ForceClose() error

ForceClose immediately stops the restorer.

func (*Restorer) Restore

func (r *Restorer) Restore() error

Restore makes one restore attempt.

type Version

type Version uint64

Version represents a simple version container.

func NewVersion

func NewVersion(major, minor, patch, revision uint16) Version

NewVersion composes version from given parts.
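
Four uint16 parts fit exactly into a uint64, which is presumably why Version uses that underlying type. The sketch below shows one plausible packing; the bit layout and the names packedVersion and packVersion are assumptions made for illustration, not the package's actual implementation.

```go
package main

import "fmt"

// packedVersion is a hypothetical stand-in for the documented Version type.
type packedVersion uint64

// packVersion places major in the highest 16 bits and revision in the lowest.
// This layout is an assumption; the real NewVersion may pack differently.
func packVersion(major, minor, patch, revision uint16) packedVersion {
	return packedVersion(major)<<48 |
		packedVersion(minor)<<32 |
		packedVersion(patch)<<16 |
		packedVersion(revision)
}

func (v packedVersion) major() uint16    { return uint16(v >> 48) }
func (v packedVersion) minor() uint16    { return uint16(v >> 32) }
func (v packedVersion) patch() uint16    { return uint16(v >> 16) }
func (v packedVersion) revision() uint16 { return uint16(v) }

func main() {
	v := packVersion(1, 0, 4, 0)
	fmt.Println(v.major(), v.minor(), v.patch(), v.revision()) // 1 0 4 0
	// With this layout, numeric order matches version order.
	fmt.Println(packVersion(1, 2, 0, 0) > packVersion(1, 1, 9, 9)) // true
}
```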

func ParseVersion

func ParseVersion(ver string) Version

ParseVersion makes new version from source string.

See version_test.go for examples.

func (Version) Major

func (v Version) Major() uint16

func (Version) Minor

func (v Version) Minor() uint16

func (Version) Patch

func (v Version) Patch() uint16

func (Version) Revision

func (v Version) Revision() uint16

func (Version) String

func (v Version) String() string

type Writer

type Writer interface {
	// Write returns the number of bytes written from p (0 <= n <= len(p) + 8) and any error encountered.
	Write(ver Version, p []byte) (int, error)
	// Size returns size in bytes of collected data.
	Size() MemorySize
	// Flush flushes collected data.
	Flush() error
}

Writer is the interface that wraps the basic Write method.

Directories

Path Synopsis
metrics
log module
prometheus module
victoria module
