Documentation ¶
Overview ¶
Package swf provides a full implementation of a client API for the Amazon Simple Workflow Service (SWF): http://docs.aws.amazon.com/amazonswf/latest/apireference/
In addition, it provides a basic facility for modeling SWF workflows as FSMs (finite state machines), as well as implementations of pollers for both decision and activity tasks.
Client ¶
The Client in this library understands how to make requests to and receive responses from every endpoint in the SWF API, as well as the PutRecord endpoint for Kinesis.
TaskPollers ¶
DecisionTaskPoller and ActivityTaskPoller facilitate proper usage of the PollForDecisionTask and PollForActivityTask endpoints in the SWF API. These endpoints are used by DecisionTask and ActivityTask workers to claim tasks on which to work. The endpoints use long polling: SWF keeps the request open for up to 60 seconds before returning an 'empty' response. If a task is generated before that time, a non-empty task is delivered to (and assigned to) a particular polling client.
There is an unfortunate bug in SWF that occurs when a long-polling request is terminated on the client side instead of waiting for the SWF API to respond. SWF does not recognize this condition, so it can assign a task to a disconnected worker, which subsequently causes the task to time out. This is not terrible if the task has a short timeout, but it can cause big delays if the task has a long timeout.
Both types of pollers allow you to manage polling yourself by calling Poll() directly. However, it is recommended that you use the PollUntilShutdownBy(...) function, which works in concert with a PollerShutdownManager to wait for all in-flight polls to complete. This facilitates clean shutdown of end user processes.
PollerShutdownManager ¶
When PollerShutdownManager.ShutdownPollers() is called, it will signal any registered pollers to shut down once any in-flight polls have completed, and block until this happens. The shutdown process can take up to 60 seconds due to the length of SWF long polls before an empty response is returned.
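The shutdown behavior described above can be illustrated with a minimal, self-contained sketch. It does not use swf4go: shutdownManager, pollUntilShutdown and shutdownPollers are hypothetical stand-ins, and a 20 ms sleep stands in for the SWF long poll. The point it shows is that the stop signal is only checked between polls, so a poll that is already in flight is always allowed to finish.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// shutdownManager is a hypothetical stand-in for PollerShutdownManager.
type shutdownManager struct {
	stop chan struct{}  // closed to tell poll loops to stop
	wg   sync.WaitGroup // tracks poll loops that are still running
}

func newShutdownManager() *shutdownManager {
	return &shutdownManager{stop: make(chan struct{})}
}

// pollUntilShutdown starts a poll loop in its own goroutine. The stop signal
// is checked only between polls, so an in-flight poll is never abandoned.
func (m *shutdownManager) pollUntilShutdown(poll func() (string, bool), onTask func(string)) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for {
			select {
			case <-m.stop:
				return
			default:
			}
			if task, ok := poll(); ok {
				onTask(task)
			}
		}
	}()
}

// shutdownPollers signals every poll loop and blocks until all have exited.
func (m *shutdownManager) shutdownPollers() {
	close(m.stop)
	m.wg.Wait()
}

// runDemo reports how many fake polls were started and how many finished by
// the time shutdownPollers returned.
func runDemo() (started, finished int64) {
	var s, f int64
	mgr := newShutdownManager()
	poll := func() (string, bool) {
		atomic.AddInt64(&s, 1)
		time.Sleep(20 * time.Millisecond) // stands in for the long poll
		atomic.AddInt64(&f, 1)
		return "", false // an 'empty' response
	}
	mgr.pollUntilShutdown(poll, func(string) {})
	time.Sleep(30 * time.Millisecond) // let a poll get in flight
	mgr.shutdownPollers()
	return atomic.LoadInt64(&s), atomic.LoadInt64(&f)
}

func main() {
	started, finished := runDemo()
	fmt.Println("all started polls finished:", started == finished)
}
```

Because shutdown waits for the current poll, its worst-case duration is one full poll, which is where the "up to 60 seconds" figure comes from.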
FSM ¶
The FSM in swf4go layers an Erlang/Akka-style finite state machine abstraction on top of SWF, and facilitates modeling your workflows as FSMs. The FSM will be responsible for handling the decision tasks in your workflow that implicitly model it.
The FSM takes care of serializing/deserializing and threading a data model through the workflow history for you. It also serializes and deserializes any payloads in the events your workflows receive, and can optionally send data model snapshots to Kinesis. The latter facilitates a CQRS-style application in which the query models are built off the Kinesis stream.
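As a rough illustration of what "threading a data model through the workflow history" involves, the sketch below round-trips a struct through a JSON string. It is not the library's serializer: orderState, serialize and deserialize are hypothetical names, and JSON is assumed here only because the index lists a JSONStateSerializer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderState is a hypothetical "main data struct" for a workflow.
type orderState struct {
	OrderID string `json:"orderId"`
	Paid    bool   `json:"paid"`
}

// serialize turns the data model into a string that could be stored in the
// workflow history (for example in a marker) or sent as workflow input.
func serialize(data interface{}) (string, error) {
	b, err := json.Marshal(data)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// deserialize restores the data model from its serialized form.
func deserialize(serialized string, data interface{}) error {
	return json.Unmarshal([]byte(serialized), data)
}

func main() {
	in := orderState{OrderID: "o-1", Paid: true}
	s, err := serialize(in)
	if err != nil {
		panic(err)
	}
	var out orderState
	if err := deserialize(s, &out); err != nil {
		panic(err)
	}
	fmt.Println(s, out == in)
}
```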
From http://www.erlang.org/doc/design_principles/fsm.html, a finite state machine, or FSM, can be described as a set of relations of the form:
State(S) x Event(E) -> Actions(A), State(S')
Substituting the relevant SWF/swf4go concepts, we get:
(Your main data struct) x (an swf.HistoryEvent) -> (zero or more swf.Decisions), (A possibly updated main data struct)
See http://godoc.org/github.com/sclasen/swf4go#example-FSM for a complete usage example.
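The relation State x Event -> Actions, State' can also be written down directly as a Go function type. The sketch below is self-contained and does not use the swf4go types: event, decision, data, outcome and step are hypothetical stand-ins for swf.HistoryEvent, swf.Decision, your main data struct, and the FSM machinery. Only the event and decision type strings are taken from the constants listed on this page.

```go
package main

import "fmt"

// event and decision are hypothetical stand-ins for swf.HistoryEvent and
// swf.Decision, reduced to a type name.
type event struct{ Type string }
type decision struct{ Type string }

// data is a hypothetical main data struct.
type data struct{ Attempts int }

// outcome carries the next state, the possibly updated data, and zero or
// more decisions.
type outcome struct {
	State     string
	Data      data
	Decisions []decision
}

// stateFunc is the relation (data) x (event) -> (decisions), (data').
type stateFunc func(d data, e event) outcome

// states maps each state name to its transition function.
var states = map[string]stateFunc{
	"working": func(d data, e event) outcome {
		switch e.Type {
		case "ActivityTaskFailed":
			d.Attempts++ // record the retry, then schedule the activity again
			return outcome{State: "working", Data: d, Decisions: []decision{{Type: "ScheduleActivityTask"}}}
		case "ActivityTaskCompleted":
			return outcome{State: "complete", Data: d, Decisions: []decision{{Type: "CompleteWorkflowExecution"}}}
		}
		return outcome{State: "working", Data: d} // stay, no decisions
	},
}

// step applies one event to the named state.
func step(state string, d data, e event) outcome {
	return states[state](d, e)
}

func main() {
	o := step("working", data{}, event{Type: "ActivityTaskFailed"})
	fmt.Println(o.State, o.Data.Attempts, len(o.Decisions))
	o = step(o.State, o.Data, event{Type: "ActivityTaskCompleted"})
	fmt.Println(o.State, o.Data.Attempts, len(o.Decisions))
}
```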
Index ¶
- Constants
- Variables
- func ContinueFSMWorkflowInput(ctx *FSMContext, data interface{}) string
- func DecisionTypes() []string
- func EventTypes() []string
- func MustGetenv(key string) string
- func StartFSMWorkflowInput(serializer StateSerializer, data interface{}) string
- type ActivityCorrelator
- type ActivityInfo
- type ActivityTaskCancelRequestedEventAttributes
- type ActivityTaskCanceledEventAttributes
- type ActivityTaskCompletedEventAttributes
- type ActivityTaskFailedEventAttributes
- type ActivityTaskPoller
- type ActivityTaskScheduledEventAttributes
- type ActivityTaskStartedEventAttributes
- type ActivityTaskTimedOutEventAttributes
- type ActivityType
- type ActivityTypeConfiguration
- type ActivityTypeInfo
- type ActivityTypeMigrator
- type ActivityWorkerClient
- type CancelTimerDecisionAttributes
- type CancelTimerFailedEventAttributes
- type CancelWorkflowExecutionDecisionAttributes
- type CancelWorkflowExecutionFailedEventAttributes
- type ChildWorkflowExecutionCanceledEventAttributes
- type ChildWorkflowExecutionCompletedEventAttributes
- type ChildWorkflowExecutionFailedEventAttributes
- type ChildWorkflowExecutionStartedEventAttributes
- type ChildWorkflowExecutionTerminatedEventAttributes
- type ChildWorkflowExecutionTimedOutEventAttributes
- type Client
- func (c *Client) CountClosedWorkflowExecutions(request CountClosedWorkflowExecutionsRequest) (*CountResponse, error)
- func (c *Client) CountOpenWorkflowExecutions(request CountOpenWorkflowExecutionsRequest) (*CountResponse, error)
- func (c *Client) CountPendingActivityTasks(request CountPendingActivityTasksRequest) (*CountResponse, error)
- func (c *Client) CountPendingDecisionTasks(request CountPendingDecisionTasksRequest) (*CountResponse, error)
- func (c *Client) CreateStream(request CreateStream) error
- func (c *Client) DeprecateActivityType(request DeprecateActivityType) error
- func (c *Client) DeprecateDomain(request DeprecateDomain) error
- func (c *Client) DeprecateWorkflowType(request DeprecateWorkflowType) error
- func (c *Client) DescribeActivityType(request DescribeActivityTypeRequest) (*DescribeActivityTypeResponse, error)
- func (c *Client) DescribeDomain(request DescribeDomainRequest) (*DescribeDomainResponse, error)
- func (c *Client) DescribeStream(request DescribeStreamRequest) (*DescribeStreamResponse, error)
- func (c *Client) DescribeWorkflowExecution(request DescribeWorkflowExecutionRequest) (*DescribeWorkflowExecutionResponse, error)
- func (c *Client) DescribeWorkflowType(request DescribeWorkflowTypeRequest) (*DescribeWorkflowTypeResponse, error)
- func (c *Client) GetRecords(request GetRecordsRequest) (*GetRecordsResponse, error)
- func (c *Client) GetShardIterator(request GetShardIteratorRequest) (*GetShardIteratorResponse, error)
- func (c *Client) GetWorkflowExecutionHistory(request GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse, error)
- func (c *Client) ListActivityTypes(request ListActivityTypesRequest) (*ListActivityTypesResponse, error)
- func (c *Client) ListClosedWorkflowExecutions(request ListClosedWorkflowExecutionsRequest) (*ListClosedWorkflowExecutionsResponse, error)
- func (c *Client) ListDomains(request ListDomainsRequest) (*ListDomainsResponse, error)
- func (c *Client) ListOpenWorkflowExecutions(request ListOpenWorkflowExecutionsRequest) (*ListOpenWorkflowExecutionsResponse, error)
- func (c *Client) ListWorkflowTypes(request ListWorkflowTypesRequest) (*ListWorkflowTypesResponse, error)
- func (c *Client) PollForActivityTask(request PollForActivityTaskRequest) (*PollForActivityTaskResponse, error)
- func (c *Client) PollForDecisionTask(request PollForDecisionTaskRequest) (*PollForDecisionTaskResponse, error)
- func (c *Client) PutRecord(request PutRecordRequest) (*PutRecordResponse, error)
- func (c *Client) RecordActivityTaskHeartbeat(request RecordActivityTaskHeartbeatRequest) (*RecordActivityTaskHeartbeatResponse, error)
- func (c *Client) RegisterActivityType(request RegisterActivityType) error
- func (c *Client) RegisterDomain(request RegisterDomain) error
- func (c *Client) RegisterWorkflowType(request RegisterWorkflowType) error
- func (c *Client) RequestCancelWorkflowExecution(request RequestCancelWorkflowExecution) error
- func (c *Client) RespondActivityTaskCanceled(request RespondActivityTaskCanceledRequest) error
- func (c *Client) RespondActivityTaskCompleted(request RespondActivityTaskCompletedRequest) error
- func (c *Client) RespondActivityTaskFailed(request RespondActivityTaskFailedRequest) error
- func (c *Client) RespondDecisionTaskCompleted(request RespondDecisionTaskCompletedRequest) error
- func (c *Client) SignalWorkflow(request SignalWorkflowRequest) error
- func (c *Client) StartWorkflow(request StartWorkflowRequest) (*StartWorkflowResponse, error)
- func (c *Client) TerminateWorkflowExecution(request TerminateWorkflowExecution) error
- type CompleteOutcome
- type CompleteWorkflowExecutionDecisionAttributes
- type CompleteWorkflowExecutionFailedEventAttributes
- type ComposedDecider
- type ContinueAsNewWorkflowExecutionDecisionAttributes
- type ContinueAsNewWorkflowExecutionFailedEventAttributes
- type ContinueOutcome
- type CountClosedWorkflowExecutionsRequest
- type CountOpenWorkflowExecutionsRequest
- type CountPendingActivityTasksRequest
- type CountPendingDecisionTasksRequest
- type CountResponse
- type CreateStream
- type Date
- type Decider
- type Decision
- type DecisionFunc
- type DecisionTaskCompletedEventAttributes
- type DecisionTaskPoller
- type DecisionTaskScheduledEventAttributes
- type DecisionTaskStartedEventAttributes
- type DecisionTaskTimedOutEventAttributes
- type DecisionWorkerClient
- type DeprecateActivityType
- type DeprecateDomain
- type DeprecateWorkflowType
- type DescribeActivityTypeRequest
- type DescribeActivityTypeResponse
- type DescribeDomainRequest
- type DescribeDomainResponse
- type DescribeStreamRequest
- type DescribeStreamResponse
- type DescribeWorkflowExecutionRequest
- type DescribeWorkflowExecutionResponse
- type DescribeWorkflowTypeRequest
- type DescribeWorkflowTypeResponse
- type DomainConfiguration
- type DomainInfo
- type DomainMigrator
- type ErrorOutcome
- type ErrorResponse
- type ExecutionConfiguration
- type ExecutionFilter
- type ExternalWorkflowExecutionCancelRequestedEventAttributes
- type ExternalWorkflowExecutionSignaledEventAttributes
- type FSM
- func (f *FSM) AddCompleteState(state *FSMState)
- func (f *FSM) AddErrorState(state *FSMState)
- func (f *FSM) AddInitialState(state *FSMState)
- func (f *FSM) AddState(state *FSMState)
- func (f *FSM) DefaultCompleteState() *FSMState
- func (f *FSM) DefaultErrorState() *FSMState
- func (f *FSM) Deserialize(serialized string, data interface{})
- func (f *FSM) EmptyDecisions() []Decision
- func (f *FSM) EventData(event HistoryEvent, eventData interface{})
- func (f *FSM) Init()
- func (f *FSM) InitialState() string
- func (f *FSM) Serialize(data interface{}) string
- func (f *FSM) Start()
- func (f *FSM) StateSerializer() StateSerializer
- func (f *FSM) Stop()
- func (f *FSM) Tick(decisionTask *PollForDecisionTaskResponse) ([]Decision, *SerializedState)
- type FSMContext
- func (f *FSMContext) ActivitiesInfo() map[string]*ActivityInfo
- func (f *FSMContext) ActivityInfo(h HistoryEvent) *ActivityInfo
- func (f *FSMContext) Complete(data interface{}, decisions ...Decision) Outcome
- func (f *FSMContext) CompletionDecision(data interface{}) Decision
- func (f *FSMContext) ContinueDecision(data interface{}, decisions []Decision) Outcome
- func (f *FSMContext) ContinueWorkflowDecision(continuedState string) Decision
- func (f *FSMContext) Decide(h HistoryEvent, data interface{}, decider Decider) Outcome
- func (f *FSMContext) Deserialize(serialized string, data interface{})
- func (f *FSMContext) EmptyDecisions() []Decision
- func (f *FSMContext) Error(data interface{}, decisions []Decision) Outcome
- func (f *FSMContext) EventData(h HistoryEvent, data interface{})
- func (f *FSMContext) Goto(state string, data interface{}, decisions []Decision) Outcome
- func (f *FSMContext) Serialize(data interface{}) string
- func (f *FSMContext) Serializer() StateSerializer
- func (f *FSMContext) StateVersion() uint64
- func (f *FSMContext) Stay(data interface{}, decisions []Decision) Outcome
- type FSMSerializer
- type FSMState
- type FailWorkflowExecutionDecisionAttributes
- type FailWorkflowExecutionFailedEventAttributes
- type GetRecordsRequest
- type GetRecordsResponse
- type GetShardIteratorRequest
- type GetShardIteratorResponse
- type GetWorkflowExecutionHistoryRequest
- type GetWorkflowExecutionHistoryResponse
- type HistoryEvent
- type JSONStateSerializer
- type KinesisClient
- type KinesisReplicator
- type ListActivityTypesRequest
- type ListActivityTypesResponse
- type ListClosedWorkflowExecutionsRequest
- type ListClosedWorkflowExecutionsResponse
- type ListDomainsRequest
- type ListDomainsResponse
- type ListOpenWorkflowExecutionsRequest
- type ListOpenWorkflowExecutionsResponse
- type ListWorkflowTypesRequest
- type ListWorkflowTypesResponse
- type MarkerRecordedEventAttributes
- type MultiDecisionFunc
- type OpenCounts
- type Outcome
- type PollForActivityTaskRequest
- type PollForActivityTaskResponse
- type PollForDecisionTaskRequest
- type PollForDecisionTaskResponse
- type PollerShutdownManager
- type PredicateFunc
- type ProtobufStateSerializer
- type PutRecordRequest
- type PutRecordResponse
- type RecordActivityTaskHeartbeatRequest
- type RecordActivityTaskHeartbeatResponse
- type RecordMarkerDecisionAttributes
- type RecordMarkerFailedEventAttributes
- type Region
- type RegisterActivityType
- type RegisterDomain
- type RegisterWorkflowType
- type ReplicationData
- type RequestCancelActivityTaskDecisionAttributes
- type RequestCancelActivityTaskFailedEventAttributes
- type RequestCancelExternalWorkflowExecutionDecisionAttributes
- type RequestCancelExternalWorkflowExecutionFailedEventAttributes
- type RequestCancelExternalWorkflowExecutionInitiatedEventAttributes
- type RequestCancelWorkflowExecution
- type RespondActivityTaskCanceledRequest
- type RespondActivityTaskCompletedRequest
- type RespondActivityTaskFailedRequest
- type RespondDecisionTaskCompletedRequest
- type ScheduleActivityTaskDecisionAttributes
- type ScheduleActivityTaskFailedEventAttributes
- type SerializedDecisionError
- type SerializedState
- type SerializedSystemError
- type SignalExternalWorkflowExecutionDecisionAttributes
- type SignalExternalWorkflowExecutionFailedEventAttributes
- type SignalExternalWorkflowExecutionInitiatedEventAttributes
- type SignalWorkflowRequest
- type StartChildWorkflowExecutionDecisionAttributes
- type StartChildWorkflowExecutionFailedEventAttributes
- type StartChildWorkflowExecutionInitiatedEventAttributes
- type StartTimerDecisionAttributes
- type StartTimerFailedEventAttributes
- type StartWorkflowRequest
- type StartWorkflowResponse
- type StateFunc
- type StateSerializer
- type StatusFilter
- type StayOutcome
- type StreamMigrator
- type TagFilter
- type TaskList
- type TerminateWorkflowExecution
- type TimeFilter
- type TimerCanceledEventAttributes
- type TimerFiredEventAttributes
- type TimerStartedEventAttributes
- type TransitionOutcome
- type TypeFilter
- type TypedFuncs
- func (t *TypedFuncs) Decider(decider interface{}) Decider
- func (t *TypedFuncs) DecisionFunc(decisionFunc interface{}) DecisionFunc
- func (t *TypedFuncs) MultiDecisionFunc(decisionFunc interface{}) MultiDecisionFunc
- func (t *TypedFuncs) PredicateFunc(stateFunc interface{}) PredicateFunc
- func (t *TypedFuncs) StateFunc(stateFunc interface{}) StateFunc
- type TypesMigrator
- type WorkflowAdminClient
- type WorkflowClient
- type WorkflowConfiguration
- type WorkflowExecution
- type WorkflowExecutionCancelRequestedEventAttributes
- type WorkflowExecutionCanceledEventAttributes
- type WorkflowExecutionCompletedEventAttributes
- type WorkflowExecutionContinuedAsNewEventAttributes
- type WorkflowExecutionFailedEventAttributes
- type WorkflowExecutionInfo
- type WorkflowExecutionSignaledEventAttributes
- type WorkflowExecutionStartedEventAttributes
- type WorkflowExecutionTerminatedEventAttributes
- type WorkflowExecutionTimedOutEventAttributes
- type WorkflowInfoClient
- type WorkflowType
- type WorkflowTypeInfo
- type WorkflowTypeMigrator
- type WorkflowWorkerClient
Constants ¶
const (
	StateMarker = "FSM.State"
	ErrorSignal = "FSM.Error"
	SystemErrorSignal = "FSM.SystemError"
	ContinueTimer = "FSM.ContinueWorkflow"
	ContinueSignal = "FSM.ContinueWorkflow"
	CompleteState = "complete"
	ErrorState = "error"
)
constants used as marker names or signal names
const (
	StatusRegistered = "REGISTERED"
	StatusDeprecated = "DEPRECATED"
	ErrorTypeUnknownResourceFault = "com.amazonaws.swf.base.model#UnknownResourceFault"
	ErrorTypeWorkflowExecutionAlreadyStartedFault = "com.amazonaws.swf.base.model#WorkflowExecutionAlreadyStartedFault"
	ErrorTypeStreamNotFound = "ResourceNotFoundException"
)
constants for various SWF and Kinesis status and error codes.
const (
	EventTypeWorkflowExecutionStarted = "WorkflowExecutionStarted"
	EventTypeWorkflowExecutionCancelRequested = "WorkflowExecutionCancelRequested"
	EventTypeWorkflowExecutionCompleted = "WorkflowExecutionCompleted"
	EventTypeCompleteWorkflowExecutionFailed = "CompleteWorkflowExecutionFailed"
	EventTypeWorkflowExecutionFailed = "WorkflowExecutionFailed"
	EventTypeFailWorkflowExecutionFailed = "FailWorkflowExecutionFailed"
	EventTypeWorkflowExecutionTimedOut = "WorkflowExecutionTimedOut"
	EventTypeWorkflowExecutionCanceled = "WorkflowExecutionCanceled"
	EventTypeCancelWorkflowExecutionFailed = "CancelWorkflowExecutionFailed"
	EventTypeWorkflowExecutionContinuedAsNew = "WorkflowExecutionContinuedAsNew"
	EventTypeContinueAsNewWorkflowExecutionFailed = "ContinueAsNewWorkflowExecutionFailed"
	EventTypeWorkflowExecutionTerminated = "WorkflowExecutionTerminated"
	EventTypeDecisionTaskScheduled = "DecisionTaskScheduled"
	EventTypeDecisionTaskStarted = "DecisionTaskStarted"
	EventTypeDecisionTaskCompleted = "DecisionTaskCompleted"
	EventTypeDecisionTaskTimedOut = "DecisionTaskTimedOut"
	EventTypeActivityTaskScheduled = "ActivityTaskScheduled"
	EventTypeScheduleActivityTaskFailed = "ScheduleActivityTaskFailed"
	EventTypeActivityTaskStarted = "ActivityTaskStarted"
	EventTypeActivityTaskCompleted = "ActivityTaskCompleted"
	EventTypeActivityTaskFailed = "ActivityTaskFailed"
	EventTypeActivityTaskTimedOut = "ActivityTaskTimedOut"
	EventTypeActivityTaskCanceled = "ActivityTaskCanceled"
	EventTypeActivityTaskCancelRequested = "ActivityTaskCancelRequested"
	EventTypeRequestCancelActivityTaskFailed = "RequestCancelActivityTaskFailed"
	EventTypeWorkflowExecutionSignaled = "WorkflowExecutionSignaled"
	EventTypeMarkerRecorded = "MarkerRecorded"
	EventTypeRecordMarkerFailed = "RecordMarkerFailed"
	EventTypeTimerStarted = "TimerStarted"
	EventTypeStartTimerFailed = "StartTimerFailed"
	EventTypeTimerFired = "TimerFired"
	EventTypeTimerCanceled = "TimerCanceled"
	EventTypeCancelTimerFailed = "CancelTimerFailed"
	EventTypeStartChildWorkflowExecutionInitiated = "StartChildWorkflowExecutionInitiated"
	EventTypeStartChildWorkflowExecutionFailed = "StartChildWorkflowExecutionFailed"
	EventTypeChildWorkflowExecutionStarted = "ChildWorkflowExecutionStarted"
	EventTypeChildWorkflowExecutionCompleted = "ChildWorkflowExecutionCompleted"
	EventTypeChildWorkflowExecutionFailed = "ChildWorkflowExecutionFailed"
	EventTypeChildWorkflowExecutionTimedOut = "ChildWorkflowExecutionTimedOut"
	EventTypeChildWorkflowExecutionCanceled = "ChildWorkflowExecutionCanceled"
	EventTypeChildWorkflowExecutionTerminated = "ChildWorkflowExecutionTerminated"
	EventTypeSignalExternalWorkflowExecutionInitiated = "SignalExternalWorkflowExecutionInitiated"
	EventTypeSignalExternalWorkflowExecutionFailed = "SignalExternalWorkflowExecutionFailed"
	EventTypeExternalWorkflowExecutionSignaled = "ExternalWorkflowExecutionSignaled"
	EventTypeRequestCancelExternalWorkflowExecutionInitiated = "RequestCancelExternalWorkflowExecutionInitiated"
	EventTypeRequestCancelExternalWorkflowExecutionFailed = "RequestCancelExternalWorkflowExecutionFailed"
	EventTypeExternalWorkflowExecutionCancelRequested = "ExternalWorkflowExecutionCancelRequested"
)
Valid values for the field EventType in the HistoryEvent struct.
const (
	DecisionTypeScheduleActivityTask = "ScheduleActivityTask"
	DecisionTypeRequestCancelActivityTask = "RequestCancelActivityTask"
	DecisionTypeCompleteWorkflowExecution = "CompleteWorkflowExecution"
	DecisionTypeFailWorkflowExecution = "FailWorkflowExecution"
	DecisionTypeCancelWorkflowExecution = "CancelWorkflowExecution"
	DecisionTypeContinueAsNewWorkflowExecution = "ContinueAsNewWorkflowExecution"
	DecisionTypeRecordMarker = "RecordMarker"
	DecisionTypeStartTimer = "StartTimer"
	DecisionTypeCancelTimer = "CancelTimer"
	DecisionTypeSignalExternalWorkflowExecution = "SignalExternalWorkflowExecution"
	DecisionTypeRequestCancelExternalWorkflowExecution = "RequestCancelExternalWorkflowExecution"
	DecisionTypeStartChildWorkflowExecution = "StartChildWorkflowExecution"
)
Valid values for the field DecisionType in the Decision struct.
Variables ¶
var (
	// USEast1 is the AWS us-east-1 Region
	USEast1 = &Region{"us-east-1", "https://swf.us-east-1.amazonaws.com", "https://kinesis.us-east-1.amazonaws.com"}
	// USWest1 is the AWS us-west-1 Region
	USWest1 = &Region{"us-west-1", "https://swf.us-west-1.amazonaws.com", "https://kinesis.us-west-1.amazonaws.com"}
	// USWest2 is the AWS us-west-2 Region
	USWest2 = &Region{"us-west-2", "https://swf.us-west-2.amazonaws.com", "https://kinesis.us-west-2.amazonaws.com"}
	// EUWest1 is the AWS eu-west-1 Region
	EUWest1 = &Region{"eu-west-1", "https://swf.eu-west-1.amazonaws.com", "https://kinesis.eu-west-1.amazonaws.com"}
	// APNorthEast1 is the AWS ap-northeast-1 Region
	APNorthEast1 = &Region{"ap-northeast-1", "https://swf.ap-northeast-1.amazonaws.com", "https://kinesis.ap-northeast-1.amazonaws.com"}
	// APSouthEast1 is the AWS ap-southeast-1 Region
	APSouthEast1 = &Region{"ap-southeast-1", "https://swf.ap-southeast-1.amazonaws.com", "https://kinesis.ap-southeast-1.amazonaws.com"}
)
Functions ¶
func ContinueFSMWorkflowInput ¶
func ContinueFSMWorkflowInput(ctx *FSMContext, data interface{}) string
ContinueFSMWorkflowInput should be used to construct the input for any ContinueAsNewWorkflowExecution decisions.
func DecisionTypes ¶
func DecisionTypes() []string
DecisionTypes returns a slice containing the valid values of the Decision.DecisionType field.
func EventTypes ¶
func EventTypes() []string
EventTypes returns a slice containing the valid values of the HistoryEvent.EventType field.
func MustGetenv ¶
func MustGetenv(key string) string
MustGetenv reads an environment variable, and panics if it is not defined.
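A function with this contract could look roughly like the sketch below. This is not the library's source: mustGetenv, demo, demoUnset and panicsOn are hypothetical names, and the sketch assumes that "not defined" means the variable is unset (a variable that is set to the empty string is returned as-is). The library may treat the empty case differently.

```go
package main

import (
	"fmt"
	"os"
)

// mustGetenv returns the value of key, or panics if key is not set.
// Assumption: a variable set to "" counts as defined.
func mustGetenv(key string) string {
	v, ok := os.LookupEnv(key)
	if !ok {
		panic(fmt.Sprintf("environment variable %q is not set", key))
	}
	return v
}

// panicsOn reports whether mustGetenv(key) panics.
func panicsOn(key string) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	mustGetenv(key)
	return false
}

// demo sets a variable and reads it back.
func demo() string {
	os.Setenv("SWF4GO_SKETCH_VAR", "hello")
	return mustGetenv("SWF4GO_SKETCH_VAR")
}

// demoUnset makes sure a variable is unset and reports whether reading it panics.
func demoUnset() bool {
	os.Unsetenv("SWF4GO_SKETCH_MISSING")
	return panicsOn("SWF4GO_SKETCH_MISSING")
}

func main() {
	fmt.Println(demo())
	fmt.Println("panics when unset:", demoUnset())
}
```

Panicking at startup is a common choice for required configuration such as credentials, since the process cannot do useful work without it.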
func StartFSMWorkflowInput ¶
func StartFSMWorkflowInput(serializer StateSerializer, data interface{}) string
StartFSMWorkflowInput should be used to construct the input for any StartWorkflowExecutionRequests. It panics on error, because an error here should never occur.
Types ¶
type ActivityCorrelator ¶
type ActivityCorrelator struct {
Activities map[string]*ActivityInfo
}
ActivityCorrelator is a serialization-friendly struct that can be used as a field in your main StateData struct in an FSM. You can use it to track the type of a given activity, so you know how to react when an event that signals the end of an activity hits your Decider. This is missing from the SWF API.
func (*ActivityCorrelator) ActivityType ¶
func (a *ActivityCorrelator) ActivityType(h HistoryEvent) *ActivityInfo
ActivityType returns the ActivityType that correlates with a given event. The HistoryEvent is expected to be of type EventTypeActivityTaskCompleted, EventTypeActivityTaskFailed, or EventTypeActivityTaskTimedOut.
func (*ActivityCorrelator) Correlate ¶
func (a *ActivityCorrelator) Correlate(h HistoryEvent)
Correlate establishes a mapping of eventId to ActivityType. The HistoryEvent is expected to be of type EventTypeActivityTaskScheduled.
func (*ActivityCorrelator) RemoveCorrelation ¶
func (a *ActivityCorrelator) RemoveCorrelation(h HistoryEvent)
RemoveCorrelation removes (garbage-collects) a mapping of eventId to ActivityType. The HistoryEvent is expected to be of type EventTypeActivityTaskCompleted, EventTypeActivityTaskFailed, or EventTypeActivityTaskTimedOut.
func (*ActivityCorrelator) Track ¶
func (a *ActivityCorrelator) Track(h HistoryEvent)
Track will add or remove entries based on the EventType. A new entry is added when there is a new ActivityTask, or an entry is removed when the ActivityTask is terminating.
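The correlation idea can be sketched without the library as follows. Everything here is a simplified stand-in: historyEvent flattens the attributes that the real HistoryEvent nests, activityInfo flattens the embedded ActivityType, and keying the map by the scheduled event ID rendered as a string is an assumption based on the map[string]*ActivityInfo field and the "mapping of eventId" wording above.

```go
package main

import (
	"fmt"
	"strconv"
)

// activityInfo is a simplified stand-in for ActivityInfo.
type activityInfo struct{ ActivityID, Name, Version string }

// historyEvent is a hypothetical, flattened stand-in for HistoryEvent.
type historyEvent struct {
	EventID          int
	EventType        string
	ActivityID       string // set on ActivityTaskScheduled
	Name, Version    string // activity type, set on ActivityTaskScheduled
	ScheduledEventID int    // set on completed, failed and timed-out events
}

// correlator remembers which activity each scheduled event started.
type correlator struct {
	Activities map[string]*activityInfo
}

// track adds an entry when an activity is scheduled and removes it when the
// activity completes, fails or times out.
func (c *correlator) track(h historyEvent) {
	switch h.EventType {
	case "ActivityTaskScheduled":
		if c.Activities == nil {
			c.Activities = make(map[string]*activityInfo)
		}
		c.Activities[strconv.Itoa(h.EventID)] = &activityInfo{h.ActivityID, h.Name, h.Version}
	case "ActivityTaskCompleted", "ActivityTaskFailed", "ActivityTaskTimedOut":
		delete(c.Activities, strconv.Itoa(h.ScheduledEventID))
	}
}

// lookup returns the activity that a terminating event refers to, or nil.
func (c *correlator) lookup(h historyEvent) *activityInfo {
	return c.Activities[strconv.Itoa(h.ScheduledEventID)]
}

func main() {
	c := &correlator{}
	c.track(historyEvent{EventID: 5, EventType: "ActivityTaskScheduled", ActivityID: "a1", Name: "charge", Version: "1"})
	done := historyEvent{EventID: 9, EventType: "ActivityTaskCompleted", ScheduledEventID: 5}
	fmt.Println(c.lookup(done).Name) // look up before tracking the terminating event
	c.track(done)
	fmt.Println(c.lookup(done) == nil)
}
```

Note the ordering in main: the lookup has to happen before the terminating event is tracked, because tracking it removes the entry.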
type ActivityInfo ¶
type ActivityInfo struct {
ActivityID string
ActivityType
}
ActivityInfo holds the ActivityID and ActivityType for an activity
type ActivityTaskCancelRequestedEventAttributes ¶
type ActivityTaskCancelRequestedEventAttributes struct {
ActivityID string `json:"activityId"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
}
ActivityTaskCancelRequestedEventAttributes models the swf json protocol.
type ActivityTaskCanceledEventAttributes ¶
type ActivityTaskCanceledEventAttributes struct {
Details string `json:"details"`
LatestCancelRequestedEventID int `json:"latestCancelRequestedEventId"`
ScheduledEventID int `json:"scheduledEventId"`
StartedEventID int `json:"startedEventId"`
}
ActivityTaskCanceledEventAttributes models the swf json protocol.
type ActivityTaskCompletedEventAttributes ¶
type ActivityTaskCompletedEventAttributes struct {
Result string `json:"result"`
ScheduledEventID int `json:"scheduledEventId"`
StartedEventID int `json:"startedEventId"`
}
ActivityTaskCompletedEventAttributes models the swf json protocol.
type ActivityTaskFailedEventAttributes ¶
type ActivityTaskFailedEventAttributes struct {
Details string `json:"details"`
Reason string `json:"reason"`
ScheduledEventID int `json:"scheduledEventId"`
StartedEventID int `json:"startedEventId"`
}
ActivityTaskFailedEventAttributes models the swf json protocol.
type ActivityTaskPoller ¶
type ActivityTaskPoller struct {
Identity string
Domain string
TaskList string
// contains filtered or unexported fields
}
ActivityTaskPoller polls a given task list in a domain for activity tasks, and sends tasks on its Tasks channel.
func NewActivityTaskPoller ¶
func NewActivityTaskPoller(awc ActivityWorkerClient, domain string, identity string, taskList string) *ActivityTaskPoller
NewActivityTaskPoller returns an ActivityTaskPoller.
func (*ActivityTaskPoller) Poll ¶
func (p *ActivityTaskPoller) Poll() (*PollForActivityTaskResponse, error)
Poll polls the task list for a task. If there is no task, nil is returned. If an error is encountered, no task is returned.
func (*ActivityTaskPoller) PollUntilShutdownBy ¶
func (p *ActivityTaskPoller) PollUntilShutdownBy(mgr *PollerShutdownManager, pollerName string, onTask func(*PollForActivityTaskResponse))
PollUntilShutdownBy will poll until signaled to shut down by the PollerShutdownManager. This func blocks, so run it in a goroutine if necessary. The implementation calls Poll() and invokes the callback whenever a valid PollForActivityTaskResponse is received.
type ActivityTaskScheduledEventAttributes ¶
type ActivityTaskScheduledEventAttributes struct {
ActivityID string `json:"activityId"`
ActivityType ActivityType `json:"activityType"`
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
HeartbeatTimeout string `json:"heartbeatTimeout"`
Input string `json:"input"`
ScheduleToCloseTimeout string `json:"scheduleToCloseTimeout"`
ScheduleToStartTimeout string `json:"scheduleToStartTimeout"`
StartToCloseTimeout string `json:"startToCloseTimeout"`
TaskList TaskList `json:"taskList"`
}
ActivityTaskScheduledEventAttributes models the swf json protocol.
type ActivityTaskStartedEventAttributes ¶
type ActivityTaskStartedEventAttributes struct {
Identity string `json:"identity"`
ScheduledEventID int `json:"scheduledEventId"`
}
ActivityTaskStartedEventAttributes models the swf json protocol.
type ActivityTaskTimedOutEventAttributes ¶
type ActivityTaskTimedOutEventAttributes struct {
Details string `json:"details"`
ScheduledEventID int `json:"scheduledEventId"`
StartedEventID int `json:"startedEventId"`
TimeoutType string `json:"timeoutType"`
}
ActivityTaskTimedOutEventAttributes models the swf json protocol.
type ActivityType ¶
ActivityType models the swf json protocol.
type ActivityTypeConfiguration ¶
type ActivityTypeConfiguration struct {
DefaultTaskHeartbeatTimeout string `json:"defaultTaskHeartbeatTimeout"`
DefaultTaskList TaskList `json:"defaultTaskList"`
DefaultTaskScheduleToCloseTimeout string `json:"defaultTaskScheduleToCloseTimeout"`
DefaultTaskScheduleToStartTimeout string `json:"defaultTaskScheduleToStartTimeout"`
DefaultTaskStartToCloseTimeout string `json:"defaultTaskStartToCloseTimeout"`
}
ActivityTypeConfiguration models the swf json protocol.
type ActivityTypeInfo ¶
type ActivityTypeInfo struct {
CreationDate *Date `json:"creationDate"`
DeprecationDate *Date `json:"deprecationDate"`
Description string `json:"description"`
Status string `json:"status"`
ActivityType ActivityType `json:"activityType"`
}
ActivityTypeInfo models the swf json protocol.
type ActivityTypeMigrator ¶
type ActivityTypeMigrator struct {
RegisteredActivityTypes []RegisterActivityType
DeprecatedActivityTypes []DeprecateActivityType
Client WorkflowClient
}
ActivityTypeMigrator will register or deprecate the configured activity types as required.
func (*ActivityTypeMigrator) Migrate ¶
func (a *ActivityTypeMigrator) Migrate()
Migrate asserts that DeprecatedActivityTypes are deprecated or deprecates them, then asserts that RegisteredActivityTypes are registered or registers them.
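The "assert or apply" ordering described for Migrate can be sketched against an in-memory stand-in. This is not the library's implementation and makes no SWF calls: registry and migrate are hypothetical, and the handling of edge cases (an unknown type in the deprecated list is skipped, an already deprecated type in the registered list is left alone) is an assumption of this sketch.

```go
package main

import "fmt"

// registry is a hypothetical in-memory stand-in for the set of activity types
// known to SWF. Values mirror StatusRegistered and StatusDeprecated.
type registry map[string]string

// migrate first makes sure every name in deprecated is deprecated, then makes
// sure every name in registered exists. It returns the number of changes, so
// a second run over the same input changes nothing.
func migrate(r registry, deprecated, registered []string) (changes int) {
	for _, name := range deprecated {
		if r[name] == "REGISTERED" {
			r[name] = "DEPRECATED"
			changes++
		}
	}
	for _, name := range registered {
		if _, known := r[name]; !known {
			r[name] = "REGISTERED"
			changes++
		}
	}
	return changes
}

func main() {
	r := registry{"old-activity": "REGISTERED"}
	fmt.Println(migrate(r, []string{"old-activity"}, []string{"new-activity"}))
	fmt.Println(migrate(r, []string{"old-activity"}, []string{"new-activity"}))
	fmt.Println(r["old-activity"], r["new-activity"])
}
```

Checking current state before acting is what makes it safe to run a migrator on every process start.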
type ActivityWorkerClient ¶
type ActivityWorkerClient interface {
PollForActivityTask(request PollForActivityTaskRequest) (*PollForActivityTaskResponse, error)
RecordActivityTaskHeartbeat(request RecordActivityTaskHeartbeatRequest) (*RecordActivityTaskHeartbeatResponse, error)
RespondActivityTaskCompleted(request RespondActivityTaskCompletedRequest) error
RespondActivityTaskFailed(request RespondActivityTaskFailedRequest) error
RespondActivityTaskCanceled(request RespondActivityTaskCanceledRequest) error
}
ActivityWorkerClient specifies swf client operations related to requesting and responding to activity tasks.
type CancelTimerDecisionAttributes ¶
type CancelTimerDecisionAttributes struct {
TimerID string `json:"timerId"`
}
CancelTimerDecisionAttributes models the swf json protocol.
type CancelTimerFailedEventAttributes ¶
type CancelTimerFailedEventAttributes struct {
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
TimerID string `json:"timerId"`
}
CancelTimerFailedEventAttributes models the swf json protocol.
type CancelWorkflowExecutionDecisionAttributes ¶
type CancelWorkflowExecutionDecisionAttributes struct {
Details string `json:"details"`
}
CancelWorkflowExecutionDecisionAttributes models the swf json protocol.
type CancelWorkflowExecutionFailedEventAttributes ¶
type CancelWorkflowExecutionFailedEventAttributes struct {
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
}
CancelWorkflowExecutionFailedEventAttributes models the swf json protocol.
type ChildWorkflowExecutionCanceledEventAttributes ¶
type ChildWorkflowExecutionCanceledEventAttributes struct {
Details string `json:"details"`
InitiatedEventID int `json:"initiatedEventId"`
StartedEventID int `json:"startedEventId"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
WorkflowType WorkflowType `json:"workflowType"`
}
ChildWorkflowExecutionCanceledEventAttributes models the swf json protocol.
type ChildWorkflowExecutionCompletedEventAttributes ¶
type ChildWorkflowExecutionCompletedEventAttributes struct {
InitiatedEventID int `json:"initiatedEventId"`
Result string `json:"result"`
StartedEventID int `json:"startedEventId"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
WorkflowType WorkflowType `json:"workflowType"`
}
ChildWorkflowExecutionCompletedEventAttributes models the swf json protocol.
type ChildWorkflowExecutionFailedEventAttributes ¶
type ChildWorkflowExecutionFailedEventAttributes struct {
Details string `json:"details"`
InitiatedEventID int `json:"initiatedEventId"`
Reason string `json:"reason"`
StartedEventID int `json:"startedEventId"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
WorkflowType WorkflowType `json:"workflowType"`
}
ChildWorkflowExecutionFailedEventAttributes models the swf json protocol.
type ChildWorkflowExecutionStartedEventAttributes ¶
type ChildWorkflowExecutionStartedEventAttributes struct {
InitiatedEventID int `json:"initiatedEventId"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
WorkflowType WorkflowType `json:"workflowType"`
}
ChildWorkflowExecutionStartedEventAttributes models the swf json protocol.
type ChildWorkflowExecutionTerminatedEventAttributes ¶
type ChildWorkflowExecutionTerminatedEventAttributes struct {
InitiatedEventID int `json:"initiatedEventId"`
StartedEventID int `json:"startedEventId"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
WorkflowType WorkflowType `json:"workflowType"`
}
ChildWorkflowExecutionTerminatedEventAttributes models the swf json protocol.
type ChildWorkflowExecutionTimedOutEventAttributes ¶
type ChildWorkflowExecutionTimedOutEventAttributes struct {
InitiatedEventID int `json:"initiatedEventId"`
StartedEventID int `json:"startedEventId"`
TimeoutType string `json:"timeoutType"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
WorkflowType WorkflowType `json:"workflowType"`
}
ChildWorkflowExecutionTimedOutEventAttributes models the swf json protocol.
type Client ¶
Client is the implementation of the WorkflowClient, DecisionWorkerClient, ActivityWorkerClient, WorkflowAdminClient, and WorkflowInfoClient interfaces.
func NewClient ¶
NewClient creates a new Client which uses the given credentials to talk to the given region with http.DefaultClient.
func NewClientWithHTTPAndPollingClient ¶
func NewClientWithHTTPAndPollingClient(key string, secret string, region *Region, client *http.Client, pollingClient *http.Client) *Client
NewClientWithHTTPAndPollingClient creates a new Client which uses the given credentials to talk to the given region with the specified http.Client, and makes polling calls with the given polling client.
func NewClientWithHTTPClient ¶
func NewClientWithHTTPClient(key string, secret string, region *Region, client *http.Client) *Client
NewClientWithHTTPClient creates a new Client which uses the given credentials to talk to the given region with the specified http.Client.
func (*Client) CountClosedWorkflowExecutions ¶
func (c *Client) CountClosedWorkflowExecutions(request CountClosedWorkflowExecutionsRequest) (*CountResponse, error)
CountClosedWorkflowExecutions executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_CountClosedWorkflowExecutions.html
func (*Client) CountOpenWorkflowExecutions ¶
func (c *Client) CountOpenWorkflowExecutions(request CountOpenWorkflowExecutionsRequest) (*CountResponse, error)
CountOpenWorkflowExecutions executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_CountOpenWorkflowExecutions.html
func (*Client) CountPendingActivityTasks ¶
func (c *Client) CountPendingActivityTasks(request CountPendingActivityTasksRequest) (*CountResponse, error)
CountPendingActivityTasks executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_CountPendingActivityTasks.html
func (*Client) CountPendingDecisionTasks ¶
func (c *Client) CountPendingDecisionTasks(request CountPendingDecisionTasksRequest) (*CountResponse, error)
CountPendingDecisionTasks executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_CountPendingDecisionTasks.html
func (*Client) CreateStream ¶
func (c *Client) CreateStream(request CreateStream) error
CreateStream executes http://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html
func (*Client) DeprecateActivityType ¶
func (c *Client) DeprecateActivityType(request DeprecateActivityType) error
DeprecateActivityType executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_DeprecateActivityType.html
func (*Client) DeprecateDomain ¶
func (c *Client) DeprecateDomain(request DeprecateDomain) error
DeprecateDomain executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_DeprecateDomain.html
func (*Client) DeprecateWorkflowType ¶
func (c *Client) DeprecateWorkflowType(request DeprecateWorkflowType) error
DeprecateWorkflowType executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_DeprecateWorkflowType.html
func (*Client) DescribeActivityType ¶
func (c *Client) DescribeActivityType(request DescribeActivityTypeRequest) (*DescribeActivityTypeResponse, error)
DescribeActivityType executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_DescribeActivityType.html
func (*Client) DescribeDomain ¶
func (c *Client) DescribeDomain(request DescribeDomainRequest) (*DescribeDomainResponse, error)
DescribeDomain executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_DescribeDomain.html
func (*Client) DescribeStream ¶
func (c *Client) DescribeStream(request DescribeStreamRequest) (*DescribeStreamResponse, error)
DescribeStream executes http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html
func (*Client) DescribeWorkflowExecution ¶
func (c *Client) DescribeWorkflowExecution(request DescribeWorkflowExecutionRequest) (*DescribeWorkflowExecutionResponse, error)
DescribeWorkflowExecution executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_DescribeWorkflowExecution.html
func (*Client) DescribeWorkflowType ¶
func (c *Client) DescribeWorkflowType(request DescribeWorkflowTypeRequest) (*DescribeWorkflowTypeResponse, error)
DescribeWorkflowType executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_DescribeWorkflowType.html
func (*Client) GetRecords ¶
func (c *Client) GetRecords(request GetRecordsRequest) (*GetRecordsResponse, error)
GetRecords executes http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
func (*Client) GetShardIterator ¶
func (c *Client) GetShardIterator(request GetShardIteratorRequest) (*GetShardIteratorResponse, error)
GetShardIterator executes http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
func (*Client) GetWorkflowExecutionHistory ¶
func (c *Client) GetWorkflowExecutionHistory(request GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse, error)
GetWorkflowExecutionHistory executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_GetWorkflowExecutionHistory.html
func (*Client) ListActivityTypes ¶
func (c *Client) ListActivityTypes(request ListActivityTypesRequest) (*ListActivityTypesResponse, error)
ListActivityTypes executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ListActivityTypes.html
func (*Client) ListClosedWorkflowExecutions ¶
func (c *Client) ListClosedWorkflowExecutions(request ListClosedWorkflowExecutionsRequest) (*ListClosedWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutions executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ListClosedWorkflowExecutions.html
func (*Client) ListDomains ¶
func (c *Client) ListDomains(request ListDomainsRequest) (*ListDomainsResponse, error)
ListDomains executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ListDomains.html
func (*Client) ListOpenWorkflowExecutions ¶
func (c *Client) ListOpenWorkflowExecutions(request ListOpenWorkflowExecutionsRequest) (*ListOpenWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutions executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ListOpenWorkflowExecutions.html
func (*Client) ListWorkflowTypes ¶
func (c *Client) ListWorkflowTypes(request ListWorkflowTypesRequest) (*ListWorkflowTypesResponse, error)
ListWorkflowTypes executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ListWorkflowTypes.html
func (*Client) PollForActivityTask ¶
func (c *Client) PollForActivityTask(request PollForActivityTaskRequest) (*PollForActivityTaskResponse, error)
PollForActivityTask executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html
func (*Client) PollForDecisionTask ¶
func (c *Client) PollForDecisionTask(request PollForDecisionTaskRequest) (*PollForDecisionTaskResponse, error)
PollForDecisionTask executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForDecisionTask.html
func (*Client) PutRecord ¶
func (c *Client) PutRecord(request PutRecordRequest) (*PutRecordResponse, error)
PutRecord executes http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
func (*Client) RecordActivityTaskHeartbeat ¶
func (c *Client) RecordActivityTaskHeartbeat(request RecordActivityTaskHeartbeatRequest) (*RecordActivityTaskHeartbeatResponse, error)
RecordActivityTaskHeartbeat executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RecordActivityTaskHeartbeat.html
func (*Client) RegisterActivityType ¶
func (c *Client) RegisterActivityType(request RegisterActivityType) error
RegisterActivityType executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RegisterActivityType.html
func (*Client) RegisterDomain ¶
func (c *Client) RegisterDomain(request RegisterDomain) error
RegisterDomain executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RegisterDomain.html
func (*Client) RegisterWorkflowType ¶
func (c *Client) RegisterWorkflowType(request RegisterWorkflowType) error
RegisterWorkflowType executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RegisterWorkflowType.html
func (*Client) RequestCancelWorkflowExecution ¶
func (c *Client) RequestCancelWorkflowExecution(request RequestCancelWorkflowExecution) error
RequestCancelWorkflowExecution executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RequestCancelWorkflowExecution.html
func (*Client) RespondActivityTaskCanceled ¶
func (c *Client) RespondActivityTaskCanceled(request RespondActivityTaskCanceledRequest) error
RespondActivityTaskCanceled executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RespondActivityTaskCanceled.html
func (*Client) RespondActivityTaskCompleted ¶
func (c *Client) RespondActivityTaskCompleted(request RespondActivityTaskCompletedRequest) error
RespondActivityTaskCompleted executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RespondActivityTaskCompleted.html
func (*Client) RespondActivityTaskFailed ¶
func (c *Client) RespondActivityTaskFailed(request RespondActivityTaskFailedRequest) error
RespondActivityTaskFailed executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RespondActivityTaskFailed.html
func (*Client) RespondDecisionTaskCompleted ¶
func (c *Client) RespondDecisionTaskCompleted(request RespondDecisionTaskCompletedRequest) error
RespondDecisionTaskCompleted executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_RespondDecisionTaskCompleted.html
func (*Client) SignalWorkflow ¶
func (c *Client) SignalWorkflow(request SignalWorkflowRequest) error
SignalWorkflow executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_SignalWorkflowExecution.html
func (*Client) StartWorkflow ¶
func (c *Client) StartWorkflow(request StartWorkflowRequest) (*StartWorkflowResponse, error)
StartWorkflow executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_StartWorkflowExecution.html
func (*Client) TerminateWorkflowExecution ¶
func (c *Client) TerminateWorkflowExecution(request TerminateWorkflowExecution) error
TerminateWorkflowExecution executes http://docs.aws.amazon.com/amazonswf/latest/apireference/API_TerminateWorkflowExecution.html
type CompleteOutcome ¶
type CompleteOutcome struct {
// contains filtered or unexported fields
}
CompleteOutcome will send a CompleteWorkflowExecutionDecision, and transition to a 'managed' fsm state that will respond to any further events by attempting to Complete the workflow. This can happen only if there were unhandled decisions.
func (CompleteOutcome) Data ¶
func (t CompleteOutcome) Data() interface{}
Data returns the data for this Outcome.
func (CompleteOutcome) Decisions ¶
func (t CompleteOutcome) Decisions() []Decision
Decisions returns the list of Decisions for this Outcome.
func (CompleteOutcome) State ¶
func (t CompleteOutcome) State() string
State returns the next state for the CompleteOutcome, which is always "complete".
type CompleteWorkflowExecutionDecisionAttributes ¶
type CompleteWorkflowExecutionDecisionAttributes struct {
Result string `json:"result"`
}
CompleteWorkflowExecutionDecisionAttributes models the swf json protocol.
type CompleteWorkflowExecutionFailedEventAttributes ¶
type CompleteWorkflowExecutionFailedEventAttributes struct {
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
}
CompleteWorkflowExecutionFailedEventAttributes models the swf json protocol.
type ComposedDecider ¶
type ComposedDecider struct {
// contains filtered or unexported fields
}
ComposedDecider can be used to build a decider out of a number of sub Deciders. The sub deciders should return Pass when they don't wish to handle an event.
Example ¶
//to reduce boilerplate you can create reusable components to compose Deciders with,
//that use functions that have the dataType of your FSM.
typedFuncs := Typed(new(TestingType))
//for example. reduced boilerplate for the retry of failed activities.
//first, you would have one of these typed DecisionFuncs for each activity decision type you create.
fooActivityDecision := func(ctx *FSMContext, h HistoryEvent, data *TestingType) Decision {
return Decision{
DecisionType: DecisionTypeScheduleActivityTask,
ScheduleActivityTaskDecisionAttributes: &ScheduleActivityTaskDecisionAttributes{
ActivityType: ActivityType{Name: "foo-activity", Version: "1"},
},
}
}
barActivityDecision := func(ctx *FSMContext, h HistoryEvent, data *TestingType) Decision {
return Decision{
DecisionType: DecisionTypeScheduleActivityTask,
ScheduleActivityTaskDecisionAttributes: &ScheduleActivityTaskDecisionAttributes{
ActivityType: ActivityType{Name: "bar-activity", Version: "1"},
},
}
}
// optionally a type alias for your 'typed' decision fn.
// if you don't do this, the retryFailedActivities below will need to be
// func(activityName string, activityFn interface{})
// instead of
// func(activityName string, activityFn TestingTypeDecisionFunc)
type TestingTypeDecisionFunc func(*FSMContext, HistoryEvent, *TestingType) Decision
//now the retryFailedActivities function, which can be used for all activity funcs like the above.
retryFailedActivities := func(activityName string, activityFn TestingTypeDecisionFunc) Decider {
typedDecisionFn := typedFuncs.DecisionFunc(activityFn)
return func(ctx *FSMContext, h HistoryEvent, data interface{}) Outcome {
switch h.EventType {
case EventTypeActivityTaskFailed, EventTypeActivityTaskTimedOut, EventTypeActivityTaskCanceled:
if ctx.ActivityInfo(h).Name == activityName {
decisions := ctx.EmptyDecisions()
retry := typedDecisionFn(ctx, h, data)
decisions = append(decisions, retry)
return ctx.Stay(data, decisions)
}
}
return Pass
}
}
//now build a decider out of the parts.
//the one thing you need to be careful of is having a unit test that executes the following
//since the type checking can only be done at initialization at runtime here.
decider := NewComposedDecider(
retryFailedActivities("foo-activity", fooActivityDecision),
retryFailedActivities("bar-activity", barActivityDecision),
DefaultDecider(),
)
decider(new(FSMContext), HistoryEvent{}, new(TestData))
func (*ComposedDecider) Decide ¶
func (c *ComposedDecider) Decide(ctx *FSMContext, h HistoryEvent, data interface{}) Outcome
Decide is the Decider func for a ComposedDecider.
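The pass-through behaviour described for ComposedDecider can be modelled in isolation. The following is a minimal sketch under stated assumptions, not the package's implementation: `event`, `outcome`, `decider`, `pass`, `compose`, `retryOn` and `catchAll` are hypothetical stand-ins, and the real ComposedDecider may combine sub decider results differently (for example via ContinueOutcome).

```go
package main

import "fmt"

// event and outcome are hypothetical, simplified stand-ins for
// HistoryEvent and Outcome; they are not the package's real types.
type event struct{ eventType string }

type outcome struct {
	state     string
	decisions []string
	handled   bool
}

// pass is a sentinel meaning "this sub decider does not handle the event".
var pass = outcome{}

type decider func(e event) outcome

// compose returns a decider that tries each sub decider in order and
// returns the first outcome that is not pass.
func compose(subs ...decider) decider {
	return func(e event) outcome {
		for _, d := range subs {
			if o := d(e); o.handled {
				return o
			}
		}
		return pass
	}
}

// retryOn builds a sub decider that reschedules the named activity
// when it sees that activity's failure event, and passes otherwise.
func retryOn(activity string) decider {
	return func(e event) outcome {
		if e.eventType == "ActivityTaskFailed:"+activity {
			return outcome{
				state:     "working",
				decisions: []string{"schedule " + activity},
				handled:   true,
			}
		}
		return pass
	}
}

// catchAll plays the role of a final catch-all decider: it handles
// every event but contributes no decisions.
func catchAll(e event) outcome {
	return outcome{state: "working", handled: true}
}

func main() {
	d := compose(retryOn("foo-activity"), retryOn("bar-activity"), catchAll)
	fmt.Println(d(event{"ActivityTaskFailed:bar-activity"}).decisions)
	fmt.Println(len(d(event{"TimerFired"}).decisions))
}
```

Placing the catch-all last means every event is handled by something, which mirrors the advice given for NewComposedDecider.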
type ContinueAsNewWorkflowExecutionDecisionAttributes ¶
type ContinueAsNewWorkflowExecutionDecisionAttributes struct {
ChildPolicy string `json:"childPolicy,omitempty"`
ExecutionStartToCloseTimeout string `json:"executionStartToCloseTimeout,omitempty"`
Input string `json:"input"`
TagList []string `json:"tagList"`
TaskList *TaskList `json:"taskList,omitempty"`
TaskStartToCloseTimeout string `json:"taskStartToCloseTimeout,omitempty"`
WorkflowTypeVersion string `json:"workflowTypeVersion,omitempty"`
}
ContinueAsNewWorkflowExecutionDecisionAttributes models the swf json protocol.
type ContinueAsNewWorkflowExecutionFailedEventAttributes ¶
type ContinueAsNewWorkflowExecutionFailedEventAttributes struct {
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
}
ContinueAsNewWorkflowExecutionFailedEventAttributes models the swf json protocol.
type ContinueOutcome ¶
type ContinueOutcome struct {
// contains filtered or unexported fields
}
ContinueOutcome is an Outcome used to contribute decisions and data to a composed Decider.
func (ContinueOutcome) Data ¶
func (s ContinueOutcome) Data() interface{}
Data returns the data for this Outcome.
func (ContinueOutcome) Decisions ¶
func (s ContinueOutcome) Decisions() []Decision
Decisions returns the list of Decisions for this Outcome.
func (ContinueOutcome) State ¶
func (s ContinueOutcome) State() string
State returns the next state for the ContinueOutcome, which is always empty.
type CountClosedWorkflowExecutionsRequest ¶
type CountClosedWorkflowExecutionsRequest struct {
CloseStatusFilter *StatusFilter `json:"closeStatusFilter,omitempty"`
CloseTimeFilter *TimeFilter `json:"closeTimeFilter,omitempty"`
Domain string `json:"domain"`
ExecutionFilter *ExecutionFilter `json:"executionFilter,omitempty"`
StartTimeFilter *TimeFilter `json:"startTimeFilter,omitempty"`
TagFilter *TagFilter `json:"tagFilter,omitempty"`
TypeFilter *TypeFilter `json:"typeFilter,omitempty"`
}
CountClosedWorkflowExecutionsRequest models the swf json protocol.
type CountOpenWorkflowExecutionsRequest ¶
type CountOpenWorkflowExecutionsRequest struct {
Domain string `json:"domain"`
ExecutionFilter *ExecutionFilter `json:"executionFilter,omitempty"`
StartTimeFilter TimeFilter `json:"startTimeFilter,omitempty"`
TagFilter *TagFilter `json:"tagFilter,omitempty"`
TypeFilter *TypeFilter `json:"typeFilter,omitempty"`
}
CountOpenWorkflowExecutionsRequest models the swf json protocol.
type CountPendingActivityTasksRequest ¶
type CountPendingActivityTasksRequest struct {
Domain string `json:"domain"`
TaskList TaskList `json:"taskList"`
}
CountPendingActivityTasksRequest models the swf json protocol.
type CountPendingDecisionTasksRequest ¶
type CountPendingDecisionTasksRequest struct {
Domain string `json:"domain"`
TaskList TaskList `json:"taskList"`
}
CountPendingDecisionTasksRequest models the swf json protocol.
type CountResponse ¶
CountResponse models the swf json protocol.
type CreateStream ¶
CreateStream models the kinesis json protocol.
type Date ¶
Date is a wrapper struct around time.Time that provides json de/serialization in swf's expected format.
func (*Date) MarshalJSON ¶
MarshalJSON formats a time.Time into swf's expected format.
func (*Date) UnmarshalJSON ¶
UnmarshalJSON parses the swf representation of time.
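As an illustration of the wrapper technique, here is a minimal sketch of a time.Time wrapper with custom JSON methods. It assumes the wire format is a JSON number of seconds since the Unix epoch with a fractional part, which this page does not confirm; the `Date` type below is a local stand-in and `roundTrip` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
	"time"
)

// Date is a local stand-in for the documented wrapper type.
type Date struct{ time.Time }

// MarshalJSON writes the time as fractional epoch seconds with
// millisecond precision (an assumed format, see the lead-in).
func (d *Date) MarshalJSON() ([]byte, error) {
	secs := float64(d.Unix()) + float64(d.Nanosecond())/1e9
	return []byte(strconv.FormatFloat(secs, 'f', 3, 64)), nil
}

// UnmarshalJSON parses fractional epoch seconds, rounding to milliseconds.
func (d *Date) UnmarshalJSON(b []byte) error {
	secs, err := strconv.ParseFloat(string(b), 64)
	if err != nil {
		return err
	}
	whole, frac := math.Modf(secs)
	nsec := int64(math.Round(frac*1e3)) * int64(time.Millisecond)
	d.Time = time.Unix(int64(whole), nsec).UTC()
	return nil
}

// roundTrip decodes a JSON timestamp into a Date and encodes it again.
func roundTrip(in string) (string, error) {
	var d Date
	if err := json.Unmarshal([]byte(in), &d); err != nil {
		return "", err
	}
	out, err := json.Marshal(&d)
	return string(out), err
}

func main() {
	out, err := roundTrip("1420070400.5")
	fmt.Println(out, err)
}
```

Because both methods use pointer receivers, the custom encoding applies to `*Date` values, which matches how fields such as LatestActivityTaskTimestamp are declared.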
type Decider ¶
type Decider func(*FSMContext, HistoryEvent, interface{}) Outcome
Decider decides an Outcome based on an event and the current data for an FSM. You can assert the interface{} parameter that is passed to the Decider as the type of the DataType field in the FSM. Alternatively, you can use the TypedDecider to avoid having to do the assertion.
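The type assertion mentioned above can be sketched on its own. In this minimal example `StateData` and `decide` are hypothetical; `decide` mimics only the data parameter of a Decider, not its full signature.

```go
package main

import "fmt"

// StateData is a hypothetical data type managed by an FSM.
type StateData struct {
	Count int
}

// decide receives the data as interface{}, as a Decider does, and
// asserts it back to the concrete type before using it.
func decide(data interface{}) (int, error) {
	d, ok := data.(*StateData)
	if !ok {
		return 0, fmt.Errorf("unexpected data type %T", data)
	}
	d.Count++
	return d.Count, nil
}

func main() {
	n, err := decide(&StateData{Count: 1})
	fmt.Println(n, err)

	_, err = decide("not state data")
	fmt.Println(err)
}
```

Using the two-value form of the assertion avoids a panic when the data is not of the expected type.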
func DefaultDecider ¶
func DefaultDecider() Decider
DefaultDecider is a 'catch-all' decider that simply logs the unhandled decision. You should place this or one like it as the last decider in your top level ComposableDecider.
func ManagedContinuations ¶
ManagedContinuations is a composable decider that will handle most of the mechanics of automatically continuing workflows. If there are no activities present in the tracker, it will continueAsNew the workflow in response to an FSM.ContinueWorkflow timer or signal. If there are activities present in the tracker, it will set a new FSM.ContinueWorkflow timer that fires in timerRetrySeconds. It will also signal the workflow to continue when the workflow history grows beyond the configured historySize. This should be last in your decider stack, as it will signal in response to *any* event that has an id > historySize. TODO: does it ever happen that a decision task arrives for which previous deciders have created a decision that breaks continuation, e.g. a decision task whose history contains 2 signals? We assume not until we find otherwise. If that assumption is wrong, managed continuations probably can't happen in userspace.
func NewComposedDecider ¶
NewComposedDecider builds a Composed Decider from a list of sub Deciders. You can compose your final composable decider from other composable deciders, but you should make sure that the final decider includes a 'catch-all' decider in last place. You can use DefaultDecider() or your own.
type Decision ¶
type Decision struct {
CancelTimerDecisionAttributes *CancelTimerDecisionAttributes `json:"cancelTimerDecisionAttributes,omitempty"`
CancelWorkflowExecutionDecisionAttributes *CancelWorkflowExecutionDecisionAttributes `json:"cancelWorkflowExecutionDecisionAttributes,omitempty"`
CompleteWorkflowExecutionDecisionAttributes *CompleteWorkflowExecutionDecisionAttributes `json:"completeWorkflowExecutionDecisionAttributes,omitempty"`
ContinueAsNewWorkflowExecutionDecisionAttributes *ContinueAsNewWorkflowExecutionDecisionAttributes `json:"continueAsNewWorkflowExecutionDecisionAttributes,omitempty"`
DecisionType string `json:"decisionType"`
FailWorkflowExecutionDecisionAttributes *FailWorkflowExecutionDecisionAttributes `json:"failWorkflowExecutionDecisionAttributes,omitempty"`
RecordMarkerDecisionAttributes *RecordMarkerDecisionAttributes `json:"recordMarkerDecisionAttributes,omitempty"`
RequestCancelActivityTaskDecisionAttributes *RequestCancelActivityTaskDecisionAttributes `json:"requestCancelActivityTaskDecisionAttributes,omitempty"`
RequestCancelExternalWorkflowExecutionDecisionAttributes *RequestCancelExternalWorkflowExecutionDecisionAttributes `json:"requestCancelExternalWorkflowExecutionDecisionAttributes,omitempty"`
ScheduleActivityTaskDecisionAttributes *ScheduleActivityTaskDecisionAttributes `json:"scheduleActivityTaskDecisionAttributes,omitempty"`
SignalExternalWorkflowExecutionDecisionAttributes *SignalExternalWorkflowExecutionDecisionAttributes `json:"signalExternalWorkflowExecutionDecisionAttributes,omitempty"`
StartChildWorkflowExecutionDecisionAttributes *StartChildWorkflowExecutionDecisionAttributes `json:"startChildWorkflowExecutionDecisionAttributes,omitempty"`
StartTimerDecisionAttributes *StartTimerDecisionAttributes `json:"startTimerDecisionAttributes,omitempty"`
}
Decision models the swf json protocol.
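To show how the `omitempty` pointer fields behave on the wire, here is a minimal sketch using trimmed-down local copies of the types. The fields of `StartTimerDecisionAttributes` and `CancelTimerDecisionAttributes` and their JSON tags are assumptions based on the SWF API naming, not taken from this page, and `encode` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down local copies of the documented types. The attribute
// structs and their tags are assumptions for illustration only.
type StartTimerDecisionAttributes struct {
	StartToFireTimeout string `json:"startToFireTimeout"`
	TimerID            string `json:"timerId"`
}

type CancelTimerDecisionAttributes struct {
	TimerID string `json:"timerId"`
}

type Decision struct {
	CancelTimerDecisionAttributes *CancelTimerDecisionAttributes `json:"cancelTimerDecisionAttributes,omitempty"`
	DecisionType                  string                         `json:"decisionType"`
	StartTimerDecisionAttributes  *StartTimerDecisionAttributes  `json:"startTimerDecisionAttributes,omitempty"`
}

// encode marshals a Decision to its JSON form.
func encode(d Decision) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

func main() {
	out, err := encode(Decision{
		DecisionType: "StartTimer",
		StartTimerDecisionAttributes: &StartTimerDecisionAttributes{
			StartToFireTimeout: "5",
			TimerID:            "timeToSignal",
		},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// Only the attribute block that was set appears in the output;
	// the nil CancelTimerDecisionAttributes pointer is omitted.
	fmt.Println(out)
}
```

Nil pointers tagged `omitempty` are dropped by encoding/json, so a Decision carries only the attributes that match its DecisionType.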
type DecisionFunc ¶
type DecisionFunc func(ctx *FSMContext, h HistoryEvent, data interface{}) Decision
DecisionFunc is a building block for composable deciders that returns a decision.
type DecisionTaskCompletedEventAttributes ¶
type DecisionTaskCompletedEventAttributes struct {
ExecutionContext string `json:"executionContext"`
ScheduledEventID int `json:"scheduledEventId"`
StartedEventID int `json:"startedEventId"`
}
DecisionTaskCompletedEventAttributes models the swf json protocol.
type DecisionTaskPoller ¶
type DecisionTaskPoller struct {
Identity string
Domain string
TaskList string
// contains filtered or unexported fields
}
DecisionTaskPoller polls a given task list in a domain for decision tasks.
func NewDecisionTaskPoller ¶
func NewDecisionTaskPoller(dwc DecisionWorkerClient, domain string, identity string, taskList string) *DecisionTaskPoller
NewDecisionTaskPoller returns a DecisionTaskPoller which can be used to poll the given task list.
func (*DecisionTaskPoller) Poll ¶
func (p *DecisionTaskPoller) Poll() (*PollForDecisionTaskResponse, error)
Poll polls the task list for a task. If there is no task available, nil is returned. If an error is encountered, no task is returned.
func (*DecisionTaskPoller) PollUntilShutdownBy ¶
func (p *DecisionTaskPoller) PollUntilShutdownBy(mgr *PollerShutdownManager, pollerName string, onTask func(*PollForDecisionTaskResponse))
PollUntilShutdownBy will poll until signaled to shut down by the PollerShutdownManager. This func blocks, so run it in a goroutine if necessary. The implementation calls Poll() and invokes the callback whenever a valid PollForDecisionTaskResponse is received.
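The poll-until-shutdown pattern can be modelled without the SWF client. The following is a minimal sketch under stated assumptions: `pollFn`, `pollUntil` and `run` are hypothetical, a plain channel stands in for the PollerShutdownManager, and the real implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// pollFn is a hypothetical stand-in for Poll(): it returns a task, or
// nil when the long poll came back empty.
type pollFn func() *string

// pollUntil calls poll repeatedly and hands non-nil tasks to onTask.
// The stop channel is checked only between polls, so an in-flight
// poll is never abandoned part way through.
func pollUntil(stop <-chan struct{}, poll pollFn, onTask func(string)) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		if task := poll(); task != nil {
			onTask(*task)
		}
	}
}

// run feeds the given tasks through a poller, requests shutdown during
// the first empty poll, and returns the tasks that were handled.
func run(tasks []string) []string {
	stop := make(chan struct{})
	var handled []string
	i := 0
	poll := func() *string {
		if i == len(tasks) {
			close(stop) // simulate a shutdown request arriving mid-poll
			return nil  // empty poll response
		}
		t := tasks[i]
		i++
		return &t
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		pollUntil(stop, poll, func(t string) { handled = append(handled, t) })
	}()
	wg.Wait() // blocks until the poller has observed the shutdown
	return handled
}

func main() {
	fmt.Println(run([]string{"task-1", "task-2"}))
}
```

Checking for shutdown only between polls is what lets in-flight long polls complete, at the cost of shutdown taking as long as one poll.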
type DecisionTaskScheduledEventAttributes ¶
type DecisionTaskScheduledEventAttributes struct {
StartToCloseTimeout string `json:"startToCloseTimeout"`
TaskList TaskList `json:"taskList"`
}
DecisionTaskScheduledEventAttributes models the swf json protocol.
type DecisionTaskStartedEventAttributes ¶
type DecisionTaskStartedEventAttributes struct {
Identity string `json:"identity"`
ScheduledEventID int `json:"scheduledEventId"`
}
DecisionTaskStartedEventAttributes models the swf json protocol.
type DecisionTaskTimedOutEventAttributes ¶
type DecisionTaskTimedOutEventAttributes struct {
ScheduledEventID int `json:"scheduledEventId"`
StartedEventID int `json:"startedEventId"`
TimeoutType string `json:"timeoutType"`
}
DecisionTaskTimedOutEventAttributes models the swf json protocol.
type DecisionWorkerClient ¶
type DecisionWorkerClient interface {
PollForDecisionTask(request PollForDecisionTaskRequest) (*PollForDecisionTaskResponse, error)
RespondDecisionTaskCompleted(request RespondDecisionTaskCompletedRequest) error
}
DecisionWorkerClient specifies swf client operations related to requesting and responding to decision tasks.
type DeprecateActivityType ¶
type DeprecateActivityType struct {
ActivityType ActivityType `json:"activityType"`
Domain string `json:"domain"`
}
DeprecateActivityType models the swf json protocol.
type DeprecateDomain ¶
type DeprecateDomain struct {
Name string `json:"name"`
}
DeprecateDomain models the swf json protocol.
type DeprecateWorkflowType ¶
type DeprecateWorkflowType struct {
Domain string `json:"domain"`
WorkflowType WorkflowType `json:"workflowType"`
}
DeprecateWorkflowType models the swf json protocol.
type DescribeActivityTypeRequest ¶
type DescribeActivityTypeRequest struct {
ActivityType ActivityType `json:"activityType"`
Domain string `json:"domain"`
}
DescribeActivityTypeRequest models the swf json protocol.
type DescribeActivityTypeResponse ¶
type DescribeActivityTypeResponse struct {
Configuration ActivityTypeConfiguration `json:"configuration"`
TypeInfo ActivityTypeInfo `json:"typeInfo"`
}
DescribeActivityTypeResponse models the swf json protocol.
type DescribeDomainRequest ¶
type DescribeDomainRequest struct {
Name string `json:"name"`
}
DescribeDomainRequest models the swf json protocol.
type DescribeDomainResponse ¶
type DescribeDomainResponse struct {
Configuration DomainConfiguration `json:"configuration"`
DomainInfo DomainInfo `json:"domainInfo"`
}
DescribeDomainResponse models the swf json protocol.
type DescribeStreamRequest ¶
type DescribeStreamRequest struct {
ExclusiveStartShardID *string `json:"ExclusiveStartShardId"`
Limit *int
StreamName string
}
DescribeStreamRequest models the kinesis json protocol.
type DescribeStreamResponse ¶
type DescribeStreamResponse struct {
StreamDescription struct {
HasMoreShards bool
Shards []struct {
AdjacentParentShardID string `json:"AdjacentParentShardId"`
HashKeyRange struct {
EndingHashKey string
StartingHashKey string
}
ParentShardID string `json:"ParentShardId"`
SequenceNumberRange struct {
EndingSequenceNumber string
StartingSequenceNumber string
}
ShardID string `json:"ShardId"`
}
StreamARN string
StreamName string
StreamStatus string
}
}
DescribeStreamResponse models the kinesis json protocol.
type DescribeWorkflowExecutionRequest ¶
type DescribeWorkflowExecutionRequest struct {
Domain string `json:"domain"`
Execution WorkflowExecution `json:"execution"`
}
DescribeWorkflowExecutionRequest models the swf json protocol.
type DescribeWorkflowExecutionResponse ¶
type DescribeWorkflowExecutionResponse struct {
ExecutionConfiguration ExecutionConfiguration `json:"executionConfiguration"`
ExecutionInfo WorkflowExecutionInfo `json:"executionInfo"`
LatestActivityTaskTimestamp *Date `json:"latestActivityTaskTimestamp"`
LatestExecutionContext string `json:"latestExecutionContext"`
OpenCounts OpenCounts `json:"openCounts"`
}
DescribeWorkflowExecutionResponse models the swf json protocol.
type DescribeWorkflowTypeRequest ¶
type DescribeWorkflowTypeRequest struct {
Domain string `json:"domain"`
WorkflowType WorkflowType `json:"workflowType"`
}
DescribeWorkflowTypeRequest models the swf json protocol.
type DescribeWorkflowTypeResponse ¶
type DescribeWorkflowTypeResponse struct {
Configuration WorkflowConfiguration `json:"configuration"`
TypeInfo WorkflowTypeInfo `json:"typeInfo"`
}
DescribeWorkflowTypeResponse models the swf json protocol.
type DomainConfiguration ¶
type DomainConfiguration struct {
WorkflowExecutionRetentionPeriodInDays string `json:"workflowExecutionRetentionPeriodInDays"`
}
DomainConfiguration models the swf json protocol.
type DomainInfo ¶
type DomainInfo struct {
Description string `json:"description"`
Name string `json:"name"`
Status string `json:"status"`
}
DomainInfo models the swf json protocol.
type DomainMigrator ¶
type DomainMigrator struct {
RegisteredDomains []RegisterDomain
DeprecatedDomains []DeprecateDomain
Client WorkflowClient
}
DomainMigrator will register or deprecate the configured domains as required.
func (*DomainMigrator) Migrate ¶
func (d *DomainMigrator) Migrate()
Migrate asserts that DeprecatedDomains are deprecated or deprecates them, then asserts that RegisteredDomains are registered or registers them.
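The deprecate-then-register order can be sketched against an in-memory model. This is a simplified illustration, not the package's code: `fakeSWF` and `migrate` are hypothetical, the status strings are assumed to follow the SWF API values, and the sketch ignores cases such as deprecating an unknown domain or re-registering a deprecated one.

```go
package main

import "fmt"

// fakeSWF is a hypothetical in-memory stand-in for the SWF domain
// endpoints. It maps a domain name to "REGISTERED" or "DEPRECATED".
type fakeSWF map[string]string

// migrate deprecates first, then registers, touching only domains that
// are not already in the desired state. It returns the actions taken.
func migrate(swf fakeSWF, register, deprecate []string) []string {
	var actions []string
	for _, name := range deprecate {
		if swf[name] == "REGISTERED" {
			swf[name] = "DEPRECATED"
			actions = append(actions, "deprecate "+name)
		}
	}
	for _, name := range register {
		if _, exists := swf[name]; !exists {
			swf[name] = "REGISTERED"
			actions = append(actions, "register "+name)
		}
	}
	return actions
}

func main() {
	swf := fakeSWF{"old-domain": "REGISTERED"}
	fmt.Println(migrate(swf, []string{"new-domain"}, []string{"old-domain"}))
	// A second run finds everything in the desired state already.
	fmt.Println(len(migrate(swf, []string{"new-domain"}, []string{"old-domain"})))
}
```

Checking the current state before acting makes the migration safe to run on every process start.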
type ErrorOutcome ¶
type ErrorOutcome struct {
// contains filtered or unexported fields
}
ErrorOutcome can be used to purposefully put the workflow into an error state.
func (ErrorOutcome) Data ¶
func (e ErrorOutcome) Data() interface{}
Data returns the data for this Outcome.
func (ErrorOutcome) Decisions ¶
func (e ErrorOutcome) Decisions() []Decision
Decisions returns the list of Decisions for this Outcome.
func (ErrorOutcome) State ¶
func (e ErrorOutcome) State() string
State returns the next state for the ErrorOutcome, which is always "error".
type ErrorResponse ¶
type ErrorResponse struct {
StatusCode int
Type string `json:"__type"`
Message string `json:"message"`
}
ErrorResponse models the swf json protocol.
func (*ErrorResponse) Error ¶
func (err *ErrorResponse) Error() string
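A minimal sketch of how an error body could be decoded into this struct and returned as a Go error. The struct is copied from the declaration above; the `Error` formatting, the `decodeError` helper and the sample body are assumptions for illustration, and the package's own message format may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorResponse is a local copy of the documented struct.
type ErrorResponse struct {
	StatusCode int
	Type       string `json:"__type"`
	Message    string `json:"message"`
}

// Error uses a hypothetical format; the package's actual text may differ.
func (e *ErrorResponse) Error() string {
	return fmt.Sprintf("%d %s: %s", e.StatusCode, e.Type, e.Message)
}

// decodeError turns a non-2xx response body into an error value,
// recording the HTTP status alongside the decoded fields.
func decodeError(status int, body []byte) error {
	er := &ErrorResponse{StatusCode: status}
	if err := json.Unmarshal(body, er); err != nil {
		return fmt.Errorf("undecodable error body (status %d): %v", status, err)
	}
	return er
}

func main() {
	// Illustrative body only, not captured from a real response.
	body := []byte(`{"__type":"UnknownResourceFault","message":"Unknown domain: example"}`)
	err := decodeError(400, body)

	// Callers can recover the structured fields with a type assertion.
	if er, ok := err.(*ErrorResponse); ok {
		fmt.Println(er.Type, er.StatusCode)
	}
	fmt.Println(err)
}
```

Because Error has a pointer receiver, only `*ErrorResponse` satisfies the error interface, so callers should assert to the pointer type.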
type ExecutionConfiguration ¶
type ExecutionConfiguration struct {
ChildPolicy string `json:"childPolicy"`
ExecutionStartToCloseTimeout string `json:"executionStartToCloseTimeout"`
TaskList TaskList `json:"taskList"`
TaskStartToCloseTimeout string `json:"taskStartToCloseTimeout"`
}
ExecutionConfiguration models the swf json protocol.
type ExecutionFilter ¶
type ExecutionFilter struct {
WorkflowID string `json:"workflowId"`
}
ExecutionFilter models the swf json protocol.
type ExternalWorkflowExecutionCancelRequestedEventAttributes ¶
type ExternalWorkflowExecutionCancelRequestedEventAttributes struct {
InitiatedEventID int `json:"initiatedEventId"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
}
ExternalWorkflowExecutionCancelRequestedEventAttributes models the swf json protocol.
type ExternalWorkflowExecutionSignaledEventAttributes ¶
type ExternalWorkflowExecutionSignaledEventAttributes struct {
InitiatedEventID int `json:"initiatedEventId"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
}
ExternalWorkflowExecutionSignaledEventAttributes models the swf json protocol.
type FSM ¶
type FSM struct {
//Name of the fsm. Used when emitting logs. Should probably be set to the name of the workflow associated with the fsm.
Name string
// Domain of the workflow associated with the FSM.
Domain string
// TaskList that the underlying poller will poll for decision tasks.
TaskList string
// Identity used in PollForDecisionTaskRequests, can be empty.
Identity string
// Client used to make SWF api requests.
Client WorkflowClient
// DataType of the data struct associated with this FSM.
// The data is automatically persisted to and loaded from workflow history by the FSM.
DataType interface{}
// Serializer used to serialize/deserialize state from workflow history.
Serializer StateSerializer
// Kinesis stream in the same region to replicate state to.
KinesisStream string
// Strategy for replication of state to Kinesis.
KinesisReplicator KinesisReplicator
//PollerShutdownManager is used when the FSM is managing the polling
PollerShutdownManager *PollerShutdownManager
// contains filtered or unexported fields
}
FSM models the decision handling logic of a workflow in SWF.
Example ¶
// create with swf.NewClient
var client *Client
// data type that will be managed by the FSM
type StateData struct {
Message string `json:"message,omitempty"`
Count int `json:"count,omitempty"`
}
//event type that will be signalled to the FSM with signal name "hello"
type Hello struct {
Message string `json:"message,omitempty"`
}
//the FSM we will create will oscillate between 2 states,
//waitForSignal -> will wait till the workflow is started or signalled, and update the StateData based on the Hello message received, set a timer, and transition to waitForTimer
//waitForTimer -> will wait till the timer set by waitForSignal fires, and will signal the workflow with a Hello message, and transition to waitForSignal
waitForSignal := func(f *FSMContext, h HistoryEvent, d *StateData) Outcome {
decisions := f.EmptyDecisions()
switch h.EventType {
case EventTypeWorkflowExecutionStarted, EventTypeWorkflowExecutionSignaled:
if h.EventType == EventTypeWorkflowExecutionSignaled && h.WorkflowExecutionSignaledEventAttributes.SignalName == "hello" {
hello := &Hello{}
f.EventData(h, hello) //deserialize into hello so the message can be read below
d.Count++
d.Message = hello.Message
}
timeoutSeconds := "5" //swf uses stringy numbers in many places
timerDecision := Decision{
DecisionType: DecisionTypeStartTimer,
StartTimerDecisionAttributes: &StartTimerDecisionAttributes{
StartToFireTimeout: timeoutSeconds,
TimerID: "timeToSignal",
},
}
decisions = append(decisions, timerDecision)
return f.Goto("waitForTimer", d, decisions)
}
//if the event was unexpected just stay here
return f.Stay(d, decisions)
}
waitForTimer := func(f *FSMContext, h HistoryEvent, d *StateData) Outcome {
decisions := f.EmptyDecisions()
switch h.EventType {
case EventTypeTimerFired:
//every time the timer fires, signal the workflow with a Hello
message := strconv.FormatInt(time.Now().Unix(), 10)
signalInput := &Hello{message}
signalDecision := Decision{
DecisionType: DecisionTypeSignalExternalWorkflowExecution,
SignalExternalWorkflowExecutionDecisionAttributes: &SignalExternalWorkflowExecutionDecisionAttributes{
SignalName: "hello",
Input: f.Serialize(signalInput),
RunID: f.RunID,
WorkflowID: f.WorkflowID,
},
}
decisions = append(decisions, signalDecision)
return f.Goto("waitForSignal", d, decisions)
}
//if the event was unexpected just stay here
return f.Stay(d, decisions)
}
//create the FSMState by passing the decider function through TypedDecider(),
//which lets you use d *StateData rather than d interface{} in your decider.
waitForSignalState := &FSMState{Name: "waitForSignal", Decider: typedFuncs.Decider(waitForSignal)}
waitForTimerState := &FSMState{Name: "waitForTimer", Decider: typedFuncs.Decider(waitForTimer)}
//wire it up in an fsm
fsm := &FSM{
Name: "example-fsm",
Client: client,
DataType: StateData{},
Domain: "example-swf-domain",
TaskList: "example-decision-task-list-to-poll",
Serializer: &JSONStateSerializer{},
}
//add states to FSM
fsm.AddInitialState(waitForSignalState)
fsm.AddState(waitForTimerState)
//start it up!
fsm.Start()
//To start workflows using this fsm
client.StartWorkflow(StartWorkflowRequest{
Domain: "example-swf-domain",
WorkflowID: "your-id",
//you will have previously registered a WorkflowType that this FSM will work.
WorkflowType: WorkflowType{Name: "the-name", Version: "the-version"},
// It is *very* important to use StartFSMWorkflowInput so the state management works properly
Input: StartFSMWorkflowInput(fsm.Serializer, &StateData{Count: 0, Message: "starting message"}),
})
func (*FSM) AddCompleteState ¶
AddCompleteState adds a state to the FSM and uses it as the final state of a workflow. It will only receive events if you returned FSMContext.Complete(...) and the workflow was unable to complete.
func (*FSM) AddErrorState ¶
AddErrorState adds an error handling state to your FSM. This is a special FSMState that should at a minimum handle WorkflowSignaled events where the signal name is FSM.Error and the event data is a SerializedDecisionError, or the signal name is FSM.SystemError and the event data is a SerializedSystemError. The error state should take care of transitioning the workflow back into a working state, by making decisions, updating data and/or choosing a new state.
func (*FSM) AddInitialState ¶
AddInitialState adds a state to the FSM and uses it as the initial state when a workflow execution is started.
func (*FSM) DefaultCompleteState ¶
DefaultCompleteState is the complete state used in an FSM if one has not been set. It simply responds with a CompleteDecision which attempts to Complete the workflow. This state will only get events if you previously attempted to complete the workflow and it failed.
func (*FSM) DefaultErrorState ¶
DefaultErrorState is the error state used in an FSM if one has not been set. It simply emits logs admonishing you to add a proper error state to your FSM.
func (*FSM) Deserialize ¶
Deserialize uses the FSM.Serializer to deserialize data from a string. If there is an error in deserialization this func will panic, so this should usually only be used inside Deciders where the panics are recovered and proper errors are recorded in the workflow.
func (*FSM) EmptyDecisions ¶
EmptyDecisions is a helper method to give you an empty decisions array for use in your Deciders.
func (*FSM) EventData ¶
func (f *FSM) EventData(event HistoryEvent, eventData interface{})
EventData works in combination with the FSM.Serializer to provide deserialization of data sent in a HistoryEvent. It is sugar around extracting the event payload from the proper field of the proper Attributes struct on the HistoryEvent
func (*FSM) Init ¶
func (f *FSM) Init()
Init initializes any optional, unspecified values such as the error state, stop channel, serializer, and PollerShutdownManager. It gets called by Start(), so you should only call this if you are manually managing polling for tasks and calling Tick yourself.
func (*FSM) InitialState ¶
InitialState is the implementation of FSMSerializer.InitialState()
func (*FSM) Serialize ¶
Serialize uses the FSM.Serializer to serialize data to a string. If there is an error in serialization this func will panic, so this should usually only be used inside Deciders where the panics are recovered and proper errors are recorded in the workflow.
func (*FSM) Start ¶
func (f *FSM) Start()
Start begins processing DecisionTasks with the FSM. It creates a DecisionTaskPoller and spawns a goroutine that continues polling until Stop() is called and any in-flight polls have completed. If you wish to manage polling and calling Tick() yourself, you don't need to start the FSM, just call Init().
func (*FSM) StateSerializer ¶
func (f *FSM) StateSerializer() StateSerializer
StateSerializer is the implementation of FSMSerializer.StateSerializer()
func (*FSM) Stop ¶
func (f *FSM) Stop()
Stop causes the DecisionTask select loop to exit and stops the DecisionTaskPoller.
func (*FSM) Tick ¶
func (f *FSM) Tick(decisionTask *PollForDecisionTaskResponse) ([]Decision, *SerializedState)
Tick is called when the DecisionTaskPoller receives a PollForDecisionTaskResponse in its polling loop. On errors, a nil *SerializedState is returned, and an error Outcome is included in the Decision list. It is exported to facilitate testing.
type FSMContext ¶
type FSMContext struct {
WorkflowType
WorkflowExecution
State string
// contains filtered or unexported fields
}
FSMContext is populated by the FSM machinery and passed to Deciders.
func NewFSMContext ¶
func NewFSMContext( serialization FSMSerializer, wfType WorkflowType, wfExec WorkflowExecution, pending *ActivityCorrelator, state string, stateData interface{}, stateVersion uint64, ) *FSMContext
NewFSMContext constructs an FSMContext.
func (*FSMContext) ActivitiesInfo ¶
func (f *FSMContext) ActivitiesInfo() map[string]*ActivityInfo
ActivitiesInfo will return a map of activityId -> ActivityInfo for all in-flight activities in the workflow.
func (*FSMContext) ActivityInfo ¶
func (f *FSMContext) ActivityInfo(h HistoryEvent) *ActivityInfo
ActivityInfo will find information for ActivityTasks being tracked. It can only be used when handling events related to ActivityTasks. ActivityTasks are automatically tracked after an EventTypeActivityTaskScheduled event. When there is no pending activity related to the event, nil is returned.
func (*FSMContext) Complete ¶
func (f *FSMContext) Complete(data interface{}, decisions ...Decision) Outcome
Complete is a helper func to easily create a CompleteOutcome.
func (*FSMContext) CompletionDecision ¶
func (f *FSMContext) CompletionDecision(data interface{}) Decision
CompletionDecision will build a CompleteWorkflowExecutionDecision decision that has the expected SerializedState marshalled to json as its result. This decision should be used when it is appropriate to Complete your workflow.
func (*FSMContext) ContinueDecision ¶
func (f *FSMContext) ContinueDecision(data interface{}, decisions []Decision) Outcome
ContinueDecision is a helper func to easily create a ContinueOutcome.
func (*FSMContext) ContinueWorkflowDecision ¶
func (f *FSMContext) ContinueWorkflowDecision(continuedState string) Decision
ContinueWorkflowDecision will build a ContinueAsNewWorkflow decision that has the expected SerializedState marshalled to json as its input. This decision should be used when it is appropriate to Continue your workflow. You are unable to ContinueAsNew a workflow that has running activities, so you should ensure there are none running before using this. As such there is no need to copy over the ActivityCorrelator.
func (*FSMContext) Decide ¶
func (f *FSMContext) Decide(h HistoryEvent, data interface{}, decider Decider) Outcome
Decide executes a decider making sure that Activity tasks are being tracked.
func (*FSMContext) Deserialize ¶
func (f *FSMContext) Deserialize(serialized string, data interface{})
Deserialize will use the current fsm's Serializer to deserialize the given string into the given struct. It will panic on errors, which is ok in the context of a Decider. If you want to handle errors, use Serializer().Deserialize(...) instead.
func (*FSMContext) EmptyDecisions ¶
func (f *FSMContext) EmptyDecisions() []Decision
EmptyDecisions is a helper to give you an empty Decision slice.
func (*FSMContext) Error ¶
func (f *FSMContext) Error(data interface{}, decisions []Decision) Outcome
Error is a helper func to easily create an ErrorOutcome.
func (*FSMContext) EventData ¶
func (f *FSMContext) EventData(h HistoryEvent, data interface{})
EventData will extract a payload from the given HistoryEvent and unmarshall it into the given struct.
func (*FSMContext) Goto ¶
func (f *FSMContext) Goto(state string, data interface{}, decisions []Decision) Outcome
Goto is a helper func to easily create a TransitionOutcome.
func (*FSMContext) Serialize ¶
func (f *FSMContext) Serialize(data interface{}) string
Serialize will use the current fsm's Serializer to serialize the given struct. It will panic on errors, which is ok in the context of a Decider. If you want to handle errors, use Serializer().Serialize(...) instead.
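The panic-versus-error trade-off described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: mustSerialize and safeDecide are hypothetical names, with mustSerialize standing in for a panicking Serialize helper and safeDecide for whatever recovers panics around a Decider.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mustSerialize is a hypothetical stand-in for a panicking Serialize helper:
// it marshals to JSON and panics instead of returning an error.
func mustSerialize(data interface{}) string {
	b, err := json.Marshal(data)
	if err != nil {
		panic(err)
	}
	return string(b)
}

// safeDecide is a hypothetical wrapper that runs decide and turns a panic
// into an ordinary error, as a host running Deciders might do.
func safeDecide(decide func() string) (out string, err error) {
	defer func() {
		if r := recover(); r != nil {
			out, err = "", fmt.Errorf("decider panicked: %v", r)
		}
	}()
	return decide(), nil
}

func main() {
	out, err := safeDecide(func() string {
		return mustSerialize(map[string]int{"count": 1})
	})
	fmt.Println(out, err)

	// Channels cannot be encoded as JSON, so this panics inside the decider
	// and surfaces as an error from safeDecide.
	_, err = safeDecide(func() string {
		return mustSerialize(make(chan int))
	})
	fmt.Println(err)
}
```

Outside such a recovering boundary, the error-returning form (Serializer().Serialize(...)) is the safer choice, because an unrecovered panic would take down the calling goroutine.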
func (*FSMContext) Serializer ¶
func (f *FSMContext) Serializer() StateSerializer
Serializer returns the current fsm's Serializer.
func (*FSMContext) StateVersion ¶
func (f *FSMContext) StateVersion() uint64
StateVersion returns the current versionNumber of the stateData, equivalent to the number of decision tasks processed by the workflow.
func (*FSMContext) Stay ¶
func (f *FSMContext) Stay(data interface{}, decisions []Decision) Outcome
Stay is a helper func to easily create a StayOutcome.
type FSMSerializer ¶
type FSMSerializer interface {
EventData(h HistoryEvent, data interface{})
Serialize(data interface{}) string
StateSerializer() StateSerializer
Deserialize(serialized string, data interface{})
InitialState() string
}
FSMSerializer is the contract for de/serializing state inside an FSM. It is typically implemented by the FSM itself, and serves to break the circular dependency between FSMContext and FSM.
type FSMState ¶
type FSMState struct {
// Name is the name of the state. When returning an Outcome, the NextState should match the Name of an FSMState in your FSM.
Name string
// Decider decides an Outcome given the current state, data, and an event.
Decider Decider
}
FSMState defines the behavior of one state of an FSM
type FailWorkflowExecutionDecisionAttributes ¶
type FailWorkflowExecutionDecisionAttributes struct {
Details string `json:"details"`
Reason string `json:"reason"`
}
FailWorkflowExecutionDecisionAttributes models the swf json protocol.
type FailWorkflowExecutionFailedEventAttributes ¶
type FailWorkflowExecutionFailedEventAttributes struct {
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
}
FailWorkflowExecutionFailedEventAttributes models the swf json protocol.
type GetRecordsRequest ¶
GetRecordsRequest models the kinesis json protocol.
type GetRecordsResponse ¶
type GetRecordsResponse struct {
NextShardIterator string
Records []struct {
Data []byte
PartitionKey string
SequenceNumber string
}
}
GetRecordsResponse models the kinesis json protocol.
type GetShardIteratorRequest ¶
type GetShardIteratorRequest struct {
StreamName string
ShardID string `json:"ShardId"`
ShardIteratorType string
StartingSequenceNumber string `json:"StartingSequenceNumber,omitempty"`
}
GetShardIteratorRequest models the kinesis json protocol.
type GetShardIteratorResponse ¶
type GetShardIteratorResponse struct {
ShardIterator string
}
GetShardIteratorResponse models the kinesis json protocol.
type GetWorkflowExecutionHistoryRequest ¶
type GetWorkflowExecutionHistoryRequest struct {
Domain string `json:"domain"`
Execution WorkflowExecution `json:"execution"`
MaximumPageSize int `json:"maximumPageSize,omitempty"`
NextPageToken string `json:"nextPageToken,omitempty"`
ReverseOrder bool `json:"reverseOrder,omitempty"`
}
GetWorkflowExecutionHistoryRequest models the swf json protocol.
type GetWorkflowExecutionHistoryResponse ¶
type GetWorkflowExecutionHistoryResponse struct {
Events []HistoryEvent `json:"events"`
NextPageToken string `json:"nextPageToken,omitempty"`
}
GetWorkflowExecutionHistoryResponse models the swf json protocol.
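Because the request and response both carry a NextPageToken, reading a full history means looping until the token comes back empty. The sketch below shows that loop under stated assumptions: historyPage is a trimmed, hypothetical mirror of the response type, fetchPage is a hypothetical stand-in for the client call, and fakeFetch serves canned pages so the example runs without AWS.

```go
package main

import "fmt"

// HistoryEvent is a trimmed, hypothetical mirror of the documented type.
type HistoryEvent struct {
	EventID   int
	EventType string
}

// historyPage is a hypothetical mirror of GetWorkflowExecutionHistoryResponse.
type historyPage struct {
	Events        []HistoryEvent
	NextPageToken string
}

// fetchPage is a hypothetical stand-in for a client call taking a page token.
type fetchPage func(nextPageToken string) (*historyPage, error)

// allEvents follows NextPageToken until it is empty and returns every event.
func allEvents(fetch fetchPage) ([]HistoryEvent, error) {
	var events []HistoryEvent
	token := ""
	for {
		page, err := fetch(token)
		if err != nil {
			return nil, err
		}
		events = append(events, page.Events...)
		if page.NextPageToken == "" {
			return events, nil
		}
		token = page.NextPageToken
	}
}

// fakeFetch serves two canned pages in place of a real SWF request.
func fakeFetch(token string) (*historyPage, error) {
	switch token {
	case "":
		return &historyPage{
			Events: []HistoryEvent{
				{EventID: 1, EventType: "WorkflowExecutionStarted"},
				{EventID: 2, EventType: "DecisionTaskScheduled"},
			},
			NextPageToken: "page-2",
		}, nil
	case "page-2":
		return &historyPage{
			Events: []HistoryEvent{{EventID: 3, EventType: "DecisionTaskStarted"}},
		}, nil
	}
	return nil, fmt.Errorf("unknown page token %q", token)
}

func main() {
	events, err := allEvents(fakeFetch)
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		fmt.Println(e.EventID, e.EventType)
	}
}
```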
type HistoryEvent ¶
type HistoryEvent struct {
ActivityTaskCancelRequestedEventAttributes *ActivityTaskCancelRequestedEventAttributes `json:"activityTaskCancelRequestedEventAttributes,omitempty"`
ActivityTaskCanceledEventAttributes *ActivityTaskCanceledEventAttributes `json:"activityTaskCanceledEventAttributes,omitempty"`
ActivityTaskCompletedEventAttributes *ActivityTaskCompletedEventAttributes `json:"activityTaskCompletedEventAttributes,omitempty"`
ActivityTaskFailedEventAttributes *ActivityTaskFailedEventAttributes `json:"activityTaskFailedEventAttributes,omitempty"`
ActivityTaskScheduledEventAttributes *ActivityTaskScheduledEventAttributes `json:"activityTaskScheduledEventAttributes,omitempty"`
ActivityTaskStartedEventAttributes *ActivityTaskStartedEventAttributes `json:"activityTaskStartedEventAttributes,omitempty"`
ActivityTaskTimedOutEventAttributes *ActivityTaskTimedOutEventAttributes `json:"activityTaskTimedOutEventAttributes,omitempty"`
CancelTimerFailedEventAttributes *CancelTimerFailedEventAttributes `json:"cancelTimerFailedEventAttributes,omitempty"`
CancelWorkflowExecutionFailedEventAttributes *CancelWorkflowExecutionFailedEventAttributes `json:"cancelWorkflowExecutionFailedEventAttributes,omitempty"`
ChildWorkflowExecutionCanceledEventAttributes *ChildWorkflowExecutionCanceledEventAttributes `json:"childWorkflowExecutionCanceledEventAttributes,omitempty"`
ChildWorkflowExecutionCompletedEventAttributes *ChildWorkflowExecutionCompletedEventAttributes `json:"childWorkflowExecutionCompletedEventAttributes,omitempty"`
ChildWorkflowExecutionFailedEventAttributes *CancelWorkflowExecutionFailedEventAttributes `json:"childWorkflowExecutionFailedEventAttributes,omitempty"`
ChildWorkflowExecutionStartedEventAttributes *ChildWorkflowExecutionStartedEventAttributes `json:"childWorkflowExecutionStartedEventAttributes,omitempty"`
ChildWorkflowExecutionTerminatedEventAttributes *ChildWorkflowExecutionTerminatedEventAttributes `json:"childWorkflowExecutionTerminatedEventAttributes,omitempty"`
ChildWorkflowExecutionTimedOutEventAttributes *ChildWorkflowExecutionTimedOutEventAttributes `json:"childWorkflowExecutionTimedOutEventAttributes,omitempty"`
CompleteWorkflowExecutionFailedEventAttributes *CompleteWorkflowExecutionFailedEventAttributes `json:"completeWorkflowExecutionFailedEventAttributes,omitempty"`
ContinueAsNewWorkflowExecutionFailedEventAttributes *ContinueAsNewWorkflowExecutionFailedEventAttributes `json:"continueAsNewWorkflowExecutionFailedEventAttributes,omitempty"`
DecisionTaskCompletedEventAttributes *DecisionTaskCompletedEventAttributes `json:"decisionTaskCompletedEventAttributes,omitempty"`
DecisionTaskScheduledEventAttributes *DecisionTaskScheduledEventAttributes `json:"decisionTaskScheduledEventAttributes,omitempty"`
DecisionTaskStartedEventAttributes *DecisionTaskStartedEventAttributes `json:"decisionTaskStartedEventAttributes,omitempty"`
DecisionTaskTimedOutEventAttributes *DecisionTaskTimedOutEventAttributes `json:"decisionTaskTimedOutEventAttributes,omitempty"`
EventID int `json:"eventId"`
EventTimestamp *Date `json:"eventTimestamp"`
EventType string `json:"eventType"`
ExternalWorkflowExecutionCancelRequestedEventAttributes *ExternalWorkflowExecutionCancelRequestedEventAttributes `json:"externalWorkflowExecutionCancelRequestedEventAttributes,omitempty"`
ExternalWorkflowExecutionSignaledEventAttributes *ExternalWorkflowExecutionSignaledEventAttributes `json:"externalWorkflowExecutionSignaledEventAttributes,omitempty"`
FailWorkflowExecutionFailedEventAttributes *FailWorkflowExecutionFailedEventAttributes `json:"failWorkflowExecutionFailedEventAttributes,omitempty"`
MarkerRecordedEventAttributes *MarkerRecordedEventAttributes `json:"markerRecordedEventAttributes,omitempty"`
RecordMarkerFailedEventAttributes *RecordMarkerFailedEventAttributes `json:"recordMarkerFailedEventAttributes,omitempty"`
RequestCancelActivityTaskFailedEventAttributes *RequestCancelActivityTaskFailedEventAttributes `json:"requestCancelActivityTaskFailedEventAttributes,omitempty"`
RequestCancelExternalWorkflowExecutionFailedEventAttributes *RequestCancelExternalWorkflowExecutionFailedEventAttributes `json:"requestCancelExternalWorkflowExecutionFailedEventAttributes,omitempty"`
RequestCancelExternalWorkflowExecutionInitiatedEventAttributes *RequestCancelExternalWorkflowExecutionInitiatedEventAttributes `json:"requestCancelExternalWorkflowExecutionInitiatedEventAttributes,omitempty"`
ScheduleActivityTaskFailedEventAttributes *ScheduleActivityTaskFailedEventAttributes `json:"scheduleActivityTaskFailedEventAttributes,omitempty"`
SignalExternalWorkflowExecutionFailedEventAttributes *SignalExternalWorkflowExecutionFailedEventAttributes `json:"signalExternalWorkflowExecutionFailedEventAttributes,omitempty"`
SignalExternalWorkflowExecutionInitiatedEventAttributes *SignalExternalWorkflowExecutionInitiatedEventAttributes `json:"signalExternalWorkflowExecutionInitiatedEventAttributes,omitempty"`
StartChildWorkflowExecutionFailedEventAttributes *StartChildWorkflowExecutionFailedEventAttributes `json:"startChildWorkflowExecutionFailedEventAttributes,omitempty"`
StartChildWorkflowExecutionInitiatedEventAttributes *StartChildWorkflowExecutionInitiatedEventAttributes `json:"startChildWorkflowExecutionInitiatedEventAttributes,omitempty"`
StartTimerFailedEventAttributes *StartTimerFailedEventAttributes `json:"startTimerFailedEventAttributes,omitempty"`
TimerCanceledEventAttributes *TimerCanceledEventAttributes `json:"timerCanceledEventAttributes,omitempty"`
TimerFiredEventAttributes *TimerFiredEventAttributes `json:"timerFiredEventAttributes,omitempty"`
TimerStartedEventAttributes *TimerStartedEventAttributes `json:"timerStartedEventAttributes,omitempty"`
WorkflowExecutionCancelRequestedEventAttributes *WorkflowExecutionCancelRequestedEventAttributes `json:"workflowExecutionCancelRequestedEventAttributes,omitempty"`
WorkflowExecutionCanceledEventAttributes *WorkflowExecutionCanceledEventAttributes `json:"workflowExecutionCanceledEventAttributes,omitempty"`
WorkflowExecutionCompletedEventAttributes *WorkflowExecutionCompletedEventAttributes `json:"workflowExecutionCompletedEventAttributes,omitempty"`
WorkflowExecutionContinuedAsNewEventAttributes *WorkflowExecutionContinuedAsNewEventAttributes `json:"workflowExecutionContinuedAsNewEventAttributes,omitempty"`
WorkflowExecutionFailedEventAttributes *WorkflowExecutionFailedEventAttributes `json:"workflowExecutionFailedEventAttributes,omitempty"`
WorkflowExecutionSignaledEventAttributes *WorkflowExecutionSignaledEventAttributes `json:"workflowExecutionSignaledEventAttributes,omitempty"`
WorkflowExecutionStartedEventAttributes *WorkflowExecutionStartedEventAttributes `json:"workflowExecutionStartedEventAttributes,omitempty"`
WorkflowExecutionTerminatedEventAttributes *WorkflowExecutionTerminatedEventAttributes `json:"workflowExecutionTerminatedEventAttributes,omitempty"`
WorkflowExecutionTimedOutEventAttributes *WorkflowExecutionTimedOutEventAttributes `json:"workflowExecutionTimedOutEventAttributes,omitempty"`
}
HistoryEvent models the swf json protocol.
func (HistoryEvent) String ¶
func (h HistoryEvent) String() string
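Every attributes field on HistoryEvent is a pointer tagged omitempty, so after decoding only the field matching the event is non-nil. The following sketch decodes raw JSON and dispatches on whichever pointer is set. The types are trimmed, hypothetical mirrors of the ones documented above (only two attribute structs, with a couple of fields each), and describe is an illustrative helper rather than part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed, hypothetical mirrors of the documented attribute types.
type TimerFiredEventAttributes struct {
	TimerID string `json:"timerId"`
}

type WorkflowExecutionSignaledEventAttributes struct {
	SignalName string `json:"signalName"`
	Input      string `json:"input"`
}

// HistoryEvent keeps only a few of the documented fields.
type HistoryEvent struct {
	EventID                                  int                                       `json:"eventId"`
	EventType                                string                                    `json:"eventType"`
	TimerFiredEventAttributes                *TimerFiredEventAttributes                `json:"timerFiredEventAttributes,omitempty"`
	WorkflowExecutionSignaledEventAttributes *WorkflowExecutionSignaledEventAttributes `json:"workflowExecutionSignaledEventAttributes,omitempty"`
}

// describe decodes one event and reports which attributes were present.
// Checking the pointer before dereferencing avoids a nil pointer panic.
func describe(raw string) (string, error) {
	var h HistoryEvent
	if err := json.Unmarshal([]byte(raw), &h); err != nil {
		return "", err
	}
	switch {
	case h.WorkflowExecutionSignaledEventAttributes != nil:
		return "signal " + h.WorkflowExecutionSignaledEventAttributes.SignalName, nil
	case h.TimerFiredEventAttributes != nil:
		return "timer " + h.TimerFiredEventAttributes.TimerID, nil
	}
	return "other " + h.EventType, nil
}

func main() {
	raw := `{"eventId":5,"eventType":"TimerFired","timerFiredEventAttributes":{"timerId":"timeToSignal"}}`
	summary, err := describe(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(summary)
}
```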
type JSONStateSerializer ¶
type JSONStateSerializer struct{}
JSONStateSerializer is a StateSerializer that uses go json serialization.
func (JSONStateSerializer) Deserialize ¶
func (j JSONStateSerializer) Deserialize(serialized string, state interface{}) error
Deserialize unmarshalls the given (json) string into the given struct
func (JSONStateSerializer) Serialize ¶
func (j JSONStateSerializer) Serialize(state interface{}) (string, error)
Serialize serializes the given struct to a json string.
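The two signatures above are enough to sketch what a JSON-backed serializer looks like and how a round trip behaves. This is an illustration only: the StateSerializer interface here is assumed to consist of exactly these two methods, and jsonSerializer is a hypothetical re-implementation, not the package's own type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StateSerializer is assumed to consist of the two documented methods.
type StateSerializer interface {
	Serialize(state interface{}) (string, error)
	Deserialize(serialized string, state interface{}) error
}

// jsonSerializer is a hypothetical re-implementation for illustration.
type jsonSerializer struct{}

// Serialize marshals state to a JSON string.
func (jsonSerializer) Serialize(state interface{}) (string, error) {
	b, err := json.Marshal(state)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// Deserialize unmarshals a JSON string into state, which must be a pointer.
func (jsonSerializer) Deserialize(serialized string, state interface{}) error {
	return json.Unmarshal([]byte(serialized), state)
}

// StateData matches the data type used in the FSM example.
type StateData struct {
	Message string `json:"message,omitempty"`
	Count   int    `json:"count,omitempty"`
}

func main() {
	var s StateSerializer = jsonSerializer{}
	encoded, err := s.Serialize(&StateData{Message: "hi", Count: 2})
	if err != nil {
		panic(err)
	}
	var decoded StateData
	if err := s.Deserialize(encoded, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(encoded, decoded.Count)
}
```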
type KinesisClient ¶
type KinesisClient interface {
PutRecord(request PutRecordRequest) (*PutRecordResponse, error)
GetRecords(request GetRecordsRequest) (*GetRecordsResponse, error)
CreateStream(request CreateStream) error
DescribeStream(request DescribeStreamRequest) (*DescribeStreamResponse, error)
GetShardIterator(request GetShardIteratorRequest) (*GetShardIteratorResponse, error)
}
KinesisClient specifies operations used by this library on the aws kinesis api.
type KinesisReplicator ¶
type KinesisReplicator func(fsm, workflowID string, put func() (*PutRecordResponse, error)) (*PutRecordResponse, error)
KinesisReplicator lets you customize the retry logic around Replicating State to Kinesis.
type ListActivityTypesRequest ¶
type ListActivityTypesRequest struct {
Domain string `json:"domain"`
MaximumPageSize int `json:"maximumPageSize,omitempty"`
Name string `json:"name,omitempty"`
NextPageToken string `json:"nextPageToken,omitempty"`
RegistrationStatus string `json:"registrationStatus"`
ReverseOrder bool `json:"reverseOrder,omitempty"`
}
ListActivityTypesRequest models the swf json protocol.
type ListActivityTypesResponse ¶
type ListActivityTypesResponse struct {
NextPageToken *string `json:"nextPageToken"`
TypeInfos []ActivityTypeInfo `json:"typeInfos"`
}
ListActivityTypesResponse models the swf json protocol.
type ListClosedWorkflowExecutionsRequest ¶
type ListClosedWorkflowExecutionsRequest struct {
CloseStatusFilter *StatusFilter `json:"closeStatusFilter,omitempty"`
CloseTimeFilter *TimeFilter `json:"closeTimeFilter,omitempty"`
Domain string `json:"domain"`
ExecutionFilter *ExecutionFilter `json:"executionFilter,omitempty"`
MaximumPageSize int `json:"maximumPageSize,omitempty"`
NextPageToken string `json:"nextPageToken,omitempty"`
ReverseOrder bool `json:"reverseOrder,omitempty"`
StartTimeFilter *TimeFilter `json:"startTimeFilter,omitempty"`
TagFilter *TagFilter `json:"tagFilter,omitempty"`
TypeFilter *TypeFilter `json:"typeFilter,omitempty"`
}
ListClosedWorkflowExecutionsRequest models the swf json protocol.
type ListClosedWorkflowExecutionsResponse ¶
type ListClosedWorkflowExecutionsResponse struct {
ExecutionInfos []WorkflowExecutionInfo `json:"executionInfos"`
NextPageToken string `json:"nextPageToken,omitempty"`
}
ListClosedWorkflowExecutionsResponse models the swf json protocol.
type ListDomainsRequest ¶
type ListDomainsRequest struct {
MaximumPageSize int `json:"maximumPageSize,omitempty"`
NextPageToken string `json:"nextPageToken,omitempty"`
RegistrationStatus string `json:"registrationStatus"`
ReverseOrder bool `json:"reverseOrder,omitempty"`
}
ListDomainsRequest models the swf json protocol.
type ListDomainsResponse ¶
type ListDomainsResponse struct {
DomainInfos []DomainInfo `json:"domainInfos"`
NextPageToken string `json:"nextPageToken,omitempty"`
}
ListDomainsResponse models the swf json protocol.
type ListOpenWorkflowExecutionsRequest ¶
type ListOpenWorkflowExecutionsRequest struct {
Domain string `json:"domain"`
ExecutionFilter *ExecutionFilter `json:"executionFilter,omitempty"`
MaximumPageSize int `json:"maximumPageSize,omitempty"`
NextPageToken string `json:"nextPageToken,omitempty"`
ReverseOrder bool `json:"reverseOrder,omitempty"`
StartTimeFilter TimeFilter `json:"startTimeFilter"`
TagFilter *TagFilter `json:"tagFilter,omitempty"`
TypeFilter *TypeFilter `json:"typeFilter,omitempty"`
}
ListOpenWorkflowExecutionsRequest models the swf json protocol.
type ListOpenWorkflowExecutionsResponse ¶
type ListOpenWorkflowExecutionsResponse struct {
ExecutionInfos []WorkflowExecutionInfo `json:"executionInfos"`
NextPageToken string `json:"nextPageToken,omitempty"`
}
ListOpenWorkflowExecutionsResponse models the swf json protocol.
type ListWorkflowTypesRequest ¶
type ListWorkflowTypesRequest struct {
Domain string `json:"domain"`
MaximumPageSize int `json:"maximumPageSize,omitempty"`
Name string `json:"name,omitempty"`
NextPageToken string `json:"nextPageToken,omitempty"`
RegistrationStatus string `json:"registrationStatus"`
ReverseOrder bool `json:"reverseOrder,omitempty"`
}
ListWorkflowTypesRequest models the swf json protocol.
type ListWorkflowTypesResponse ¶
type ListWorkflowTypesResponse struct {
NextPageToken string `json:"nextPageToken,omitempty"`
TypeInfos []WorkflowTypeInfo `json:"typeInfos"`
}
ListWorkflowTypesResponse models the swf json protocol.
type MarkerRecordedEventAttributes ¶
type MarkerRecordedEventAttributes struct {
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
Details string `json:"details"`
MarkerName string `json:"markerName"`
}
MarkerRecordedEventAttributes models the swf json protocol.
type MultiDecisionFunc ¶
type MultiDecisionFunc func(ctx *FSMContext, h HistoryEvent, data interface{}) []Decision
MultiDecisionFunc is a building block for composable deciders that returns a slice of Decisions.
type OpenCounts ¶
type OpenCounts struct {
OpenActivityTasks string `json:"openActivityTasks"`
OpenChildWorkflowExecutions string `json:"openChildWorkflowExecutions"`
OpenDecisionTasks string `json:"openDecisionTasks"`
OpenTimers string `json:"openTimers"`
}
OpenCounts models the swf json protocol.
type Outcome ¶
type Outcome interface {
// Data returns the data for this Outcome.
Data() interface{}
// Decisions returns the list of Decisions for this Outcome.
Decisions() []Decision
// State returns the state to transition to. An empty string means no
// transition.
State() string
}
Outcome represents the minimum data needed to be returned by a Decider.
var Pass Outcome
Pass is nil, a sentinel value to represent 'no outcome'
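To make the contract concrete, here is a minimal sketch of a type satisfying Outcome and of code consuming it. Everything beyond the interface itself is hypothetical: Decision is cut down to one field, transition is an illustrative implementation, and nextState is an assumed helper showing how an empty State() and the nil Pass sentinel could both be read as "no transition".

```go
package main

import "fmt"

// Decision is a trimmed, hypothetical mirror of the documented type.
type Decision struct {
	DecisionType string
}

// Outcome repeats the documented interface.
type Outcome interface {
	Data() interface{}
	Decisions() []Decision
	State() string
}

// Pass is nil, mirroring the documented sentinel for 'no outcome'.
var Pass Outcome

// transition is a hypothetical Outcome implementation.
type transition struct {
	state     string
	data      interface{}
	decisions []Decision
}

func (t transition) Data() interface{}     { return t.data }
func (t transition) Decisions() []Decision { return t.decisions }
func (t transition) State() string         { return t.state }

// nextState is a hypothetical helper: a nil outcome (Pass) or an empty
// State() keeps the current state, anything else names the new state.
func nextState(current string, o Outcome) string {
	if o == nil || o.State() == "" {
		return current
	}
	return o.State()
}

func main() {
	fmt.Println(nextState("waitForSignal", Pass))
	fmt.Println(nextState("waitForSignal", transition{state: "waitForTimer"}))
	fmt.Println(nextState("waitForSignal", transition{}))
}
```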
type PollForActivityTaskRequest ¶
type PollForActivityTaskRequest struct {
Domain string `json:"domain"`
Identity string `json:"identity,omitempty"`
TaskList TaskList `json:"taskList"`
}
PollForActivityTaskRequest models the swf json protocol.
type PollForActivityTaskResponse ¶
type PollForActivityTaskResponse struct {
ActivityID string `json:"activityId"`
ActivityType ActivityType `json:"activityType"`
Input string `json:"input"`
StartedEventID int `json:"startedEventId"`
TaskToken string `json:"taskToken"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
}
PollForActivityTaskResponse models the swf json protocol.
type PollForDecisionTaskRequest ¶
type PollForDecisionTaskRequest struct {
Domain string `json:"domain"`
Identity string `json:"identity,omitempty"`
MaximumPageSize int `json:"maximumPageSize,omitempty"`
NextPageToken string `json:"nextPageToken,omitempty"`
ReverseOrder bool `json:"reverseOrder,omitempty"`
TaskList TaskList `json:"taskList"`
}
PollForDecisionTaskRequest models the swf json protocol.
type PollForDecisionTaskResponse ¶
type PollForDecisionTaskResponse struct {
Events []HistoryEvent `json:"events"`
NextPageToken string `json:"nextPageToken"`
PreviousStartedEventID int `json:"previousStartedEventId"`
StartedEventID int `json:"startedEventId"`
TaskToken string `json:"taskToken"`
WorkflowExecution WorkflowExecution `json:"workflowExecution"`
WorkflowType WorkflowType `json:"workflowType"`
}
PollForDecisionTaskResponse models the swf json protocol.
type PollerShutdownManager ¶
type PollerShutdownManager struct {
// contains filtered or unexported fields
}
PollerShutdownManager facilitates cleanly shutting down pollers when the application decides to exit. When StopPollers() is called, it sends to each registered stopChan, then receives from each registered ackChan. At this point StopPollers() returns.
func NewPollerShutdownManager ¶
func NewPollerShutdownManager() *PollerShutdownManager
NewPollerShutdownManager creates a PollerShutdownManager
func (*PollerShutdownManager) Deregister ¶
func (p *PollerShutdownManager) Deregister(name string)
Deregister removes a registered pair of channels from the shutdown manager.
func (*PollerShutdownManager) Register ¶
func (p *PollerShutdownManager) Register(name string, stopChan chan bool, ackChan chan bool)
Register registers a named pair of channels with the shutdown manager. Both channels should be buffered.
func (*PollerShutdownManager) StopPollers ¶
func (p *PollerShutdownManager) StopPollers()
StopPollers blocks until it is able to stop all the registered pollers, which can take up to 60 seconds.
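The stop/ack handshake described for this type can be sketched in a few lines. This is a hypothetical re-implementation for illustration, not the package's code: shutdownManager, channelPair, and runPoller are assumed names, and the poll callback stands in for one long poll. Buffered channels matter in this sketch because the manager can then signal every poller without waiting for any of them to finish its current poll.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// channelPair holds the stop and ack channels registered for one poller.
type channelPair struct{ stop, ack chan bool }

// shutdownManager is a hypothetical sketch of the documented behavior.
type shutdownManager struct {
	mu      sync.Mutex
	pollers map[string]channelPair
}

func newShutdownManager() *shutdownManager {
	return &shutdownManager{pollers: map[string]channelPair{}}
}

// Register stores a named pair of (buffered) channels.
func (m *shutdownManager) Register(name string, stop, ack chan bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pollers[name] = channelPair{stop, ack}
}

// StopPollers signals every poller first, then waits for every ack.
func (m *shutdownManager) StopPollers() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, p := range m.pollers {
		p.stop <- true
	}
	for _, p := range m.pollers {
		<-p.ack
	}
}

// runPoller finishes the poll in flight, then checks for a stop signal
// and acknowledges it before exiting.
func runPoller(stop, ack chan bool, poll func()) {
	for {
		select {
		case <-stop:
			ack <- true
			return
		default:
			poll()
		}
	}
}

func main() {
	m := newShutdownManager()
	for _, name := range []string{"decision", "activity"} {
		stop, ack := make(chan bool, 1), make(chan bool, 1)
		m.Register(name, stop, ack)
		// The sleep simulates a long poll that cannot be interrupted.
		go runPoller(stop, ack, func() { time.Sleep(10 * time.Millisecond) })
	}
	m.StopPollers()
	fmt.Println("all pollers stopped")
}
```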
type PredicateFunc ¶
type PredicateFunc func(data interface{}) bool
PredicateFunc is a building block for composable deciders, a predicate based on the FSM stateData.
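Since a PredicateFunc is just a function over the state data, predicates compose with ordinary higher-order functions. The combinators below are a sketch of that idea: allOf and not are hypothetical helpers that are not stated to exist in the package, and StateData is borrowed from the FSM example.

```go
package main

import "fmt"

// PredicateFunc repeats the documented function type.
type PredicateFunc func(data interface{}) bool

// allOf is a hypothetical combinator: true only if every predicate is true.
func allOf(preds ...PredicateFunc) PredicateFunc {
	return func(data interface{}) bool {
		for _, p := range preds {
			if !p(data) {
				return false
			}
		}
		return true
	}
}

// not is a hypothetical combinator that inverts a predicate.
func not(p PredicateFunc) PredicateFunc {
	return func(data interface{}) bool { return !p(data) }
}

// StateData matches the data type used in the FSM example.
type StateData struct {
	Message string
	Count   int
}

// hasMessage reports whether the state data carries a message.
func hasMessage(data interface{}) bool {
	d, ok := data.(*StateData)
	return ok && d.Message != ""
}

// countExceeded reports whether more than three signals were counted.
func countExceeded(data interface{}) bool {
	d, ok := data.(*StateData)
	return ok && d.Count > 3
}

func main() {
	keepGoing := allOf(hasMessage, not(countExceeded))
	fmt.Println(keepGoing(&StateData{Message: "hi", Count: 1}))
	fmt.Println(keepGoing(&StateData{Message: "hi", Count: 9}))
}
```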
type ProtobufStateSerializer ¶
type ProtobufStateSerializer struct{}
ProtobufStateSerializer is a StateSerializer that uses base64 encoded protobufs.
func (ProtobufStateSerializer) Deserialize ¶
func (p ProtobufStateSerializer) Deserialize(serialized string, state interface{}) error
Deserialize base64 decodes the given string then unmarshalls the bytes into the struct using protobuf. The struct passed to Deserialize must satisfy proto.Message.
func (ProtobufStateSerializer) Serialize ¶
func (p ProtobufStateSerializer) Serialize(state interface{}) (string, error)
Serialize serializes the given struct into bytes with protobuf, then base64 encodes it. The struct passed to Serialize must satisfy proto.Message.
type PutRecordRequest ¶
type PutRecordRequest struct {
Data []byte
ExplicitHashKey string `json:"ExplicitHashKey,omitempty"`
PartitionKey string
SequenceNumberForOrdering string `json:"SequenceNumberForOrdering,omitempty"`
StreamName string
}
PutRecordRequest models the kinesis json protocol.
type PutRecordResponse ¶
PutRecordResponse models the kinesis json protocol.
type RecordActivityTaskHeartbeatRequest ¶
type RecordActivityTaskHeartbeatRequest struct {
Details string `json:"details,omitempty"`
TaskToken string `json:"taskToken"`
}
RecordActivityTaskHeartbeatRequest models the swf json protocol.
type RecordActivityTaskHeartbeatResponse ¶
type RecordActivityTaskHeartbeatResponse struct {
CancelRequested bool `json:"cancelRequested"`
}
RecordActivityTaskHeartbeatResponse models the swf json protocol.
type RecordMarkerDecisionAttributes ¶
type RecordMarkerDecisionAttributes struct {
Details string `json:"details"`
MarkerName string `json:"markerName"`
}
RecordMarkerDecisionAttributes models the swf json protocol.
type RecordMarkerFailedEventAttributes ¶
type RecordMarkerFailedEventAttributes struct {
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
MarkerName string `json:"markerName"`
}
RecordMarkerFailedEventAttributes models the swf json protocol.
type RegisterActivityType ¶
type RegisterActivityType struct {
DefaultTaskHeartbeatTimeout string `json:"defaultTaskHeartbeatTimeout,omitempty"`
DefaultTaskList *TaskList `json:"defaultTaskList,omitempty"`
DefaultTaskScheduleToCloseTimeout string `json:"defaultTaskScheduleToCloseTimeout,omitempty"`
DefaultTaskScheduleToStartTimeout string `json:"defaultTaskScheduleToStartTimeout,omitempty"`
DefaultTaskStartToCloseTimeout string `json:"defaultTaskStartToCloseTimeout,omitempty"`
Description string `json:"description,omitempty"`
Domain string `json:"domain"`
Name string `json:"name"`
Version string `json:"version"`
}
RegisterActivityType models the swf json protocol.
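The following sketch shows how the `omitempty` tags and the pointer-typed DefaultTaskList shape the request body: unset defaults are left out entirely rather than sent as empty strings. The structs are local copies, the helper `encodeRegistration` is hypothetical, and the sample values (including the timeout written as a string of seconds, as SWF expects) are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskList and RegisterActivityType are local copies of the structs
// documented in this package.
type TaskList struct {
	Name string `json:"name"`
}

type RegisterActivityType struct {
	DefaultTaskHeartbeatTimeout       string    `json:"defaultTaskHeartbeatTimeout,omitempty"`
	DefaultTaskList                   *TaskList `json:"defaultTaskList,omitempty"`
	DefaultTaskScheduleToCloseTimeout string    `json:"defaultTaskScheduleToCloseTimeout,omitempty"`
	DefaultTaskScheduleToStartTimeout string    `json:"defaultTaskScheduleToStartTimeout,omitempty"`
	DefaultTaskStartToCloseTimeout    string    `json:"defaultTaskStartToCloseTimeout,omitempty"`
	Description                       string    `json:"description,omitempty"`
	Domain                            string    `json:"domain"`
	Name                              string    `json:"name"`
	Version                           string    `json:"version"`
}

// encodeRegistration is a hypothetical helper returning the JSON body.
func encodeRegistration(r RegisterActivityType) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

func main() {
	out, err := encodeRegistration(RegisterActivityType{
		Domain:                         "example-domain",
		Name:                           "send-email",
		Version:                        "1",
		DefaultTaskList:                &TaskList{Name: "email"},
		DefaultTaskStartToCloseTimeout: "300",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```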
type RegisterDomain ¶
type RegisterDomain struct {
Description string `json:"description,omitempty"`
Name string `json:"name"`
WorkflowExecutionRetentionPeriodInDays string `json:"workflowExecutionRetentionPeriodInDays"`
}
RegisterDomain models the swf json protocol.
type RegisterWorkflowType ¶
type RegisterWorkflowType struct {
DefaultChildPolicy string `json:"defaultChildPolicy,omitempty"`
DefaultExecutionStartToCloseTimeout string `json:"defaultExecutionStartToCloseTimeout,omitempty"`
DefaultTaskList *TaskList `json:"defaultTaskList,omitempty"`
DefaultTaskStartToCloseTimeout string `json:"defaultTaskStartToCloseTimeout,omitempty"`
Description string `json:"description,omitempty"`
Domain string `json:"domain"`
Name string `json:"name"`
Version string `json:"version"`
}
RegisterWorkflowType models the swf json protocol.
type ReplicationData ¶
type ReplicationData struct {
StateVersion uint64 `json:"stateVersion"`
StateName string `json:"stateName"`
StateData string `json:"stateData"`
}
ReplicationData is the part of SerializedState that will be replicated onto Kinesis streams.
type RequestCancelActivityTaskDecisionAttributes ¶
type RequestCancelActivityTaskDecisionAttributes struct {
ActivityID string `json:"activityId"`
}
RequestCancelActivityTaskDecisionAttributes models the swf json protocol.
type RequestCancelActivityTaskFailedEventAttributes ¶
type RequestCancelActivityTaskFailedEventAttributes struct {
ActivityID string `json:"activityId"`
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
}
RequestCancelActivityTaskFailedEventAttributes models the swf json protocol.
type RequestCancelExternalWorkflowExecutionDecisionAttributes ¶
type RequestCancelExternalWorkflowExecutionDecisionAttributes struct {
Control string `json:"control"`
RunID string `json:"runId"`
WorkflowID string `json:"workflowId"`
}
RequestCancelExternalWorkflowExecutionDecisionAttributes models the swf json protocol.
type RequestCancelExternalWorkflowExecutionFailedEventAttributes ¶
type RequestCancelExternalWorkflowExecutionFailedEventAttributes struct {
Cause string `json:"cause"`
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
InitiatedEventID int `json:"initiatedEventId"`
RunID string `json:"runId"`
WorkflowID string `json:"workflowId"`
}
RequestCancelExternalWorkflowExecutionFailedEventAttributes models the swf json protocol.
type RequestCancelExternalWorkflowExecutionInitiatedEventAttributes ¶
type RequestCancelExternalWorkflowExecutionInitiatedEventAttributes struct {
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
RunID string `json:"runId"`
WorkflowID string `json:"workflowId"`
}
RequestCancelExternalWorkflowExecutionInitiatedEventAttributes models the swf json protocol.
type RequestCancelWorkflowExecution ¶
type RequestCancelWorkflowExecution struct {
Domain string `json:"domain"`
RunID string `json:"runId,omitempty"`
WorkflowID string `json:"workflowId"`
}
RequestCancelWorkflowExecution models the swf json protocol.
type RespondActivityTaskCanceledRequest ¶
type RespondActivityTaskCanceledRequest struct {
Details string `json:"details,omitempty"`
TaskToken string `json:"taskToken"`
}
RespondActivityTaskCanceledRequest models the swf json protocol.
type RespondActivityTaskCompletedRequest ¶
type RespondActivityTaskCompletedRequest struct {
Result string `json:"result,omitempty"`
TaskToken string `json:"taskToken"`
}
RespondActivityTaskCompletedRequest models the swf json protocol.
type RespondActivityTaskFailedRequest ¶
type RespondActivityTaskFailedRequest struct {
Details string `json:"details,omitempty"`
Reason string `json:"reason,omitempty"`
TaskToken string `json:"taskToken"`
}
RespondActivityTaskFailedRequest models the swf json protocol.
type RespondDecisionTaskCompletedRequest ¶
type RespondDecisionTaskCompletedRequest struct {
Decisions []Decision `json:"decisions"`
ExecutionContext string `json:"executionContext"`
TaskToken string `json:"taskToken"`
}
RespondDecisionTaskCompletedRequest models the swf json protocol.
type ScheduleActivityTaskDecisionAttributes ¶
type ScheduleActivityTaskDecisionAttributes struct {
ActivityID string `json:"activityId"`
ActivityType ActivityType `json:"activityType"`
Control string `json:"control"`
HeartbeatTimeout string `json:"heartbeatTimeout,omitempty"`
Input string `json:"input,omitempty"`
ScheduleToCloseTimeout string `json:"scheduleToCloseTimeout,omitempty"`
ScheduleToStartTimeout string `json:"scheduleToStartTimeout,omitempty"`
StartToCloseTimeout string `json:"startToCloseTimeout,omitempty"`
TaskList *TaskList `json:"taskList,omitempty"`
}
ScheduleActivityTaskDecisionAttributes models the swf json protocol.
type ScheduleActivityTaskFailedEventAttributes ¶
type ScheduleActivityTaskFailedEventAttributes struct {
ActivityID string `json:"activityId"`
ActivityType ActivityType
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
}
ScheduleActivityTaskFailedEventAttributes models the swf json protocol.
type SerializedDecisionError ¶
type SerializedDecisionError struct {
ErrorEventID int `json:"errorEventIds"`
UnprocessedEventIDs []int `json:"unprocessedEventIds"`
StateName string `json:"stateName"`
StateData interface{} `json:"stateData"`
Error interface{} `json:"error"`
}
SerializedDecisionError is a wrapper struct that allows serializing the context in which an error in a Decider occurred into a WorkflowSignaledEvent in the workflow history.
type SerializedState ¶
type SerializedState struct {
ReplicationData ReplicationData
PendingActivities ActivityCorrelator
}
SerializedState is a wrapper struct that allows serializing the current state and current data for the FSM in a MarkerRecorded event in the workflow history. We also maintain an epoch, which counts the number of times a workflow has been continued, and the StartedId of the DecisionTask that generated this state. The epoch + the id provide a total ordering of state over the lifetime of different runs of a workflow.
type SerializedSystemError ¶
type SerializedSystemError struct {
ErrorType string `json:"errorType"`
Error interface{} `json:"error"`
UnprocessedEventIDs []int `json:"unprocessedEventIds"`
}
SerializedSystemError is a wrapper struct that allows serializing the context in which an error internal to FSM processing has occurred into a WorkflowSignaledEvent in the workflow history. These errors are generally in finding the current state and data for a workflow, or in serializing and deserializing said state.
type SignalExternalWorkflowExecutionDecisionAttributes ¶
type SignalExternalWorkflowExecutionDecisionAttributes struct {
Control string `json:"control"`
Input string `json:"input"`
RunID string `json:"runId"`
SignalName string `json:"signalName"`
WorkflowID string `json:"workflowId"`
}
SignalExternalWorkflowExecutionDecisionAttributes models the swf json protocol.
type SignalExternalWorkflowExecutionFailedEventAttributes ¶
type SignalExternalWorkflowExecutionFailedEventAttributes struct {
Cause string `json:"cause"`
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
InitiatedEventID int `json:"initiatedEventId"`
RunID string `json:"runId"`
WorkflowID string `json:"workflowId"`
}
SignalExternalWorkflowExecutionFailedEventAttributes models the swf json protocol.
type SignalExternalWorkflowExecutionInitiatedEventAttributes ¶
type SignalExternalWorkflowExecutionInitiatedEventAttributes struct {
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
Input string `json:"input"`
RunID string `json:"runId"`
SignalName string `json:"signalName"`
WorkflowID string `json:"workflowId"`
}
SignalExternalWorkflowExecutionInitiatedEventAttributes models the swf json protocol.
type SignalWorkflowRequest ¶
type SignalWorkflowRequest struct {
Domain string `json:"domain"`
Input string `json:"input,omitempty"`
RunID string `json:"runId,omitempty"`
SignalName string `json:"signalName"`
WorkflowID string `json:"workflowId"`
}
SignalWorkflowRequest models the swf json protocol.
type StartChildWorkflowExecutionDecisionAttributes ¶
type StartChildWorkflowExecutionDecisionAttributes struct {
ChildPolicy string `json:"childPolicy"`
Control string `json:"control"`
ExecutionStartToCloseTimeout string `json:"executionStartToCloseTimeout"`
Input string `json:"input"`
TagList []string `json:"tagList"`
TaskList TaskList `json:"taskList"`
TaskStartToCloseTimeout string `json:"taskStartToCloseTimeout"`
WorkflowID string `json:"workflowId"`
WorkflowType WorkflowType `json:"workflowType"`
}
StartChildWorkflowExecutionDecisionAttributes models the swf json protocol.
type StartChildWorkflowExecutionFailedEventAttributes ¶
type StartChildWorkflowExecutionFailedEventAttributes struct {
Cause string `json:"cause"`
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
InitiatedEventID int `json:"initiatedEventId"`
WorkflowID string `json:"workflowId"`
WorkflowType WorkflowType `json:"workflowType"`
}
StartChildWorkflowExecutionFailedEventAttributes models the swf json protocol.
type StartChildWorkflowExecutionInitiatedEventAttributes ¶
type StartChildWorkflowExecutionInitiatedEventAttributes struct {
ChildPolicy string `json:"childPolicy"`
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
ExecutionStartToCloseTimeout string `json:"executionStartToCloseTimeout"`
Input string `json:"input"`
TagList []string `json:"tagList"`
TaskList TaskList `json:"taskList"`
TaskStartToCloseTimeout string `json:"taskStartToCloseTimeout"`
WorkflowID string `json:"workflowId"`
WorkflowType WorkflowType `json:"workflowType"`
}
StartChildWorkflowExecutionInitiatedEventAttributes models the swf json protocol.
type StartTimerDecisionAttributes ¶
type StartTimerDecisionAttributes struct {
Control string `json:"control"`
StartToFireTimeout string `json:"startToFireTimeout"`
TimerID string `json:"timerId"`
}
StartTimerDecisionAttributes models the swf json protocol.
type StartTimerFailedEventAttributes ¶
type StartTimerFailedEventAttributes struct {
Cause string `json:"cause"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
TimerID string `json:"timerId"`
}
StartTimerFailedEventAttributes models the swf json protocol.
type StartWorkflowRequest ¶
type StartWorkflowRequest struct {
ChildPolicy string `json:"childPolicy,omitempty"`
Domain string `json:"domain"`
ExecutionStartToCloseTimeout string `json:"executionStartToCloseTimeout,omitempty"`
Input string `json:"input,omitempty"`
TagList []string `json:"tagList,omitempty"`
TaskList *TaskList `json:"taskList,omitempty"`
TaskStartToCloseTimeout string `json:"taskStartToCloseTimeout,omitempty"`
WorkflowID string `json:"workflowId"`
WorkflowType WorkflowType `json:"workflowType"`
}
StartWorkflowRequest models the swf json protocol.
type StartWorkflowResponse ¶
type StartWorkflowResponse struct {
RunID string `json:"runId"`
}
StartWorkflowResponse models the swf json protocol.
type StateFunc ¶
type StateFunc func(ctx *FSMContext, h HistoryEvent, data interface{})
StateFunc is a building block for composable deciders that mutates the FSM stateData.
type StateSerializer ¶
type StateSerializer interface {
Serialize(state interface{}) (string, error)
Deserialize(serialized string, state interface{}) error
}
StateSerializer defines the interface for serializing state to and deserializing state from the workflow history.
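A custom serializer only needs to satisfy the two methods above. The sketch below is one possible implementation that swaps protobuf for JSON while keeping the base64 step; `base64JSONSerializer`, `counterState`, and `roundTrip` are hypothetical names, and the interface is redeclared locally so the example stands alone.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// StateSerializer is a local copy of the interface documented above.
type StateSerializer interface {
	Serialize(state interface{}) (string, error)
	Deserialize(serialized string, state interface{}) error
}

// base64JSONSerializer is a hypothetical StateSerializer that marshals to
// JSON and then base64 encodes the result.
type base64JSONSerializer struct{}

func (base64JSONSerializer) Serialize(state interface{}) (string, error) {
	b, err := json.Marshal(state)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(b), nil
}

func (base64JSONSerializer) Deserialize(serialized string, state interface{}) error {
	b, err := base64.StdEncoding.DecodeString(serialized)
	if err != nil {
		return err
	}
	return json.Unmarshal(b, state)
}

// counterState is a hypothetical stateData type.
type counterState struct {
	Count int `json:"count"`
}

// roundTrip serializes in and deserializes the result into a fresh value.
func roundTrip(s StateSerializer, in counterState) (counterState, error) {
	var out counterState
	encoded, err := s.Serialize(in)
	if err != nil {
		return out, err
	}
	err = s.Deserialize(encoded, &out)
	return out, err
}

func main() {
	out, err := roundTrip(base64JSONSerializer{}, counterState{Count: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Count)
}
```

As with the protobuf serializer, Deserialize needs a pointer to the target struct so that the decoded fields are visible to the caller.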
type StatusFilter ¶
type StatusFilter struct {
Status string `json:"status"`
}
StatusFilter models the swf json protocol.
type StayOutcome ¶
type StayOutcome struct {
// contains filtered or unexported fields
}
StayOutcome is an Outcome in which the FSM will remain in the same state.
func (StayOutcome) Data ¶
func (s StayOutcome) Data() interface{}
Data returns the data for this Outcome.
func (StayOutcome) Decisions ¶
func (s StayOutcome) Decisions() []Decision
Decisions returns the list of Decisions for this Outcome.
func (StayOutcome) State ¶
func (s StayOutcome) State() string
State returns the next state for the StayOutcome, which is always empty.
type StreamMigrator ¶
type StreamMigrator struct {
Streams []CreateStream
Client KinesisClient
}
StreamMigrator will create any Kinesis Streams required.
func (*StreamMigrator) Migrate ¶
func (s *StreamMigrator) Migrate()
Migrate checks that the desired streams have been created and, if they have not, creates them.
type TagFilter ¶
type TagFilter struct {
Tag string `json:"tag"`
}
TagFilter models the swf json protocol.
type TaskList ¶
type TaskList struct {
Name string `json:"name"`
}
TaskList models the swf json protocol.
type TerminateWorkflowExecution ¶
type TerminateWorkflowExecution struct {
ChildPolicy string `json:"childPolicy,omitempty"`
Details string `json:"details,omitempty"`
Domain string `json:"domain"`
Reason string `json:"reason,omitempty"`
RunID string `json:"runId,omitempty"`
WorkflowID string `json:"workflowId,omitempty"`
}
TerminateWorkflowExecution models the swf json protocol.
type TimeFilter ¶
type TimeFilter struct {
LatestDate *Date `json:"latestDate,omitempty"`
OldestDate *Date `json:"oldestDate"`
}
TimeFilter models the swf json protocol.
func ZeroTimeFilter ¶
func ZeroTimeFilter() *TimeFilter
ZeroTimeFilter returns a TimeFilter with the OldestDate set to 0 and the LatestDate nil.
type TimerCanceledEventAttributes ¶
type TimerCanceledEventAttributes struct {
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
StartedEventID int `json:"startedEventId"`
TimerID string `json:"timerId"`
}
TimerCanceledEventAttributes models the swf json protocol.
type TimerFiredEventAttributes ¶
type TimerFiredEventAttributes struct {
StartedEventID int `json:"startedEventId"`
TimerID string `json:"timerId"`
}
TimerFiredEventAttributes models the swf json protocol.
type TimerStartedEventAttributes ¶
type TimerStartedEventAttributes struct {
Control string `json:"control"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
StartToFireTimeout string `json:"startToFireTimeout"`
TimerID string `json:"timerId"`
}
TimerStartedEventAttributes models the swf json protocol.
type TransitionOutcome ¶
type TransitionOutcome struct {
// contains filtered or unexported fields
}
TransitionOutcome is an Outcome in which the FSM will transition to a new state.
func (TransitionOutcome) Data ¶
func (t TransitionOutcome) Data() interface{}
Data returns the data for this Outcome.
func (TransitionOutcome) Decisions ¶
func (t TransitionOutcome) Decisions() []Decision
Decisions returns the list of Decisions for this Outcome.
func (TransitionOutcome) State ¶
func (t TransitionOutcome) State() string
State returns the next state for this TransitionOutcome.
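The difference between the two outcome kinds comes down to what State() returns. The sketch below models that with local stand-ins: `stay` always reports an empty state, `transition` reports a target state, and the hypothetical helper `nextState` resolves which state the FSM ends up in. The `Outcome` interface shape is inferred from the three methods documented above, `Decision` is a placeholder, and none of this is the package's actual implementation.

```go
package main

import "fmt"

// Decision is a placeholder for the package's Decision type.
type Decision struct {
	DecisionType string
}

// Outcome collects the three methods documented on StayOutcome and
// TransitionOutcome; the interface shape is an assumption.
type Outcome interface {
	Data() interface{}
	Decisions() []Decision
	State() string
}

// stay is a stand-in for StayOutcome: its State is always empty.
type stay struct {
	data      interface{}
	decisions []Decision
}

func (s stay) Data() interface{}     { return s.data }
func (s stay) Decisions() []Decision { return s.decisions }
func (s stay) State() string         { return "" }

// transition is a stand-in for TransitionOutcome: it names the next state.
type transition struct {
	data      interface{}
	decisions []Decision
	state     string
}

func (t transition) Data() interface{}     { return t.data }
func (t transition) Decisions() []Decision { return t.decisions }
func (t transition) State() string         { return t.state }

// nextState is a hypothetical helper: an empty State means "remain in the
// current state".
func nextState(current string, o Outcome) string {
	if o.State() == "" {
		return current
	}
	return o.State()
}

func main() {
	fmt.Println(nextState("waiting", stay{}))
	fmt.Println(nextState("waiting", transition{state: "done"}))
}
```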
type TypeFilter ¶
TypeFilter models the swf json protocol.
type TypedFuncs ¶
type TypedFuncs struct {
// contains filtered or unexported fields
}
TypedFuncs lets you construct building blocks for composable deciders whose arguments are checked against the type of your FSM stateData.
func Typed ¶
func Typed(typed interface{}) *TypedFuncs
Typed allows you to create typed building blocks for composable deciders. The type checking is done at construction, at runtime rather than at compile time, so be sure to have a unit test that constructs your funcs.
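To illustrate why a construction-time check still needs a unit test, here is a minimal sketch of the general technique using the reflect package. It is not this package's implementation: `typedPredicate`, `constructs`, and `orderData` are hypothetical, and PredicateFunc is redeclared locally. The wrapper panics while it is being built if the supplied function does not have the shape `func(T) bool` for the stateData type T.

```go
package main

import (
	"fmt"
	"reflect"
)

// PredicateFunc is a local copy of the signature documented in this package.
type PredicateFunc func(data interface{}) bool

// orderData is a hypothetical FSM stateData type.
type orderData struct {
	Paid bool
}

// typedPredicate is a hypothetical helper. It checks, when called, that fn
// is a func taking exactly the type of stateData and returning bool, and
// panics otherwise. The returned PredicateFunc invokes fn via reflection.
func typedPredicate(stateData interface{}, fn interface{}) PredicateFunc {
	want := reflect.TypeOf(stateData)
	fv := reflect.ValueOf(fn)
	if fv.Kind() != reflect.Func {
		panic("typedPredicate: fn must be a function")
	}
	ft := fv.Type()
	if ft.NumIn() != 1 || ft.In(0) != want || ft.NumOut() != 1 || ft.Out(0).Kind() != reflect.Bool {
		panic(fmt.Sprintf("typedPredicate: want func(%v) bool, got %v", want, ft))
	}
	return func(data interface{}) bool {
		return fv.Call([]reflect.Value{reflect.ValueOf(data)})[0].Bool()
	}
}

// constructs reports whether building the wrapper succeeds without panicking.
func constructs(stateData, fn interface{}) (ok bool) {
	defer func() {
		if recover() != nil {
			ok = false
		}
	}()
	typedPredicate(stateData, fn)
	return true
}

func main() {
	isPaid := typedPredicate(&orderData{}, func(d *orderData) bool { return d.Paid })
	fmt.Println(isPaid(&orderData{Paid: true}))
	fmt.Println(constructs(&orderData{}, func(s string) bool { return s != "" }))
}
```

Because the mismatch only surfaces when the wrapper is constructed, a test that merely builds each func is enough to catch a wrong argument type before deployment.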
func (*TypedFuncs) Decider ¶
func (t *TypedFuncs) Decider(decider interface{}) Decider
Decider builds a Decider from your typed Decider that verifies the right typing at construction time.
func (*TypedFuncs) DecisionFunc ¶
func (t *TypedFuncs) DecisionFunc(decisionFunc interface{}) DecisionFunc
DecisionFunc builds a DecisionFunc from your typed DecisionFunc that verifies the right typing at construction time.
func (*TypedFuncs) MultiDecisionFunc ¶
func (t *TypedFuncs) MultiDecisionFunc(decisionFunc interface{}) MultiDecisionFunc
MultiDecisionFunc builds a MultiDecisionFunc from your typed MultiDecisionFunc that verifies the right typing at construction time.
func (*TypedFuncs) PredicateFunc ¶
func (t *TypedFuncs) PredicateFunc(stateFunc interface{}) PredicateFunc
PredicateFunc builds a PredicateFunc from your typed PredicateFunc that verifies the right typing at construction time.
func (*TypedFuncs) StateFunc ¶
func (t *TypedFuncs) StateFunc(stateFunc interface{}) StateFunc
StateFunc builds a StateFunc from your typed StateFunc that verifies the right typing at construction time.
type TypesMigrator ¶
type TypesMigrator struct {
DomainMigrator *DomainMigrator
WorkflowTypeMigrator *WorkflowTypeMigrator
ActivityTypeMigrator *ActivityTypeMigrator
StreamMigrator *StreamMigrator
}
TypesMigrator is composed of a DomainMigrator, a WorkflowTypeMigrator, an ActivityTypeMigrator, and a StreamMigrator.
func NewTypesMigrator ¶
func NewTypesMigrator(client WorkflowClient, registerDomains []RegisterDomain, deprecateDomains []DeprecateDomain, registerWorkflows []RegisterWorkflowType, deprecateWorkflows []DeprecateWorkflowType, registerActivities []RegisterActivityType, deprecateActivities []DeprecateActivityType) *TypesMigrator
NewTypesMigrator will create a TypesMigrator that will migrate the given domains, workflows, and activities. Pass nil if you don't need a given domain, workflow, or activity registered or deprecated.
func (*TypesMigrator) Migrate ¶
func (t *TypesMigrator) Migrate()
Migrate runs Migrate on the underlying DomainMigrator, WorkflowTypeMigrator, and ActivityTypeMigrator.
type WorkflowAdminClient ¶
type WorkflowAdminClient interface {
RegisterActivityType(request RegisterActivityType) error
DeprecateActivityType(request DeprecateActivityType) error
RegisterWorkflowType(request RegisterWorkflowType) error
DeprecateWorkflowType(request DeprecateWorkflowType) error
RegisterDomain(request RegisterDomain) error
DeprecateDomain(request DeprecateDomain) error
}
WorkflowAdminClient specifies swf client operations related to registering and deprecating domains, workflows and activities.
type WorkflowClient ¶
type WorkflowClient interface {
ActivityWorkerClient
DecisionWorkerClient
KinesisClient
WorkflowAdminClient
WorkflowInfoClient
WorkflowWorkerClient
}
WorkflowClient is the combined client of the ActivityWorkerClient, DecisionWorkerClient, KinesisClient, WorkflowAdminClient, WorkflowInfoClient, and WorkflowWorkerClient interfaces.
type WorkflowConfiguration ¶
type WorkflowConfiguration struct {
DefaultChildPolicy string `json:"defaultChildPolicy"`
DefaultExecutionStartToCloseTimeout string `json:"defaultExecutionStartToCloseTimeout"`
DefaultTaskList TaskList `json:"defaultTaskList"`
DefaultTaskStartToCloseTimeout string `json:"defaultTaskStartToCloseTimeout"`
}
WorkflowConfiguration models the swf json protocol.
type WorkflowExecution ¶
WorkflowExecution models the swf json protocol.
type WorkflowExecutionCancelRequestedEventAttributes ¶
type WorkflowExecutionCancelRequestedEventAttributes struct {
Cause string `json:"cause"`
ExternalInitiatedEventID int `json:"externalInitiatedEventId"`
ExternalWorkflowExecution WorkflowExecution `json:"externalWorkflowExecution"`
}
WorkflowExecutionCancelRequestedEventAttributes models the swf json protocol.
type WorkflowExecutionCanceledEventAttributes ¶
type WorkflowExecutionCanceledEventAttributes struct {
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
Details string `json:"details"`
}
WorkflowExecutionCanceledEventAttributes models the swf json protocol.
type WorkflowExecutionCompletedEventAttributes ¶
type WorkflowExecutionCompletedEventAttributes struct {
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
Result string `json:"result"`
}
WorkflowExecutionCompletedEventAttributes models the swf json protocol.
type WorkflowExecutionContinuedAsNewEventAttributes ¶
type WorkflowExecutionContinuedAsNewEventAttributes struct {
ChildPolicy string `json:"childPolicy"`
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
ExecutionStartToCloseTimeout string `json:"executionStartToCloseTimeout"`
Input string `json:"input"`
NewExecutionRunID string `json:"newExecutionRunId"`
TagList []string `json:"tagList"`
TaskList TaskList `json:"taskList"`
TaskStartToCloseTimeout string `json:"taskStartToCloseTimeout"`
WorkflowType WorkflowType `json:"workflowType"`
}
WorkflowExecutionContinuedAsNewEventAttributes models the swf json protocol.
type WorkflowExecutionFailedEventAttributes ¶
type WorkflowExecutionFailedEventAttributes struct {
DecisionTaskCompletedEventID int `json:"decisionTaskCompletedEventId"`
Details string `json:"details"`
Reason string `json:"reason"`
}
WorkflowExecutionFailedEventAttributes models the swf json protocol.
type WorkflowExecutionInfo ¶
type WorkflowExecutionInfo struct {
CancelRequested bool `json:"cancelRequested"`
CloseStatus string `json:"closeStatus"`
CloseTimestamp *Date `json:"closeTimestamp"`
Execution WorkflowExecution `json:"execution"`
ExecutionStatus string `json:"executionStatus"`
Parent WorkflowExecution `json:"parent"`
StartTimestamp *Date `json:"startTimestamp"`
TagList []string `json:"tagList"`
WorkflowType WorkflowType `json:"workflowType"`
}
WorkflowExecutionInfo models the swf json protocol.
type WorkflowExecutionSignaledEventAttributes ¶
type WorkflowExecutionSignaledEventAttributes struct {
ExternalInitiatedEventID int `json:"externalInitiatedEventId"`
ExternalWorkflowExecution WorkflowExecution `json:"externalWorkflowExecution"`
Input string `json:"input"`
SignalName string `json:"signalName"`
}
WorkflowExecutionSignaledEventAttributes models the swf json protocol.
type WorkflowExecutionStartedEventAttributes ¶
type WorkflowExecutionStartedEventAttributes struct {
ChildPolicy string `json:"childPolicy"`
ContinuedExecutionRunID string `json:"continuedExecutionRunId"`
ExecutionStartToCloseTimeout string `json:"executionStartToCloseTimeout"`
Input string `json:"input"`
ParentInitiatedEventID int `json:"parentInitiatedEventId"`
ParentWorkflowExecution WorkflowExecution `json:"parentWorkflowExecution"`
TagList []string `json:"tagList"`
TaskList TaskList `json:"taskList"`
TaskStartToCloseTimeout string `json:"taskStartToCloseTimeout"`
WorkflowType WorkflowType `json:"workflowType"`
}
WorkflowExecutionStartedEventAttributes models the swf json protocol.
type WorkflowExecutionTerminatedEventAttributes ¶
type WorkflowExecutionTerminatedEventAttributes struct {
Cause string `json:"cause"`
ChildPolicy string `json:"childPolicy"`
Details string `json:"details"`
Reason string `json:"reason"`
}
WorkflowExecutionTerminatedEventAttributes models the swf json protocol.
type WorkflowExecutionTimedOutEventAttributes ¶
type WorkflowExecutionTimedOutEventAttributes struct {
ChildPolicy string `json:"childPolicy"`
TimeoutType string `json:"timeoutType"`
}
WorkflowExecutionTimedOutEventAttributes models the swf json protocol.
type WorkflowInfoClient ¶
type WorkflowInfoClient interface {
CountClosedWorkflowExecutions(request CountClosedWorkflowExecutionsRequest) (*CountResponse, error)
CountOpenWorkflowExecutions(request CountOpenWorkflowExecutionsRequest) (*CountResponse, error)
CountPendingActivityTasks(request CountPendingActivityTasksRequest) (*CountResponse, error)
CountPendingDecisionTasks(request CountPendingDecisionTasksRequest) (*CountResponse, error)
ListWorkflowTypes(request ListWorkflowTypesRequest) (*ListWorkflowTypesResponse, error)
ListActivityTypes(request ListActivityTypesRequest) (*ListActivityTypesResponse, error)
DescribeActivityType(request DescribeActivityTypeRequest) (*DescribeActivityTypeResponse, error)
DescribeDomain(request DescribeDomainRequest) (*DescribeDomainResponse, error)
DescribeWorkflowType(request DescribeWorkflowTypeRequest) (*DescribeWorkflowTypeResponse, error)
DescribeWorkflowExecution(request DescribeWorkflowExecutionRequest) (*DescribeWorkflowExecutionResponse, error)
ListOpenWorkflowExecutions(request ListOpenWorkflowExecutionsRequest) (*ListOpenWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutions(request ListClosedWorkflowExecutionsRequest) (*ListClosedWorkflowExecutionsResponse, error)
GetWorkflowExecutionHistory(request GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse, error)
ListDomains(request ListDomainsRequest) (*ListDomainsResponse, error)
}
WorkflowInfoClient specifies swf client operations related to querying swf types and executions.
type WorkflowType ¶
WorkflowType models the swf json protocol.
type WorkflowTypeInfo ¶
type WorkflowTypeInfo struct {
CreationDate *Date `json:"creationDate"`
DeprecationDate *Date `json:"deprecationDate"`
Description string `json:"description"`
Status string `json:"status"`
WorkflowType WorkflowType `json:"workflowType"`
}
WorkflowTypeInfo models the swf json protocol.
type WorkflowTypeMigrator ¶
type WorkflowTypeMigrator struct {
RegisteredWorkflowTypes []RegisterWorkflowType
DeprecatedWorkflowTypes []DeprecateWorkflowType
Client WorkflowClient
}
WorkflowTypeMigrator will register or deprecate the configured workflow types as required.
func (*WorkflowTypeMigrator) Migrate ¶
func (w *WorkflowTypeMigrator) Migrate()
Migrate asserts that DeprecatedWorkflowTypes are deprecated or deprecates them, then asserts that RegisteredWorkflowTypes are registered or registers them.
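The "assert or act" wording describes an idempotent migration: running it a second time should make no further changes. The sketch below models that idea against a hypothetical in-memory registry instead of the SWF API. `typeKey`, `fakeRegistry`, and `migrate` are illustrative names, and the exact checks the real migrator performs may differ.

```go
package main

import "fmt"

// typeKey identifies a workflow type by name and version (hypothetical).
type typeKey struct {
	Name    string
	Version string
}

// fakeRegistry is a hypothetical in-memory stand-in for SWF. It records
// each mutating call so repeated migrations can be inspected.
type fakeRegistry struct {
	status map[typeKey]string // "REGISTERED" or "DEPRECATED"
	calls  []string
}

func (r *fakeRegistry) register(k typeKey) {
	r.status[k] = "REGISTERED"
	r.calls = append(r.calls, "register "+k.Name)
}

func (r *fakeRegistry) deprecate(k typeKey) {
	r.status[k] = "DEPRECATED"
	r.calls = append(r.calls, "deprecate "+k.Name)
}

// migrate handles deprecations first, then registrations, and acts only
// when the registry is not already in the desired state.
func migrate(r *fakeRegistry, registered, deprecated []typeKey) {
	for _, k := range deprecated {
		if r.status[k] == "REGISTERED" {
			r.deprecate(k)
		}
	}
	for _, k := range registered {
		if _, known := r.status[k]; !known {
			r.register(k)
		}
	}
}

func main() {
	oldType := typeKey{Name: "old", Version: "1"}
	newType := typeKey{Name: "new", Version: "1"}
	r := &fakeRegistry{status: map[typeKey]string{oldType: "REGISTERED"}}

	migrate(r, []typeKey{newType}, []typeKey{oldType})
	migrate(r, []typeKey{newType}, []typeKey{oldType}) // second run is a no-op
	fmt.Println(r.calls)
}
```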
type WorkflowWorkerClient ¶
type WorkflowWorkerClient interface {
StartWorkflow(request StartWorkflowRequest) (*StartWorkflowResponse, error)
SignalWorkflow(request SignalWorkflowRequest) error
RequestCancelWorkflowExecution(request RequestCancelWorkflowExecution) error
TerminateWorkflowExecution(request TerminateWorkflowExecution) error
}
WorkflowWorkerClient specifies swf client operations related to starting, signaling, and stopping workflows.