Documentation ¶
Index ¶
- Constants
- func WithDependency(dependent RunnableInterface, dependencies ...RunnableInterface) func(DependencyAdder)
- func WithHandler(...) func(StepInterface)
- func WithID(id string) func(Identifiable)
- func WithName(name string) func(Nameable)
- func WithRunnables(nodes ...RunnableInterface) func(RunnableAdder)
- type Dag
- func (d *Dag) DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)
- func (d *Dag) DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface
- func (d *Dag) GetID() string
- func (d *Dag) GetName() string
- func (d *Dag) GetState() StateInterface
- func (d *Dag) IsCompleted() bool
- func (d *Dag) IsFailed() bool
- func (d *Dag) IsPaused() bool
- func (d *Dag) IsRunning() bool
- func (d *Dag) IsWaiting() bool
- func (d *Dag) Pause() error
- func (d *Dag) Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
- func (d *Dag) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
- func (d *Dag) RunnableAdd(node ...RunnableInterface)
- func (d *Dag) RunnableList() []RunnableInterface
- func (d *Dag) RunnableRemove(node RunnableInterface) bool
- func (d *Dag) SetID(id string)
- func (d *Dag) SetName(name string)
- func (d *Dag) SetState(state StateInterface)
- func (d *Dag) Visualize() string
- type DagInterface
- type DependencyAdder
- type DotEdgeSpec
- type DotNodeSpec
- type Identifiable
- type Nameable
- type PipelineInterface
- type RunnableAdder
- type RunnableInterface
- type State
- func (s *State) AddCompletedStep(id string)
- func (s *State) FromJSON(data []byte) error
- func (s *State) GetCompletedSteps() []string
- func (s *State) GetCurrentStepID() string
- func (s *State) GetData() map[string]any
- func (s *State) GetLastUpdated() time.Time
- func (s *State) GetStatus() StateStatus
- func (s *State) GetWorkflowData() map[string]any
- func (s *State) SetCurrentStepID(id string)
- func (s *State) SetData(data map[string]any)
- func (s *State) SetLastUpdated(t time.Time)
- func (s *State) SetStatus(status StateStatus)
- func (s *State) SetWorkflowData(data map[string]any)
- func (s *State) ToJSON() ([]byte, error)
- type StateInterface
- type StateStatus
- type StepHandler
- type StepInterface
- type StepOption
Constants ¶
const (
	// State status constants
	StateStatusRunning  = "running"
	StateStatusPaused   = "paused"
	StateStatusComplete = "complete"
	StateStatusFailed   = "failed"
)
Variables ¶
This section is empty.
Functions ¶
func WithDependency ¶ added in v0.5.0
func WithDependency(dependent RunnableInterface, dependencies ...RunnableInterface) func(DependencyAdder)
WithDependency adds a dependency between nodes in a DAG. The dependent node will only execute after all dependency nodes have completed successfully. This can be used to create a fluent API when building DAGs.
Example:
dag := NewDag(
	WithName("My DAG"),
	WithRunnables(step1, step2, step3),
	WithDependency(step2, step1), // step2 depends on step1
	WithDependency(step3, step2), // step3 depends on step2
)
func WithHandler ¶ added in v0.4.0
func WithHandler(handler func(context.Context, map[string]any) (context.Context, map[string]any, error)) func(StepInterface)
WithHandler sets the handler function for a step
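A handler receives the context and the shared data map and returns both, possibly modified, along with an error. The following is a minimal sketch of a function with that signature. The `incrementCount` name and the `"count"` key are hypothetical and chosen only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// incrementCount has the handler signature that WithHandler accepts.
// The "count" key is a hypothetical example, not part of the package.
func incrementCount(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	count, ok := data["count"].(int)
	if !ok {
		// Report the problem and hand the inputs back unchanged.
		return ctx, data, errors.New(`"count" is missing or not an int`)
	}
	data["count"] = count + 1
	return ctx, data, nil
}

func main() {
	_, out, err := incrementCount(context.Background(), map[string]any{"count": 1})
	fmt.Println(out["count"], err)
}
```

A function like this could presumably be passed as `WithHandler(incrementCount)` when constructing a step.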
func WithID ¶ added in v0.3.0
func WithID(id string) func(Identifiable)
WithID is a generic option that sets the ID of any type that implements Identifiable
func WithName ¶ added in v0.3.0
func WithName(name string) func(Nameable)
WithName is a generic option that sets the name of any type that implements Nameable
func WithRunnables ¶ added in v0.5.0
func WithRunnables(nodes ...RunnableInterface) func(RunnableAdder)
WithRunnables adds multiple runnable nodes to a Pipeline or Dag. Nil nodes are filtered out and not added.
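The nil-filtering behavior described above can be sketched with simplified stand-in types. `Runnable`, `step`, `container`, and `withRunnables` below are assumptions made for this illustration; they are not the package's own implementation.

```go
package main

import "fmt"

// Runnable is a simplified stand-in for RunnableInterface (assumption).
type Runnable interface{ GetID() string }

// RunnableAdder mirrors the one-method interface from this package,
// rewritten against the stand-in Runnable type.
type RunnableAdder interface{ RunnableAdd(node ...Runnable) }

type step struct{ id string }

func (s *step) GetID() string { return s.id }

// container is a hypothetical target that collects nodes.
type container struct{ nodes []Runnable }

func (c *container) RunnableAdd(node ...Runnable) { c.nodes = append(c.nodes, node...) }

// withRunnables returns an option that adds every non-nil node to the target.
// Only nil interface values are skipped; a typed nil pointer would pass through.
func withRunnables(nodes ...Runnable) func(RunnableAdder) {
	return func(target RunnableAdder) {
		for _, n := range nodes {
			if n == nil {
				continue
			}
			target.RunnableAdd(n)
		}
	}
}

func main() {
	c := &container{}
	withRunnables(&step{id: "step1"}, nil, &step{id: "step2"})(c)
	for _, n := range c.nodes {
		fmt.Println(n.GetID())
	}
}
```

Because the option only requires `RunnableAdd`, the same function value can configure any type that provides that method.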
Types ¶
type Dag ¶
type Dag struct {
// contains filtered or unexported fields
}
func (*Dag) DependencyAdd ¶
func (d *Dag) DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)
DependencyAdd adds one or more dependencies for the dependent node.
func (*Dag) DependencyList ¶
func (d *Dag) DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface
DependencyList returns all dependencies for a given node.
func (*Dag) GetState ¶
func (d *Dag) GetState() StateInterface
GetState returns the current workflow state
func (*Dag) IsCompleted ¶
func (*Dag) Resume ¶
func (d *Dag) Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
Resume resumes the workflow execution from the last saved state
func (*Dag) Run ¶
func (d *Dag) Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
Run executes all nodes in the DAG in the correct order
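The page does not say how the "correct order" is computed. One common way to derive an order in which every node runs after its dependencies is Kahn's topological sort, sketched below over plain string IDs. The `topoOrder` function is hypothetical and is not claimed to be what `Run` does internally.

```go
package main

import (
	"errors"
	"fmt"
)

// topoOrder returns the node IDs arranged so that each node appears after
// all of its dependencies. deps maps a node ID to the IDs it depends on.
func topoOrder(nodes []string, deps map[string][]string) ([]string, error) {
	unmet := make(map[string]int, len(nodes)) // unmet dependency count per node
	dependents := make(map[string][]string)   // dependency ID -> nodes waiting on it
	for _, n := range nodes {
		unmet[n] = len(deps[n])
		for _, d := range deps[n] {
			dependents[d] = append(dependents[d], n)
		}
	}

	// Start with the nodes that have no dependencies.
	var ready []string
	for _, n := range nodes {
		if unmet[n] == 0 {
			ready = append(ready, n)
		}
	}

	var order []string
	for len(ready) > 0 {
		n := ready[0]
		ready = ready[1:]
		order = append(order, n)
		// Completing n may unblock the nodes that depend on it.
		for _, m := range dependents[n] {
			unmet[m]--
			if unmet[m] == 0 {
				ready = append(ready, m)
			}
		}
	}

	if len(order) != len(nodes) {
		return nil, errors.New("cycle or unknown dependency detected")
	}
	return order, nil
}

func main() {
	order, err := topoOrder(
		[]string{"step3", "step1", "step2"},
		map[string][]string{"step2": {"step1"}, "step3": {"step2"}},
	)
	fmt.Println(order, err)
}
```

The nodes are listed out of order on purpose: the result depends on the declared dependencies, not on insertion order.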
func (*Dag) RunnableAdd ¶
func (d *Dag) RunnableAdd(node ...RunnableInterface)
RunnableAdd adds one or more nodes to the DAG.
func (*Dag) RunnableList ¶
func (d *Dag) RunnableList() []RunnableInterface
RunnableList returns all runnable nodes in the DAG.
func (*Dag) RunnableRemove ¶
func (d *Dag) RunnableRemove(node RunnableInterface) bool
RunnableRemove removes a node from the DAG.
func (*Dag) SetState ¶
func (d *Dag) SetState(state StateInterface)
SetState sets the workflow state
type DagInterface ¶
type DagInterface interface {
RunnableInterface
// RunnableAdd adds one or more nodes to the DAG.
// Runnable nodes can be added in any order, as their execution order will be determined by their dependencies.
RunnableAdd(node ...RunnableInterface)
// RunnableRemove removes a node from the DAG.
// Returns true if the node was found and removed, false if it wasn't found.
RunnableRemove(node RunnableInterface) bool
// RunnableList returns all runnable nodes in the DAG.
// The order of nodes in the returned slice is not guaranteed to be their execution order.
// Use Run() to execute nodes in the correct order based on their dependencies.
RunnableList() []RunnableInterface
// DependencyAdd adds a dependency between two nodes.
// The dependent node will only execute after the dependency node has completed successfully.
DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)
// DependencyList returns all dependencies for a given node.
// The actual dependencies may vary based on the context and any conditional dependencies.
DependencyList(ctx context.Context, node RunnableInterface, data map[string]any) []RunnableInterface
// Pause pauses the workflow execution
Pause() error
// Resume resumes the workflow execution from the last saved state
Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// GetState returns the current workflow state
GetState() StateInterface
// SetState sets the workflow state
SetState(state StateInterface)
}
DagInterface represents a Directed Acyclic Graph (DAG) of steps that can be executed in a specific order. It manages the dependencies between steps and ensures they are executed in the correct sequence.
func NewDag ¶
func NewDag(opts ...interface{}) DagInterface
NewDag creates a new DAG with the given options
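The constructors take `opts ...interface{}`, while the options themselves have different function types such as `func(Identifiable)` and `func(Nameable)`. One way a constructor could apply such a mixed list is a type switch, sketched below. The `dag` struct, `newDag`, and the option bodies are assumptions for illustration; only the interface and option signatures come from this page.

```go
package main

import "fmt"

// Identifiable and Nameable reproduce the interfaces documented on this page.
type Identifiable interface{ SetID(id string) }
type Nameable interface{ SetName(name string) }

// WithID and WithName follow the documented signatures; the bodies are assumed.
func WithID(id string) func(Identifiable) {
	return func(t Identifiable) { t.SetID(id) }
}

func WithName(name string) func(Nameable) {
	return func(t Nameable) { t.SetName(name) }
}

// dag is a hypothetical minimal type that satisfies both interfaces.
type dag struct{ id, name string }

func (d *dag) SetID(id string)     { d.id = id }
func (d *dag) SetName(name string) { d.name = name }

// newDag applies each option whose function type it recognizes.
// In this sketch, unrecognized option types are silently ignored.
func newDag(opts ...interface{}) *dag {
	d := &dag{}
	for _, opt := range opts {
		switch apply := opt.(type) {
		case func(Identifiable):
			apply(d)
		case func(Nameable):
			apply(d)
		}
	}
	return d
}

func main() {
	d := newDag(WithID("dag-1"), WithName("My DAG"))
	fmt.Println(d.id, d.name)
}
```

The trade-off of accepting `interface{}` is that a wrongly typed option is only detectable at run time, so a real constructor may want to report unrecognized options rather than skip them.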
type DependencyAdder ¶ added in v0.5.0
type DependencyAdder interface {
DependencyAdd(dependent RunnableInterface, dependency ...RunnableInterface)
}
DependencyAdder is an interface for types that can add dependencies between nodes
type DotEdgeSpec ¶
type DotEdgeSpec struct {
FromNodeName string
ToNodeName string
Tooltip string
Style string // Use edgeStyleSolid, etc.
Color string
}
DotEdgeSpec represents an edge in the DOT graph
type DotNodeSpec ¶
type DotNodeSpec struct {
Name string
DisplayName string
Tooltip string
Shape string
Style string // Use nodeStyleSolid or nodeStyleFilled
FillColor string
}
DotNodeSpec represents a node in the DOT graph
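To show how the two spec types could map onto Graphviz DOT text, here is a minimal sketch. The struct fields are copied from this page; `renderDot` is a hypothetical helper, and the attribute layout it emits is an assumption, not the output of `Visualize`.

```go
package main

import (
	"fmt"
	"strings"
)

// Field names are copied from the DotNodeSpec and DotEdgeSpec types above.
type DotNodeSpec struct {
	Name, DisplayName, Tooltip, Shape, Style, FillColor string
}

type DotEdgeSpec struct {
	FromNodeName, ToNodeName, Tooltip, Style, Color string
}

// renderDot writes one DOT statement per node and per edge.
// %q adds double quotes and Go-style escapes, which is adequate for simple
// ASCII values; a full implementation would follow DOT's own escaping rules.
func renderDot(nodes []DotNodeSpec, edges []DotEdgeSpec) string {
	var b strings.Builder
	b.WriteString("digraph {\n")
	for _, n := range nodes {
		fmt.Fprintf(&b, "  %q [label=%q, tooltip=%q, shape=%q, style=%q, fillcolor=%q];\n",
			n.Name, n.DisplayName, n.Tooltip, n.Shape, n.Style, n.FillColor)
	}
	for _, e := range edges {
		fmt.Fprintf(&b, "  %q -> %q [tooltip=%q, style=%q, color=%q];\n",
			e.FromNodeName, e.ToNodeName, e.Tooltip, e.Style, e.Color)
	}
	b.WriteString("}\n")
	return b.String()
}

func main() {
	nodes := []DotNodeSpec{
		{Name: "step1", DisplayName: "Step 1", Shape: "box", Style: "filled", FillColor: "white"},
		{Name: "step2", DisplayName: "Step 2", Shape: "box", Style: "filled", FillColor: "white"},
	}
	edges := []DotEdgeSpec{
		{FromNodeName: "step1", ToNodeName: "step2", Style: "solid", Color: "black"},
	}
	fmt.Print(renderDot(nodes, edges))
}
```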
type Identifiable ¶ added in v0.3.0
type Identifiable interface {
SetID(id string)
}
Identifiable is an interface for types that can have an ID
type Nameable ¶ added in v0.3.0
type Nameable interface {
SetName(name string)
}
Nameable is an interface for types that can have a name
type PipelineInterface ¶
type PipelineInterface interface {
RunnableInterface
// RunnableAdd adds a runnable node(s) to the pipeline.
RunnableAdd(node ...RunnableInterface)
// RunnableRemove removes a runnable node from the pipeline.
RunnableRemove(node RunnableInterface) bool
// RunnableList returns all runnable nodes in the pipeline.
// The order of nodes in the returned slice is the order they were added.
RunnableList() []RunnableInterface
// Pause pauses the workflow execution
Pause() error
// Resume resumes the workflow execution from the last saved state
Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// GetState returns the current workflow state
GetState() StateInterface
// SetState sets the workflow state
SetState(state StateInterface)
}
PipelineInterface defines the interface for a pipeline
func NewPipeline ¶
func NewPipeline(opts ...interface{}) PipelineInterface
NewPipeline creates a new pipeline with the given options
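Assuming a pipeline runs its nodes sequentially in the order they were added and stops at the first error, the way context and data are threaded from one node to the next can be sketched as follows. The `handler` type, `runInOrder`, and the example handlers are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handler is a local name for the Run/handler function shape used on this page.
type handler func(context.Context, map[string]any) (context.Context, map[string]any, error)

// runInOrder feeds each step's returned context and data into the next step
// and stops at the first error.
func runInOrder(ctx context.Context, data map[string]any, steps ...handler) (context.Context, map[string]any, error) {
	for i, step := range steps {
		var err error
		ctx, data, err = step(ctx, data)
		if err != nil {
			return ctx, data, fmt.Errorf("step %d failed: %w", i, err)
		}
	}
	return ctx, data, nil
}

// addOne, double, and fail are hypothetical example handlers.
func addOne(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	n, _ := data["n"].(int)
	data["n"] = n + 1
	return ctx, data, nil
}

func double(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	n, _ := data["n"].(int)
	data["n"] = n * 2
	return ctx, data, nil
}

func fail(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
	return ctx, data, errors.New("boom")
}

func main() {
	_, out, err := runInOrder(context.Background(), map[string]any{"n": 1}, addOne, double)
	fmt.Println(out["n"], err)

	// The failing step prevents double from running.
	_, out, err = runInOrder(context.Background(), map[string]any{"n": 1}, addOne, fail, double)
	fmt.Println(out["n"], err)
}
```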
type RunnableAdder ¶ added in v0.5.0
type RunnableAdder interface {
RunnableAdd(node ...RunnableInterface)
}
RunnableAdder is an interface for types that can add runnable nodes via the RunnableAdd method
type RunnableInterface ¶
type RunnableInterface interface {
GetID() string
SetID(id string)
GetName() string
SetName(name string)
Run(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// State helper methods
IsRunning() bool
IsPaused() bool
IsCompleted() bool
IsFailed() bool
IsWaiting() bool
// Visualize returns a DOT graph representation of the workflow component
Visualize() string
}
RunnableInterface represents a single unit of work that can be executed within a given context with specified data. It can work with the data and return the result of the work.
It can be used as a single step, or combined with other nodes to form a Pipeline, Workflow or DAG.
type State ¶
type State struct {
Status StateStatus
Data map[string]any
CurrentStepID string
CompletedSteps []string
LastUpdated time.Time
}
State represents the current state of a workflow
func (*State) AddCompletedStep ¶
func (s *State) AddCompletedStep(id string)
AddCompletedStep adds a step ID to the completed steps list
func (*State) GetCompletedSteps ¶
func (s *State) GetCompletedSteps() []string
GetCompletedSteps returns the list of completed step IDs
func (*State) GetCurrentStepID ¶
func (s *State) GetCurrentStepID() string
GetCurrentStepID returns the ID of the current step
func (*State) GetLastUpdated ¶
func (s *State) GetLastUpdated() time.Time
GetLastUpdated returns the timestamp of the last update
func (*State) GetStatus ¶
func (s *State) GetStatus() StateStatus
GetStatus returns the current status of the workflow
func (*State) GetWorkflowData ¶
func (s *State) GetWorkflowData() map[string]any
GetWorkflowData returns the workflow data
func (*State) SetCurrentStepID ¶
func (s *State) SetCurrentStepID(id string)
SetCurrentStepID sets the ID of the current step
func (*State) SetLastUpdated ¶
func (s *State) SetLastUpdated(t time.Time)
SetLastUpdated sets the timestamp of the last update
func (*State) SetStatus ¶
func (s *State) SetStatus(status StateStatus)
SetStatus sets the current status of the workflow
func (*State) SetWorkflowData ¶
func (s *State) SetWorkflowData(data map[string]any)
SetWorkflowData sets the workflow data
type StateInterface ¶
type StateInterface interface {
GetStatus() StateStatus
SetStatus(status StateStatus)
GetData() map[string]any
SetData(data map[string]any)
ToJSON() ([]byte, error)
FromJSON(data []byte) error
GetCurrentStepID() string
SetCurrentStepID(id string)
GetCompletedSteps() []string
AddCompletedStep(id string)
GetWorkflowData() map[string]any
SetWorkflowData(data map[string]any)
GetLastUpdated() time.Time
SetLastUpdated(t time.Time)
}
StateInterface defines the interface for workflow state management
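The `ToJSON` and `FromJSON` methods suggest that state can be persisted and restored, for example between `Pause` and `Resume`. The sketch below round-trips a stand-in `State` through `encoding/json`. It assumes that `StateStatus` is a string type and that default JSON field names are used; neither detail is confirmed by this page.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// StateStatus is assumed to be a string type, matching the string constants above.
type StateStatus string

// State copies the documented fields; JSON tags are omitted (assumption).
type State struct {
	Status         StateStatus
	Data           map[string]any
	CurrentStepID  string
	CompletedSteps []string
	LastUpdated    time.Time
}

// ToJSON and FromJSON follow the documented signatures; the bodies are assumed.
func (s *State) ToJSON() ([]byte, error) { return json.Marshal(s) }

func (s *State) FromJSON(data []byte) error { return json.Unmarshal(data, s) }

func main() {
	saved := &State{
		Status:         "paused",
		Data:           map[string]any{"orderID": "A-1"},
		CurrentStepID:  "step2",
		CompletedSteps: []string{"step1"},
		LastUpdated:    time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
	}

	raw, err := saved.ToJSON()
	if err != nil {
		panic(err)
	}

	restored := &State{}
	if err := restored.FromJSON(raw); err != nil {
		panic(err)
	}
	fmt.Println(restored.Status, restored.CurrentStepID, restored.CompletedSteps)
}
```

Note that numeric values stored in a `map[string]any` come back from `encoding/json` as `float64`, so handlers that read numbers from restored data need to account for that.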
type StepHandler ¶
type StepInterface ¶
type StepInterface interface {
RunnableInterface
// GetHandler returns the function that implements the step's execution logic.
GetHandler() StepHandler
// SetHandler allows setting or modifying the step's execution logic.
SetHandler(handler StepHandler)
// Pause pauses the workflow execution
Pause() error
// Resume resumes the workflow execution from the last saved state
Resume(ctx context.Context, data map[string]any) (context.Context, map[string]any, error)
// GetState returns the current workflow state
GetState() StateInterface
// SetState sets the workflow state
SetState(state StateInterface)
}
StepInterface represents a single node in a Pipeline, Workflow or DAG. A step is a unit of work that can be executed within a given context. A step is executed by a Pipeline, Workflow or DAG which manages its dependencies and execution order.
func NewStep ¶
func NewStep(opts ...interface{}) StepInterface
NewStep creates a new step with the given options
type StepOption ¶ added in v0.4.0
type StepOption = func(StepInterface)
StepOption is a function that configures a Step. This is a type alias for backward compatibility.
Deprecated: Use functional options directly instead.
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| dag_conditional_logic (command) | |
| dag_dependencies (command) | |
| dag_error_handling (command) | |
| pipeline (command) | |