Documentation
¶
Index ¶
- func CounterTypeValuesString() string
- type ARN
- type App
- func (app *App) Handler(ctx context.Context, event *KinesisTimeWindowEvent) (*TimeWindowEventResponse, error)
- func (app *App) Run(ctx context.Context, streamName string, tumblingWindow time.Duration) error
- func (app *App) SetIgnorePutRecord(f bool)
- func (app *App) SetOutput(w io.Writer)
- func (app *App) SetVersion(version string)
- type BatchItemFailure
- type Config
- type CounterConfig
- type CounterState
- type CounterStates
- type CounterType
- func (i CounterType) IsACounterType() bool
- func (i CounterType) MarshalJSON() ([]byte, error)
- func (i CounterType) MarshalYAML() (interface{}, error)
- func (t *CounterType) Set(str string) error
- func (i CounterType) String() string
- func (i *CounterType) UnmarshalJSON(data []byte) error
- func (i *CounterType) UnmarshalYAML(unmarshal func(interface{}) error) error
- type FirehoseClient
- type IntermediateRecord
- type KinesisClient
- type KinesisTimeWindow
- type KinesisTimeWindowEvent
- type TimeWindowEventResponse
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CounterTypeValuesString ¶
func CounterTypeValuesString() string
Types ¶
type ARN ¶
func (*ARN) IsAmbiguous ¶
func (*ARN) IsKinesisDataStream ¶
func (*ARN) MarshalText ¶
func (*ARN) StreamName ¶
func (*ARN) UnmarshalText ¶
type App ¶
type App struct {
// contains filtered or unexported fields
}
func NewWithClient ¶
func NewWithClient(cfg *Config, kinesisClient KinesisClient, firehoseClient FirehoseClient) *App
func (*App) Handler ¶
func (app *App) Handler(ctx context.Context, event *KinesisTimeWindowEvent) (*TimeWindowEventResponse, error)
func (*App) SetIgnorePutRecord ¶
func (*App) SetVersion ¶
type BatchItemFailure ¶
type BatchItemFailure struct {
ItemIdentifier string `json:"itemIdentifier"`
}
type Config ¶
type Config struct {
RequiredVersion string `yaml:"required_version"`
Counters []*CounterConfig `yaml:"counters"`
// contains filtered or unexported fields
}
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
func (*Config) ValidateVersion ¶
type CounterConfig ¶
type CounterConfig struct {
ID string `yaml:"id,omitempty"`
InputStreamARN *ARN `yaml:"input_stream_arn,omitempty"`
OutputStreamARN *ARN `yaml:"output_stream_arn,omitempty"`
AggregateStreamArn *ARN `yaml:"aggregate_stream_arn,omitempty"`
TargetColumn string `yaml:"target_column,omitempty"`
TargetExpr string `yaml:"target_expr,omitempty"`
CounterType CounterType `yaml:"counter_type,omitempty"`
SipHashKeyHex string `yaml:"siphash_key_hex"`
JQExpr string `yaml:"jq_expr"`
// contains filtered or unexported fields
}
func (*CounterConfig) Restrict ¶
func (cfg *CounterConfig) Restrict() error
type CounterState ¶
type CounterState struct {
CounterType CounterType `json:"counter_type"`
RowCount int64 `json:"row_count,omitempty"`
Base64HLLPP string `json:"base64_hllpp,omitempty"`
}
type CounterStates ¶
type CounterStates map[string]map[string]*CounterState
func (CounterStates) MergeInto ¶
func (s CounterStates) MergeInto(other CounterStates)
type CounterType ¶
type CounterType int
const ( Count CounterType = iota + 1 ApproxCountDistinct )
func CounterTypeString ¶
func CounterTypeString(s string) (CounterType, error)
CounterTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.
func CounterTypeValues ¶
func CounterTypeValues() []CounterType
CounterTypeValues returns all values of the enum
func (CounterType) IsACounterType ¶
func (i CounterType) IsACounterType() bool
IsACounterType returns "true" if the value is listed in the enum definition. "false" otherwise
func (CounterType) MarshalJSON ¶
func (i CounterType) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface for CounterType
func (CounterType) MarshalYAML ¶
func (i CounterType) MarshalYAML() (interface{}, error)
MarshalYAML implements a YAML Marshaler for CounterType
func (*CounterType) Set ¶
func (t *CounterType) Set(str string) error
func (CounterType) String ¶
func (i CounterType) String() string
func (*CounterType) UnmarshalJSON ¶
func (i *CounterType) UnmarshalJSON(data []byte) error
UnmarshalJSON implements the json.Unmarshaler interface for CounterType
func (*CounterType) UnmarshalYAML ¶
func (i *CounterType) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML implements a YAML Unmarshaler for CounterType
type FirehoseClient ¶
type FirehoseClient interface {
PutRecord(ctx context.Context, params *firehose.PutRecordInput, optFns ...func(*firehose.Options)) (*firehose.PutRecordOutput, error)
}
type IntermediateRecord ¶
type IntermediateRecord struct {
EventSourceARN string `json:"event_source_arn,omitempty"`
ShardID string `json:"shard_id,omitempty"`
CounterID string `json:"counter_id,omitempty"`
CounterType CounterType `json:"counter_type,omitempty"`
CounterVersion string `json:"counter_version,omitempty"`
Window *KinesisTimeWindow `json:"window,omitempty"`
State *CounterState `json:"counter_state,omitempty"`
}
type KinesisClient ¶
type KinesisClient interface {
GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error)
PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error)
}
type KinesisTimeWindow ¶
func (*KinesisTimeWindow) String ¶ added in v0.1.2
func (w *KinesisTimeWindow) String() string
type KinesisTimeWindowEvent ¶
type KinesisTimeWindowEvent struct {
Records []events.KinesisEventRecord `json:"Records"`
Window *KinesisTimeWindow `json:"window"`
State CounterStates `json:"state"`
ShardID string `json:"shardId"`
EventSourceArn string `json:"eventSourceARN"`
IsFinalInvokeForWindow bool `json:"isFinalInvokeForWindow"`
IsWindowTerminatedEarly bool `json:"isWindowTerminatedEarly"`
}
type TimeWindowEventResponse ¶
type TimeWindowEventResponse struct {
State CounterStates `json:"state"`
BatchItemFailures []BatchItemFailure `json:"batchItemFailures"`
// contains filtered or unexported fields
}
func (*TimeWindowEventResponse) AddBatchItemFailures ¶
func (resp *TimeWindowEventResponse) AddBatchItemFailures(items ...BatchItemFailure)
func (*TimeWindowEventResponse) MergeInto ¶
func (resp *TimeWindowEventResponse) MergeInto(other *TimeWindowEventResponse)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.