Documentation
¶
Index ¶
- Variables
- func AppFrom(ctx context.Context) core.App
- func Do[R any](ctx Context, fn Step[R], opts ...StepOption) (R, error)
- func DoAsync[R any](ctx Context, fn Step[R], opts ...StepOption) (chan AsyncResult[R], error)
- func GetValue[R any](ctx Context, targetWorkflowID string, key string, timeout time.Duration) (R, error)
- func KVGet[R any](rt *Runtime, ctx context.Context, key string) (R, bool, error)
- func LoggerFrom(ctx context.Context) *slog.Logger
- func NewApp(config Config) (App, *Runtime)
- func Pause(ctx Context, duration time.Duration) error
- func Recv[R any](ctx Context, topic string, timeout time.Duration) (R, error)
- func Register[P any, R any](rt *Runtime, fn Workflow[P, R], opts ...WorkflowRegistrationOption)
- func ResendProduct(app core.App, rt *Runtime, record *core.Record) error
- func Send(ctx Context, destinationID string, message any, topic string) error
- func SendNotification(ctx context.Context, name, message string) error
- func SendProduct(ctx context.Context, fileName string, data io.Reader, metadata map[string]any) error
- func SetAppStatus(ctx context.Context, label, color string) error
- func SetValue(ctx Context, key string, value any) error
- type App
- type ApprovalOption
- type ApprovalResult
- type AsyncResult
- type Config
- type Context
- type Error
- type ErrorCode
- type GetResultOption
- type Handle
- type ProductRecord
- type ProductSender
- type QueueOption
- type RateLimiter
- type RegisteredWorkflow
- type RequestEvent
- type Runtime
- func (rt *Runtime) App() core.App
- func (rt *Runtime) Cancel(workflowID string) error
- func (rt *Runtime) GarbageCollect() error
- func (rt *Runtime) GetApplicationID() string
- func (rt *Runtime) GetApplicationVersion() string
- func (rt *Runtime) GetExecutorID() string
- func (rt *Runtime) IsDraining() bool
- func (rt *Runtime) IsLaunched() bool
- func (rt *Runtime) IsTriggerable(fqn string) bool
- func (rt *Runtime) KVDelete(ctx context.Context, key string) error
- func (rt *Runtime) KVSet(ctx context.Context, key string, value any) error
- func (rt *Runtime) Launch() error
- func (rt *Runtime) List(input listWorkflowsDBInput) ([]Status, error)
- func (rt *Runtime) Listen(queues ...WorkflowQueue)
- func (rt *Runtime) NewContext(ctx context.Context) Context
- func (rt *Runtime) Queue(name string, options ...QueueOption) WorkflowQueue
- func (rt *Runtime) Queues() []WorkflowQueue
- func (rt *Runtime) RegisteredWorkflows() []RegisteredWorkflow
- func (rt *Runtime) Resume(workflowID string) error
- func (rt *Runtime) SendNotification(name, message string) error
- func (rt *Runtime) SendToWorkflow(workflowID string, message any, topic string) error
- func (rt *Runtime) SetProductSender(sender ProductSender)
- func (rt *Runtime) Shutdown()
- func (rt *Runtime) Steps(workflowID string) ([]StepInfo, error)
- func (rt *Runtime) TestAlertChannel(id string) error
- func (rt *Runtime) TriggerByFQN(fqn string, rawInput json.RawMessage) (string, error)
- type Schedule
- func (s *Schedule) Created() types.DateTime
- func (s *Schedule) CronExpression() string
- func (s *Schedule) Enabled() bool
- func (s *Schedule) Input() json.RawMessage
- func (s *Schedule) Jitter() time.Duration
- func (s *Schedule) ScheduledAt() time.Time
- func (s *Schedule) SetCronExpression(expr string)
- func (s *Schedule) SetEnabled(v bool)
- func (s *Schedule) SetInput(input json.RawMessage)
- func (s *Schedule) SetJitter(d time.Duration)
- func (s *Schedule) SetScheduledAt(t time.Time)
- func (s *Schedule) SetType(t string)
- func (s *Schedule) SetWorkflowFQN(fqn string)
- func (s *Schedule) Type() string
- func (s *Schedule) Updated() types.DateTime
- func (s *Schedule) WorkflowFQN() string
- type SendInput
- type ServeEvent
- type SetValueInput
- type Status
- type StatusType
- type Step
- type StepFunc
- type StepInfo
- type StepOption
- func WithBackoffFactor(factor float64) StepOption
- func WithBaseInterval(interval time.Duration) StepOption
- func WithMaxInterval(interval time.Duration) StepOption
- func WithNextStepID(stepID int) StepOption
- func WithStepMaxRetries(maxRetries int) StepOption
- func WithStepName(name string) StepOption
- func WithoutCheckpoint() StepOption
- type Workflow
- type WorkflowFunc
- type WorkflowOption
- func WithApplicationVersion(version string) WorkflowOption
- func WithDeadline(t time.Time) WorkflowOption
- func WithDeduplicationID(id string) WorkflowOption
- func WithID(id string) WorkflowOption
- func WithPriority(priority uint) WorkflowOption
- func WithQueue(queueName string) WorkflowOption
- func WithQueuePartitionKey(partitionKey string) WorkflowOption
- func WithTimeout(d time.Duration) WorkflowOption
- type WorkflowQueue
- type WorkflowRegistrationOption
- func WithDashboardTrigger() WorkflowRegistrationOption
- func WithInputSchema(schema map[string]any) WorkflowRegistrationOption
- func WithMaxRetries(maxRetries int) WorkflowRegistrationOption
- func WithName(name string) WorkflowRegistrationOption
- func WithSchedule(cronExpr string) WorkflowRegistrationOption
- func WithSummaryFunc[P any](fn func(P) string) WorkflowRegistrationOption
- func WithTags(tags ...string) WorkflowRegistrationOption
- type WorkflowSender
Constants ¶
This section is empty.
Variables ¶
var ErrApprovalTimeout = errors.New("turbine: approval timed out")
ErrApprovalTimeout is returned when WaitForApproval times out.
var ErrShuttingDown = errors.New("turbine: runtime is shutting down")
ErrShuttingDown is returned when a workflow is rejected because the runtime is draining.
Functions ¶
func AppFrom ¶
AppFrom returns the app from a step's context.Context. Returns nil if called outside a turbine context.
func Do ¶
func Do[R any](ctx Context, fn Step[R], opts ...StepOption) (R, error)
Do executes a function as a durable step within a workflow.
func DoAsync ¶
func DoAsync[R any](ctx Context, fn Step[R], opts ...StepOption) (chan AsyncResult[R], error)
DoAsync runs a step in a goroutine. Must be called from within a workflow.
func GetValue ¶
func GetValue[R any](ctx Context, targetWorkflowID string, key string, timeout time.Duration) (R, error)
GetValue gets a key-value event from a target workflow.
func KVGet ¶
KVGet retrieves a value from the global key-value store. Returns (zero, false, nil) if the key doesn't exist.
func LoggerFrom ¶
LoggerFrom returns a logger from a step's context.Context. The logger includes workflow_id and step_id attributes automatically. Returns a no-op logger if called outside a turbine context.
func NewApp ¶ added in v0.2.0
NewApp creates a new PocketBase app and a Runtime wired into its OnServe and OnTerminate hooks. The caller registers workflows, then calls app.Start(); Launch and Shutdown are driven by the app lifecycle.
func Pause ¶
Pause performs a durable sleep within a workflow. The wake-up time is recorded as a step so that on recovery, if the wake-up time has already passed, Pause returns immediately; otherwise it sleeps only the remaining time.
func Register ¶
func Register[P any, R any](rt *Runtime, fn Workflow[P, R], opts ...WorkflowRegistrationOption)
Register registers a typed workflow function with the runtime. Must be called before Launch().
func ResendProduct ¶ added in v0.1.0
ResendProduct re-sends a stored product to the configured ProductSender. Only valid for products with status "failed". Returns an error if no sender is configured.
func SendNotification ¶ added in v0.1.0
SendNotification sends a custom message to the named alert channel. Usable from inside a step via the step's context.Context, mirroring the SetAppStatus/LoggerFrom/AppFrom pattern.
func SendProduct ¶
func SendProduct(ctx context.Context, fileName string, data io.Reader, metadata map[string]any) error
SendProduct stores a product file and optionally sends it via the registered ProductSender. Must be called from within a step (requires workflow context). Returns an error if the sender fails — the product is still stored with "failed" status.
func SetAppStatus ¶
SetAppStatus sets a user-defined application status on the current workflow. It can be called from a step's context.Context (same pattern as LoggerFrom/AppFrom).
Types ¶
type App ¶ added in v0.2.0
type App = *pocketbase.PocketBase
type ApprovalOption ¶
type ApprovalOption func(*approvalOptions)
ApprovalOption configures WaitForApproval behavior.
func WithApprovalTimeout ¶
func WithApprovalTimeout(d time.Duration) ApprovalOption
WithApprovalTimeout sets a timeout for how long to wait for approval. If zero (default), waits indefinitely.
type ApprovalResult ¶
type ApprovalResult struct {
Approved bool `json:"approved"`
Comment string `json:"comment,omitempty"`
}
ApprovalResult holds the decision from a human approver.
func WaitForApproval ¶
func WaitForApproval(ctx Context, opts ...ApprovalOption) (ApprovalResult, error)
WaitForApproval pauses the workflow and waits for a human decision. The workflow's app status is set to "waiting for approval" (yellow) while blocked. Use Send(ctx, workflowID, ApprovalResult{...}, "pt.approval") or the POST /api/pt/workflows/{id}/approve endpoint to deliver the decision.
type AsyncResult ¶
AsyncResult holds the result and error from a concurrent step started with DoAsync.
type Config ¶
type Config struct {
ApplicationVersion string // Optional: defaults to binary hash
ExecutorID string // Optional: defaults to "local"
GCRetention time.Duration // How long to keep completed workflows. 0 = use default (72h). Negative = disabled.
GCSchedule string // Cron expression for GC. Default: "0 0 * * *" (daily midnight)
ProductSender ProductSender // Optional: sender for dispatching products to external systems
WebhookMaxRetries int // Max webhook delivery attempts. 0 = use default (3).
WebhookTimeout time.Duration // Timeout per webhook delivery attempt. 0 = use default (10s).
ShutdownTimeout time.Duration // Max drain duration on Shutdown. 0 = use default (30s).
Logger *slog.Logger // Optional: overrides the default logger used by LoggerFrom and ctx.Logger().
}
Config holds configuration parameters for initializing the turbine plugin.
type Context ¶
type Context interface {
context.Context
App() core.App
Logger() *slog.Logger
WorkflowID() (string, error)
SetAppStatus(label, color string)
}
Context is the unified context for turbine workflows. It embeds context.Context and provides access to the app, logger, and workflow ID.
type Error ¶
type Error struct {
Message string
Code ErrorCode
WorkflowID string
StepName string
QueueName string
DeduplicationID string
MaxRetries int
// contains filtered or unexported fields
}
Error is the unified error type for all Turbine operations.
type ErrorCode ¶
type ErrorCode int
ErrorCode represents the different types of errors that can occur in Turbine operations.
type GetResultOption ¶
type GetResultOption func(*getResultOptions)
GetResultOption is a functional option for GetResult.
func WithHandlePollingInterval ¶
func WithHandlePollingInterval(interval time.Duration) GetResultOption
WithHandlePollingInterval sets the polling interval for GetResult.
type Handle ¶
type Handle[R any] interface {
GetResult(opts ...GetResultOption) (R, error)
GetStatus() (Status, error)
GetWorkflowID() string
}
Handle provides methods to interact with a running or completed workflow.
type ProductRecord ¶
type ProductRecord struct {
ID string `json:"id"`
FileName string `json:"file_name"`
Size int `json:"size"`
Metadata map[string]any `json:"metadata"`
}
ProductRecord is the metadata reference to a stored product passed to ProductSender.Send(). It is JSON-serializable so it can also be used as a workflow input (see WorkflowSender).
type ProductSender ¶
type ProductSender interface {
Send(ctx context.Context, product ProductRecord, data io.Reader) error
}
ProductSender is the interface for sending products to external systems. Users implement this to define custom destinations. The data reader carries the full file bytes; the concrete value is *bytes.Reader, so senders that need to rewind for retries can type-assert to io.Seeker.
type QueueOption ¶
type QueueOption func(*WorkflowQueue)
QueueOption configures a workflow queue.
func WithGlobalConcurrency ¶
func WithGlobalConcurrency(n int) QueueOption
func WithMaxTasksPerIteration ¶
func WithMaxTasksPerIteration(n int) QueueOption
func WithPartitionQueue ¶
func WithPartitionQueue() QueueOption
func WithPriorityEnabled ¶
func WithPriorityEnabled() QueueOption
func WithRateLimiter ¶
func WithRateLimiter(limiter RateLimiter) QueueOption
func WithWorkerConcurrency ¶
func WithWorkerConcurrency(n int) QueueOption
type RateLimiter ¶
RateLimiter configures rate limiting for workflow queue execution.
type RegisteredWorkflow ¶
type RequestEvent ¶ added in v0.2.0
type RequestEvent = core.RequestEvent
type Runtime ¶
type Runtime struct {
// contains filtered or unexported fields
}
Runtime is the core durable execution runtime. Create with NewRuntime, register workflows before Launch, then Shutdown when done. For higher-level entry points see NewStandalone (standalone) and NewApp (HTTP-serving).
func NewRuntime ¶ added in v0.2.0
NewRuntime creates a new Runtime. Must be called before Launch().
func NewStandalone ¶ added in v0.2.0
NewStandalone creates a Runtime that owns an embedded PocketBase app. Use for scripts and background processes that run workflows without serving HTTP.
The returned runtime is constructed but not launched. The caller must:
rt := turbine.NewStandalone(cfg)
defer rt.Shutdown()
turbine.Register(rt, MyWorkflow)
if err := rt.Launch(); err != nil { log.Fatal(err) }
Workflow and step logs go to stdout by default. Set Config.Logger to override.
func Setup ¶
Setup wires turbine into an existing app's lifecycle. Returns the Runtime so you can register workflows before app.Start().
func SetupStandalone ¶ added in v0.2.0
SetupStandalone wires turbine into a caller-supplied PocketBase app for non-HTTP use. The caller is responsible for Bootstrap (if needed) and must invoke rt.Launch and rt.Shutdown themselves. Unlike NewStandalone, this does NOT default Config.Logger; instead, the supplied app's logger is used as a fallback (via baseLogger) so caller-configured logging is preserved.
func (*Runtime) GarbageCollect ¶
GarbageCollect removes completed workflows older than the configured retention period.
func (*Runtime) GetApplicationID ¶
func (*Runtime) GetApplicationVersion ¶
func (*Runtime) GetExecutorID ¶
func (*Runtime) IsDraining ¶
IsDraining returns true if the runtime is in the process of shutting down.
func (*Runtime) IsLaunched ¶
func (*Runtime) IsTriggerable ¶
func (*Runtime) KVDelete ¶
KVDelete removes a key from the global store. No-op if the key doesn't exist.
func (*Runtime) Launch ¶
Launch starts the runtime: ensures collections, launches sysdb, starts queue runner, and recovers pending workflows.
func (*Runtime) Listen ¶
func (rt *Runtime) Listen(queues ...WorkflowQueue)
Listen configures which queues the runtime should poll. Must be called before Launch().
func (*Runtime) NewContext ¶
NewContext wraps a standard context.Context with the runtime. Use this from HTTP handlers or other external callers.
func (*Runtime) Queue ¶
func (rt *Runtime) Queue(name string, options ...QueueOption) WorkflowQueue
Queue registers a named queue with the runtime. Must be called before Launch().
func (*Runtime) Queues ¶
func (rt *Runtime) Queues() []WorkflowQueue
func (*Runtime) RegisteredWorkflows ¶
func (rt *Runtime) RegisteredWorkflows() []RegisteredWorkflow
func (*Runtime) SendNotification ¶ added in v0.1.0
SendNotification sends a custom message to the alert channel matching name. Disabled channels are a silent no-op (returns nil), matching the event-driven dispatch behavior — toggling a channel off mutes both. If multiple channels share the same name, the first match wins. Returns an error only if no channel with that name exists or if delivery fails.
func (*Runtime) SendToWorkflow ¶
SendToWorkflow sends a message to a workflow from outside a workflow context. This is used by HTTP handlers and other external callers.
func (*Runtime) SetProductSender ¶
func (rt *Runtime) SetProductSender(sender ProductSender)
SetProductSender sets the product sender after construction. Use this when the sender needs a reference to the runtime (e.g., WorkflowSender).
func (*Runtime) Shutdown ¶
func (rt *Runtime) Shutdown()
Shutdown gracefully stops the runtime with a two-phase approach: (1) Drain: stop accepting new work and let running workflows finish naturally; (2) Force: if cfg.ShutdownTimeout expires, cancel the root context to kill remaining workflows. Shutdown is idempotent. Drain progress is logged directly to stdout, not via app.Logger, since the app's logger pipeline may itself be shutting down.
func (*Runtime) TestAlertChannel ¶
TestAlertChannel sends a test notification to verify the channel works.
func (*Runtime) TriggerByFQN ¶
type Schedule ¶
type Schedule struct {
core.BaseRecordProxy
}
Schedule is a typed proxy for pt_schedules records.
func (*Schedule) CronExpression ¶
func (*Schedule) Input ¶
func (s *Schedule) Input() json.RawMessage
func (*Schedule) ScheduledAt ¶
func (*Schedule) SetCronExpression ¶
func (*Schedule) SetEnabled ¶
func (*Schedule) SetInput ¶
func (s *Schedule) SetInput(input json.RawMessage)
func (*Schedule) SetScheduledAt ¶
func (*Schedule) SetWorkflowFQN ¶
func (*Schedule) WorkflowFQN ¶
type ServeEvent ¶ added in v0.2.0
type ServeEvent = core.ServeEvent
type SetValueInput ¶
SetValueInput is the input for setting a workflow event.
type Status ¶
type Status struct {
ID string `json:"workflow_id"`
Status StatusType `json:"status"`
Name string `json:"name"`
Output any `json:"output,omitempty"`
Error error `json:"error,omitempty"`
ExecutorID string `json:"executor_id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ApplicationVersion string `json:"application_version"`
ApplicationID string `json:"application_id,omitempty"`
Attempts int `json:"attempts"`
QueueName string `json:"queue_name,omitempty"`
Timeout time.Duration `json:"timeout,omitempty"`
Deadline time.Time `json:"deadline"`
StartedAt time.Time `json:"started_at"`
DeduplicationID string `json:"deduplication_id,omitempty"`
Input any `json:"input,omitempty"`
Priority int `json:"priority,omitempty"`
QueuePartitionKey string `json:"queue_partition_key,omitempty"`
ForkedFrom string `json:"forked_from,omitempty"`
ParentWorkflowID string `json:"parent_workflow_id,omitempty"`
AppStatus string `json:"app_status,omitempty"`
AppStatusColor string `json:"app_status_color,omitempty"`
Summary string `json:"summary,omitempty"`
Tags []string `json:"tags,omitempty"`
}
Status contains information about a workflow's current state.
type StatusType ¶
type StatusType string
StatusType represents the current execution state of a workflow.
const (
StatusPending StatusType = "PENDING"
StatusEnqueued StatusType = "ENQUEUED"
StatusSuccess StatusType = "SUCCESS"
StatusError StatusType = "ERROR"
StatusCancelled StatusType = "CANCELLED"
StatusMaxRecoveryAttemptsExceeded StatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED"
StatusWaitingForApproval StatusType = "WAITING_FOR_APPROVAL"
)
type Step ¶
Step is a function executed as a durable step within a workflow. Steps are automatically recorded and replayed during recovery.
type StepInfo ¶
type StepInfo struct {
WorkflowID string `json:"workflow_id"`
FunctionID int `json:"function_id"`
FunctionName string `json:"function_name"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
StartedAt int64 `json:"started_at_epoch_ms,omitempty"`
EndedAt int64 `json:"ended_at_epoch_ms,omitempty"`
}
StepInfo contains information about a workflow step execution.
type StepOption ¶
type StepOption func(*stepOptions)
StepOption configures step execution.
func WithBackoffFactor ¶
func WithBackoffFactor(factor float64) StepOption
func WithBaseInterval ¶
func WithBaseInterval(interval time.Duration) StepOption
func WithMaxInterval ¶
func WithMaxInterval(interval time.Duration) StepOption
func WithNextStepID ¶
func WithNextStepID(stepID int) StepOption
func WithStepMaxRetries ¶
func WithStepMaxRetries(maxRetries int) StepOption
func WithStepName ¶
func WithStepName(name string) StepOption
func WithoutCheckpoint ¶
func WithoutCheckpoint() StepOption
WithoutCheckpoint skips persisting the step result to the database. The step will always re-execute during recovery instead of replaying from a checkpoint. Use this for steps that establish non-serializable resources like network connections.
type WorkflowFunc ¶
WorkflowFunc is a type-erased workflow function used internally.
type WorkflowOption ¶
type WorkflowOption func(*workflowOptions)
WorkflowOption configures workflow execution.
func WithApplicationVersion ¶
func WithApplicationVersion(version string) WorkflowOption
func WithDeadline ¶
func WithDeadline(t time.Time) WorkflowOption
WithDeadline sets an absolute deadline for the workflow execution. The workflow's context will be cancelled at this time.
func WithDeduplicationID ¶
func WithDeduplicationID(id string) WorkflowOption
func WithID ¶
func WithID(id string) WorkflowOption
func WithPriority ¶
func WithPriority(priority uint) WorkflowOption
func WithQueue ¶
func WithQueue(queueName string) WorkflowOption
func WithQueuePartitionKey ¶
func WithQueuePartitionKey(partitionKey string) WorkflowOption
func WithTimeout ¶
func WithTimeout(d time.Duration) WorkflowOption
WithTimeout sets a timeout duration for the workflow execution. The workflow's context will be cancelled after this duration.
type WorkflowQueue ¶
type WorkflowQueue struct {
Name string
WorkerConcurrency *int
GlobalConcurrency *int
PriorityEnabled bool
RateLimit *RateLimiter
MaxTasksPerIteration int
PartitionQueue bool
// contains filtered or unexported fields
}
WorkflowQueue defines a named queue with concurrency and rate limiting options.
type WorkflowRegistrationOption ¶
type WorkflowRegistrationOption func(*workflowRegistrationOptions)
WorkflowRegistrationOption configures workflow registration.
func WithDashboardTrigger ¶
func WithDashboardTrigger() WorkflowRegistrationOption
func WithInputSchema ¶
func WithInputSchema(schema map[string]any) WorkflowRegistrationOption
WithInputSchema attaches a JSON schema to the workflow, enabling the dashboard to render a typed form instead of a raw JSON textarea.
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) WorkflowRegistrationOption
func WithName ¶
func WithName(name string) WorkflowRegistrationOption
func WithSchedule ¶
func WithSchedule(cronExpr string) WorkflowRegistrationOption
WithSchedule registers the workflow as a scheduled workflow using cron syntax. Scheduled workflows must accept time.Time as input — they receive the scheduled execution time.
func WithSummaryFunc ¶
func WithSummaryFunc[P any](fn func(P) string) WorkflowRegistrationOption
WithSummaryFunc registers a function that generates a human-readable summary from the workflow input. The summary is computed once at workflow start and stored in the database. Maximum 200 characters.
func WithTags ¶
func WithTags(tags ...string) WorkflowRegistrationOption
type WorkflowSender ¶
type WorkflowSender[R any] struct {
// contains filtered or unexported fields
}
WorkflowSender is a built-in ProductSender that forwards products to another workflow.
func NewWorkflowSender ¶
func NewWorkflowSender[R any](rt *Runtime, workflow Workflow[ProductRecord, R]) *WorkflowSender[R]
NewWorkflowSender creates a sender that starts the target workflow with ProductRecord as input.
func (*WorkflowSender[R]) Send ¶
func (ws *WorkflowSender[R]) Send(ctx context.Context, product ProductRecord, _ io.Reader) error
