miniflow

package module
v0.0.0-...-1820d9b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2017 License: AGPL-3.0 Imports: 17 Imported by: 0

README

miniflow

Go Report Card

Workflow execution based on YAML definitions and NATS streaming.

Disclaimer

This is a work in progress and NOT meant for any production use!

Features

  • Simple definition of workflows using YAML
  • Easy to setup and execute a workflow
  • Designed with ETL in mind
  • Supported distributed processing of workloads with NATS streaming messaging.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrWorkflowNotFound = errors.New("Workflow is not found")

ErrWorkflowNotFound is error when workflow is not registered

Functions

This section is empty.

Types

type Arc

type Arc struct {
	ID        string `yaml:"id"`
	Condition string `yaml:"condition,omitempty"`

	FromID string `yaml:"from"`
	ToID   string `yaml:"to"`

	FromTask *Task `yaml:"-"`
	ToTask   *Task `yaml:"-"`
}

Arc is graph element which represents a relation between two Tasks.

type FlowError

type FlowError struct {
	Err  error
	Data interface{}
}

FlowError custom error holder.

func (FlowError) Error

func (fe FlowError) Error() string

type NATSStreamingConfig

type NATSStreamingConfig struct {
	ClusterID string
	ClientID  string
	Durable   string

	NATSConn stan.Conn `yaml:"-"`
}

NATSStreamingConfig is object which holds important information about connection and nats streaming server.

func NewNATSStreamingConfig

func NewNATSStreamingConfig(clusterID, url string, reloadConfig bool) *NATSStreamingConfig

NewNATSStreamingConfig is constructor object for NATSStreamingConfig object.

type Produce

type Produce func() *Token

Produce is definition for producer implementation.

type Producer

type Producer struct {
	*sync.RWMutex
	// contains filtered or unexported fields
}

Producer is responsible for producing tokens.

func NewProducer

func NewProducer(concurency int, producer Produce) *Producer

NewProducer will create new producer object.

type Task

type Task struct {
	ID   string `yaml:"id"`
	Type string `yaml:"type"`
	Fn   Work   `yaml:"-"`

	FromArcs []*Arc
	ToArcs   []*Arc
	// contains filtered or unexported fields
}

Task is graph element which represents task which is getting executed. Each task can have n in channels and m out channels. Depending on the type of the task, we are going to start their receivers and senders.

type Token

type Token struct {
	TaskID    string
	TaskType  string
	ContextID string
	Ctx       interface{}
	Data      map[string]interface{}
}

Token is data traveling from one task to task.

func NewToken

func NewToken() *Token

NewToken is constructor for new empty token.

type Work

type Work func(*Token) *Token

Work is definition for work implementation.

type Workflow

type Workflow struct {
	Name        string
	Description string
	Tasks       map[string]Task
	Subs        []stan.Subscription
	Ctx         interface{}
	EndTokens   chan *Token
	// contains filtered or unexported fields
}

Workflow is structure which will be used to execute a a workflow

func NewWorkflow

func NewWorkflow(wfs *WorkflowSchema, config *NATSStreamingConfig) (*Workflow, error)

NewWorkflow is constructor which will construct workflow from given schema.

func (*Workflow) ArcTasks

func (w *Workflow) ArcTasks(arc *Arc) (from Task, to Task)

ArcTasks is helper method for retrieving from and to tasks of a given arc.

func (*Workflow) AttachProducer

func (w *Workflow) AttachProducer(concurency int, produceFn Produce) error

AttachProducer will register function which will produce tokens.

func (*Workflow) Close

func (w *Workflow) Close()

Close will explicitly close connection towards NATS messaging.

func (*Workflow) Publish

func (w *Workflow) Publish(subject string, token *Token) error

Publish is used to publish information between tasks. Can be used externally to start the flow.

func (*Workflow) Register

func (w *Workflow) Register(name string, work Work) error

Register will register function mapping between workflow schema and actual code you wish to execute.

func (*Workflow) Run

func (w *Workflow) Run() error

Run is method to start execution of the workflow.

func (*Workflow) RunWithContext

func (w *Workflow) RunWithContext(Ctx interface{}) error

func (*Workflow) Teardown

func (w *Workflow) Teardown(signalChan <-chan time.Time)

Teardown will gracefully shutdown workflow.

type WorkflowRunner

type WorkflowRunner struct {
	Workflows map[string]*Workflow
	Config    *NATSStreamingConfig
	Ctx       interface{}
}

WorkflowRunner is manager for all workflow which desired to be run

func NewWorkflowRunner

func NewWorkflowRunner(config *NATSStreamingConfig) *WorkflowRunner

NewWorkflowRunner creates new WorkflowRunner object.

func (*WorkflowRunner) Register

func (wr *WorkflowRunner) Register(wf *Workflow)

Register will register defined workflow for running state.

func (*WorkflowRunner) Run

func (wr *WorkflowRunner) Run(name string) (*Workflow, error)

Run will start workflow and attach kill signal channel listener.

func (*WorkflowRunner) RunAll

func (wr *WorkflowRunner) RunAll()

RunAll will execute all registered workflows.

func (*WorkflowRunner) StopAll

func (wr *WorkflowRunner) StopAll()

StopAll will stop all registered workflows.

type WorkflowSchema

type WorkflowSchema struct {
	Name        string
	Description string
	Flow        []Arc
	Tasks       []Task
}

WorkflowSchema is structure which will be used to parse YAML defined schemas.

func NewWorkflowSchema

func NewWorkflowSchema(schemaPath string) (*WorkflowSchema, error)

NewWorkflowSchema constructor will source yaml file schema from provided schemaPath and return Workflow spec.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL