Documentation
Overview ¶
Package kinesumer is a generated GoMock package.
Index ¶
- Variables
- type CommitConfig
- type Config
- type Kinesumer
- type MockStateStore
- func (m *MockStateStore) DeregisterClient(ctx context.Context, clientID string) error
- func (m *MockStateStore) EXPECT() *MockStateStoreMockRecorder
- func (m *MockStateStore) GetShards(ctx context.Context, stream string) (Shards, error)
- func (m *MockStateStore) ListAllAliveClientIDs(ctx context.Context) ([]string, error)
- func (m *MockStateStore) ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)
- func (m *MockStateStore) PingClientAliveness(ctx context.Context, clientID string) error
- func (m *MockStateStore) PruneClients(ctx context.Context) error
- func (m *MockStateStore) RegisterClient(ctx context.Context, clientID string) error
- func (m *MockStateStore) UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error
- func (m *MockStateStore) UpdateShards(ctx context.Context, stream string, shards Shards) error
- type MockStateStoreMockRecorder
- func (mr *MockStateStoreMockRecorder) DeregisterClient(ctx, clientID interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) GetShards(ctx, stream interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) ListAllAliveClientIDs(ctx interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) ListCheckPoints(ctx, stream, shardIDs interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) PingClientAliveness(ctx, clientID interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) PruneClients(ctx interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) RegisterClient(ctx, clientID interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) UpdateCheckPoints(ctx, checkpoints interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) UpdateShards(ctx, stream, shards interface{}) *gomock.Call
- type Record
- type Shard
- type ShardCheckPoint
- type Shards
- type StateStore
Constants ¶
This section is empty.
Variables ¶
var (
ErrEmptySequenceNumber = errors.New("kinesumer: sequence number can't be empty")
ErrInvalidStream = errors.New("kinesumer: invalid stream")
)
Error codes.
var (
ErrNoShardCache = errors.New("kinesumer: shard cache not found")
ErrEmptyShardIDs = errors.New("kinesumer: empty shard ids given")
)
Error codes.
Functions ¶
This section is empty.
Types ¶
type CommitConfig ¶ added in v0.4.0
type CommitConfig struct {
// Whether to auto-commit updated sequence numbers. (default is true)
Auto bool
// How frequently to commit updated sequence numbers. (default is 5s)
Interval time.Duration
// Timeout for each per-stream commit. (default is 2s)
Timeout time.Duration
}
CommitConfig holds options for how offsets are handled.
func NewDefaultCommitConfig ¶ added in v0.4.0
func NewDefaultCommitConfig() *CommitConfig
NewDefaultCommitConfig returns a new default offset management configuration.
type Config ¶
type Config struct {
App string // Application name.
Region string // Region name. (optional)
ClientID string // Consumer group client id. (optional)
// Kinesis configs.
KinesisRegion string
KinesisEndpoint string // Only for local server.
// To consume messages from a Kinesis stream in a different account,
// set up an IAM role that can access the target account and pass the role ARN here.
// Reference: https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html.
RoleARN string
// State store configs.
StateStore *StateStore
DynamoDBRegion string
DynamoDBTable string
DynamoDBEndpoint string // Only for local server.
// These configs are not used in EFO mode.
ScanLimit int64
ScanTimeout time.Duration
ScanInterval time.Duration
EFOMode bool // Turns the Enhanced Fan-Out feature on or off.
// Controls how sequence numbers are committed.
Commit *CommitConfig
}
Config defines configs for the Kinesumer client.
type Kinesumer ¶
type Kinesumer struct {
// contains filtered or unexported fields
}
Kinesumer implements an auto-rebalancing consumer group for Kinesis.
TODO(mingrammer): export prometheus metrics.
func NewKinesumer ¶
NewKinesumer initializes and returns a new Kinesumer client.
func (*Kinesumer) Commit ¶ added in v0.4.0
func (k *Kinesumer) Commit()
Commit updates the stored checkpoints using the current checkpoints.
func (*Kinesumer) MarkRecord ¶ added in v0.4.0
MarkRecord marks the provided record as consumed.
type MockStateStore ¶ added in v0.4.0
type MockStateStore struct {
// contains filtered or unexported fields
}
MockStateStore is a mock of StateStore interface.
func NewMockStateStore ¶ added in v0.4.0
func NewMockStateStore(ctrl *gomock.Controller) *MockStateStore
NewMockStateStore creates a new mock instance.
func (*MockStateStore) DeregisterClient ¶ added in v0.4.0
func (m *MockStateStore) DeregisterClient(ctx context.Context, clientID string) error
DeregisterClient mocks base method.
func (*MockStateStore) EXPECT ¶ added in v0.4.0
func (m *MockStateStore) EXPECT() *MockStateStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockStateStore) GetShards ¶ added in v0.4.0
func (m *MockStateStore) GetShards(ctx context.Context, stream string) (Shards, error)
GetShards mocks base method.
func (*MockStateStore) ListAllAliveClientIDs ¶ added in v0.4.0
func (m *MockStateStore) ListAllAliveClientIDs(ctx context.Context) ([]string, error)
ListAllAliveClientIDs mocks base method.
func (*MockStateStore) ListCheckPoints ¶ added in v0.4.0
func (m *MockStateStore) ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)
ListCheckPoints mocks base method.
func (*MockStateStore) PingClientAliveness ¶ added in v0.4.0
func (m *MockStateStore) PingClientAliveness(ctx context.Context, clientID string) error
PingClientAliveness mocks base method.
func (*MockStateStore) PruneClients ¶ added in v0.4.0
func (m *MockStateStore) PruneClients(ctx context.Context) error
PruneClients mocks base method.
func (*MockStateStore) RegisterClient ¶ added in v0.4.0
func (m *MockStateStore) RegisterClient(ctx context.Context, clientID string) error
RegisterClient mocks base method.
func (*MockStateStore) UpdateCheckPoints ¶ added in v0.4.0
func (m *MockStateStore) UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error
UpdateCheckPoints mocks base method.
func (*MockStateStore) UpdateShards ¶ added in v0.4.0
func (m *MockStateStore) UpdateShards(ctx context.Context, stream string, shards Shards) error
UpdateShards mocks base method.
type MockStateStoreMockRecorder ¶ added in v0.4.0
type MockStateStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockStateStoreMockRecorder is the mock recorder for MockStateStore.
func (*MockStateStoreMockRecorder) DeregisterClient ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) DeregisterClient(ctx, clientID interface{}) *gomock.Call
DeregisterClient indicates an expected call of DeregisterClient.
func (*MockStateStoreMockRecorder) GetShards ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) GetShards(ctx, stream interface{}) *gomock.Call
GetShards indicates an expected call of GetShards.
func (*MockStateStoreMockRecorder) ListAllAliveClientIDs ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) ListAllAliveClientIDs(ctx interface{}) *gomock.Call
ListAllAliveClientIDs indicates an expected call of ListAllAliveClientIDs.
func (*MockStateStoreMockRecorder) ListCheckPoints ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) ListCheckPoints(ctx, stream, shardIDs interface{}) *gomock.Call
ListCheckPoints indicates an expected call of ListCheckPoints.
func (*MockStateStoreMockRecorder) PingClientAliveness ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) PingClientAliveness(ctx, clientID interface{}) *gomock.Call
PingClientAliveness indicates an expected call of PingClientAliveness.
func (*MockStateStoreMockRecorder) PruneClients ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) PruneClients(ctx interface{}) *gomock.Call
PruneClients indicates an expected call of PruneClients.
func (*MockStateStoreMockRecorder) RegisterClient ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) RegisterClient(ctx, clientID interface{}) *gomock.Call
RegisterClient indicates an expected call of RegisterClient.
func (*MockStateStoreMockRecorder) UpdateCheckPoints ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) UpdateCheckPoints(ctx, checkpoints interface{}) *gomock.Call
UpdateCheckPoints indicates an expected call of UpdateCheckPoints.
func (*MockStateStoreMockRecorder) UpdateShards ¶ added in v0.4.0
func (mr *MockStateStoreMockRecorder) UpdateShards(ctx, stream, shards interface{}) *gomock.Call
UpdateShards indicates an expected call of UpdateShards.
type ShardCheckPoint ¶ added in v0.4.0
type ShardCheckPoint struct {
Stream string
ShardID string
SequenceNumber string
UpdatedAt time.Time
}
ShardCheckPoint manages a shard check point.
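To show how a slice of checkpoints relates to the map[string]string that ListCheckPoints returns, here is a small sketch. It assumes the map is keyed by shard ID with the sequence number as the value, which this reference does not state explicitly; toSequenceMap and the sample values are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// ShardCheckPoint mirrors the documented struct.
type ShardCheckPoint struct {
	Stream         string
	ShardID        string
	SequenceNumber string
	UpdatedAt      time.Time
}

// toSequenceMap is a hypothetical helper that flattens the checkpoints of
// one stream into a shard ID -> sequence number map. Nil entries and
// checkpoints that belong to other streams are skipped.
func toSequenceMap(stream string, cps []*ShardCheckPoint) map[string]string {
	out := make(map[string]string)
	for _, cp := range cps {
		if cp == nil || cp.Stream != stream {
			continue
		}
		out[cp.ShardID] = cp.SequenceNumber
	}
	return out
}

func main() {
	now := time.Now()
	cps := []*ShardCheckPoint{
		{Stream: "events", ShardID: "shard-0", SequenceNumber: "100", UpdatedAt: now},
		{Stream: "events", ShardID: "shard-1", SequenceNumber: "200", UpdatedAt: now},
		{Stream: "audit", ShardID: "shard-0", SequenceNumber: "7", UpdatedAt: now},
	}
	fmt.Println(toSequenceMap("events", cps)) // prints "map[shard-0:100 shard-1:200]"
}
```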
type StateStore ¶ added in v0.2.3
type StateStore interface {
GetShards(ctx context.Context, stream string) (Shards, error)
UpdateShards(ctx context.Context, stream string, shards Shards) error
ListAllAliveClientIDs(ctx context.Context) ([]string, error)
RegisterClient(ctx context.Context, clientID string) error
DeregisterClient(ctx context.Context, clientID string) error
PingClientAliveness(ctx context.Context, clientID string) error
PruneClients(ctx context.Context) error
ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)
UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error
}
StateStore is a distributed key-value store for managing states.

