Pipelines

John Rippington edited this page Feb 20, 2025 · 2 revisions

UKHO.ADDS.Infrastructure.Pipelines

This package provides a simple .NET pipeline solution built from composable nodes for constructing simple and complex pipelines.

This package is primarily about asynchronous execution, not parallelism, although it does accept a DegreeOfParallelism when executing against an enumerable of subjects.
It is best suited to business pipelines whose operations benefit from async, such as external web service calls or database/file I/O. It can also be used to organise code that performs no external I/O, but its performance advantage comes primarily from async. The pipeline pattern is also a good way to impose organisational constraints on processing, such as applying rules or transformations to a subject.

Basic Concepts

Flows are composed from nodes, of which there are a few types, explained below. Nodes are composed into simple or complex flows and then executed on the flow subject. All flows accept a subject type (T). This is the type of the subject that the flow acts upon.
All Node methods that are either overridden or provided via a function accept an ExecutionContext.
All node executions return a NodeResult.

    //Create a pipeline node
    var pipelineNode = new PipelineNode<TestObjectA>();

    //Add a couple of child nodes to the pipeline that do things
    pipelineNode.AddChild(new SimpleTestNodeA1());
    pipelineNode.AddChild(new SimpleTestNodeA2()); 

    //Create the subject and execute the pipeline on it.
    var subject = new TestObjectA();
    NodeResult result = await pipelineNode.ExecuteAsync(subject);

Basic Nodes

These are the nodes that contain functionality that runs against the subject of the pipeline. Basically, this is where most of your code goes.

Node/INode - The simplest node type. This is overridden to provide functionality.

FuncNode/IFuncNode - Inherits from Node and allows functions to be assigned to provide functionality.

Node/FuncNode Usage

  • Override PerformExecute/PerformExecuteAsync to perform operations on the subject.

  • Override ShouldExecute to determine if the node should execute.

  • PerformExecuteFuncAsync - Property that accepts a function to perform on the subject.

  • ShouldExecuteFuncAsync - Property that accepts a function to determine if the node should be executed. Not strongly typed.

  • AddShouldExecute - Extension method to set ShouldExecuteFunc in a strongly typed manner. Has to be done this way to enable variance.

Example of overriding to provide functionality

    public class SimpleTestNodeA1 : Node<TestObjectA>
    {
        public override Task<bool> ShouldExecuteAsync(ExecutionContext<TestObjectA> context)
        {
            return Task.FromResult(true);
        }

        protected override Task<NodeResultStatus> PerformExecuteAsync(ExecutionContext<TestObjectA> context)
        {
            context.Subject.TestValueString = "Completed";
            return Task.FromResult(NodeResultStatus.Succeeded);
        }
    }

Example of providing functionality via functions

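    // FuncNode is the node type that accepts functions
    var node = new FuncNode<TestObjectA>();

    // AddShouldExecute is an extension method, so it is called rather than assigned
    node.AddShouldExecute(context => Task.FromResult(context.Subject.TestValueInt == 5));
    node.ExecutedFuncAsync = context =>
        {
          context.Subject.TestValueString = "Completed";
          return Task.FromResult(NodeResultStatus.Succeeded);
        };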

Multi Nodes

The following nodes allow you to organize and run other nodes together. These nodes are sealed and are intended for direct use. If you wish to create a custom multi-node, see Abstract Multi Nodes below.

PipelineNode/IPipelineNode - Runs a group of nodes serially on the subject. This will be the root node of most flows.

Pipeline Node

    var pipelineNode = new PipelineNode<TestObjectA>();

    pipelineNode.AddChild(new SimpleTestNodeA1());
    pipelineNode.AddChild(new SimpleTestNodeA2());

    var testObject = new TestObjectA();
    NodeResult result = await pipelineNode.ExecuteAsync(testObject);

GroupNode/IGroupNode - An aggregation of nodes that are run on a subject using the asynchronous Task.WhenAll pattern.

Group Node

    var groupNode = new GroupNode<TestObjectA>();

    groupNode.AddChild(new SimpleTestNodeA1());
    groupNode.AddChild(new SimpleTestNodeA2());

    var testObject = new TestObjectA();
    NodeResult result = await groupNode.ExecuteAsync(testObject);
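
The difference between running children serially (PipelineNode) and running them with Task.WhenAll (GroupNode) can be illustrated without the package. The following is a minimal sketch of the two composition styles and is not the package's implementation. `GroupSketch`, `Step`, `RunSeriallyAsync` and `RunGroupedAsync` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class GroupSketch
{
    // Hypothetical stand-in for a node: an async operation on a subject.
    public delegate Task Step<T>(T subject);

    // Serial composition: each step is awaited before the next one starts.
    public static async Task RunSeriallyAsync<T>(T subject, IEnumerable<Step<T>> steps)
    {
        foreach (var step in steps)
        {
            await step(subject);
        }
    }

    // Grouped composition: every step is started, then all are awaited together.
    public static Task RunGroupedAsync<T>(T subject, IEnumerable<Step<T>> steps)
    {
        return Task.WhenAll(steps.Select(step => step(subject)));
    }
}
```

Because grouped steps may overlap in time, steps that touch the same subject need to tolerate interleaving, whereas serial steps can rely on the order in which they were added.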

FirstMatchNode/IFirstMatchNode - An aggregation of nodes of which the first matching its ShouldExecute condition is run on the subject.

First Match Node

    var matchNode = new FirstMatchNode<TestObjectA>();

    var firstNode = new SimpleTestNodeA1();
    firstNode.ShouldExecuteFunc = 
        context => Task.FromResult(context.Subject.TestValueInt == 0);
    matchNode.AddChild(firstNode);

    var secondNode = new SimpleTestNodeA2();
    secondNode.ShouldExecuteFunc = 
        context => Task.FromResult(context.Subject.TestValueInt == 1);
    matchNode.AddChild(secondNode);

    var testObject = new TestObjectA();
    NodeResult result = await matchNode.ExecuteAsync(testObject);
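
The first-match behaviour described above can be sketched as a loop that stops at the first candidate whose condition is true. This is an illustration under stated assumptions, not the package's implementation. `FirstMatchSketch` and `RunFirstMatchAsync` are hypothetical names, and candidates are modelled as plain tuples of delegates instead of nodes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FirstMatchSketch
{
    // Evaluates candidates in order and executes only the first one whose
    // condition returns true. Returns false when no candidate matched.
    public static async Task<bool> RunFirstMatchAsync<T>(
        T subject,
        IEnumerable<(Func<T, Task<bool>> ShouldExecute, Func<T, Task> Execute)> candidates)
    {
        foreach (var (shouldExecute, execute) in candidates)
        {
            if (await shouldExecute(subject))
            {
                await execute(subject);
                return true; // later candidates are never evaluated
            }
        }

        return false;
    }
}
```

The order in which candidates are added matters: a broad condition placed early will shadow more specific conditions that follow it.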

Abstract Multi Nodes

These nodes correspond to the multi-nodes above, but are abstract and intended to serve as base classes for custom implementations. If you wish to create your own multi-node, inherit from one of these. These abstract classes contain all of the functionality of the corresponding concrete multi-nodes (the concrete multi-nodes inherit directly from them).

PipelineNodeBase/IPipelineNodeBase - Base for concrete pipelines.

GroupNodeBase/IGroupNodeBase - Base for concrete group nodes.

FirstMatchNodeBase/IFirstMatchNodeBase - Base for concrete first match nodes.

Transition Nodes

In some cases, you need to transition during pipeline execution from one subject type to another. To accomplish this, use the TransitionNode or TransitionFuncNode.
These nodes take a source type and a destination type and allow you to perform any necessary transitioning from source to destination before execution, and allow the original source to be updated after execution. Transition nodes take a child node to execute after transition occurs.
If the source reference has been changed during the result transition, a ChangeSubject call is automatically made to set the correct subject on the ExecutionContext. The aggregate result and any exceptions are passed back to the source node that called the transition node.

Transition Node

ChildNode - Assigns a child node that is executed after the transition to the destination type occurs.

TransitionSourceAsync - Transitions the source to the destination type.

TransitionResultAsync - Transitions the source after node execution based on the destination node results.

TransitionSourceFuncAsync - In TransitionFuncNode, allows assignment of source to destination transition function.

TransitionResultFuncAsync - In TransitionFuncNode, allows assignment of post-run source transition function.

    public class SimpleTransitionNode : TransitionNode<TestObjectA, TestObjectB>
    {
        protected override TestObjectB TransitionSource(ExecutionContext<TestObjectA> sourceContext)
        {
            return new TestObjectB();
        }

        protected override TestObjectA TransitionResult(ExecutionContext<TestObjectA> sourceContext, NodeResult result)
        {
            return sourceContext.Subject;
        }
    } 

Or

    var pipelineNode = new PipelineNode<TestObjectA>();

    pipelineNode.AddChild(new TransitionFuncNode<TestObjectA, TestObjectB>
    {
        ChildNode = new SimpleTestNodeB1(),
        TransitionSourceFunc = ctxt => new TestObjectB()
    });
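
The three phases of a transition (source to destination, child execution, then result back to source) can be sketched independently of the package. This is a simplified illustration and not the package's implementation. `TransitionSketch` and `RunAsync` are hypothetical names, and the result step here receives the destination object, whereas the package's TransitionResult receives a NodeResult.

```csharp
using System;
using System.Threading.Tasks;

public static class TransitionSketch
{
    // Converts the source into a destination, runs the child operation on the
    // destination, then lets the caller fold the outcome back into the source.
    public static async Task<TSource> RunAsync<TSource, TDestination>(
        TSource source,
        Func<TSource, Task<TDestination>> transitionSource,
        Func<TDestination, Task> child,
        Func<TSource, TDestination, Task<TSource>> transitionResult)
    {
        var destination = await transitionSource(source); // phase 1: transition in
        await child(destination);                          // phase 2: run the child
        return await transitionResult(source, destination); // phase 3: transition out
    }
}
```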

Should Execute Blocks

In some cases, you may want to create a reusable rule that determines whether a node should execute, while keeping the logic independent of the node itself. The ShouldExecuteBlock exists for this purpose. It provides one method to implement, ShouldExecuteAsync, which returns true or false:

    public class ShouldNotExecuteBlockA : ShouldExecuteBlock<TestObjectA>
    {
        public override Task<bool> ShouldExecuteAsync(IExecutionContext<TestObjectA> context)
        {
            return Task.FromResult(false);
        }
    }

To use the ShouldExecuteBlock, call the AddShouldExecuteBlock method of the Node:

    var node = new FuncNode<TestObjectA>();
    node.AddShouldExecuteBlock(new ShouldNotExecuteBlockA());

Running Nodes

In order to run a node, you can call one of the following methods, which exist on all nodes:

ExecuteAsync - Executes the node given either a subject (the object you are sending through the flow) or an ExecutionContext that contains the subject.

ExecuteManyAsync - Just like ExecuteAsync, but accepts an enumerable of subjects. The tasks are executed asynchronously, so several may run at the same time. The results are aggregated and returned via a single NodeResult.

ExecuteManySeriallyAsync - Executes many, but awaits each subject so that they are guaranteed to execute serially.

ExecutionContext

The execution context flows through all nodes in the flow. The execution context contains options for running the flow as well as the instance of the subject that the flow is executed on. The type of the ExecutionContext is covariant, so it will allow ExecutionContexts based on inherited types to utilize a flow designed for a base type.

Subject - This is the main subject instance that all nodes in the flow operate on. If it is necessary to change the subject reference, use the ChangeSubject() method of the ExecutionContext to do so.

State - The execution context also contains a dynamic State property that can be used to flow any arbitrary state needed for the workflow. Any node in the flow can update the state or add dynamic properties to it.

    var context = new ExecutionContext<object>(new object());
    context.State.Foo = "Bar";

GlobalOptions - ExecutionOptions that are set in the context and applied to every node.

ParentResult - The root result of this node execution and all of its children.

CancelProcessing - Cancels any further processing of the flow. This only cancels the current subject iteration of an ExecuteMany.

ExecutionOptions

The execution options impact the behavior of node execution.

ContinueOnFailure - Continue execution of other nodes under the parent if this node fails. Defaults to false.

ThrowOnException - Whether an exception raised during node execution is thrown, or is registered as a failure and stored in the NodeResult's Exception property. Defaults to false.

DegreeOfParallelism - The maximum number of parallel operations that are used to process the subjects when calling ExecuteManyAsync.
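
One common way to cap the number of concurrent async operations is a SemaphoreSlim used as a gate. The sketch below illustrates the idea behind a degree-of-parallelism limit. It is an assumption about the general technique, not a description of how the package implements DegreeOfParallelism. `ThrottleSketch` and `ForEachAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs an async operation over every subject, allowing at most
    // degreeOfParallelism operations to be in flight at any time.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> subjects, int degreeOfParallelism, Func<T, Task> operation)
    {
        using var gate = new SemaphoreSlim(degreeOfParallelism);

        var tasks = subjects.Select(async subject =>
        {
            await gate.WaitAsync();   // wait for a free slot
            try
            {
                await operation(subject);
            }
            finally
            {
                gate.Release();       // free the slot for the next subject
            }
        }).ToList();                  // ToList starts every task exactly once

        await Task.WhenAll(tasks);
    }
}
```

With a limit of 1 this behaves like serial execution. Larger values let more I/O-bound operations overlap.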

NodeResult

When a node executes, it returns a NodeResult and also exposes the current or last NodeResult in the Result property.
The NodeResult will contain:

NodeResultStatus - Represents the status of this node. If this is a parent node, it represents a rollup status of all child nodes.

Subject - A reference to the subject, stored as an object.

GetSubjectAs - Allows a strongly typed subject to be returned.

ChildResults - A collection of child result nodes corresponding to the current node's children.

Exception - An Exception if any exception occurred during execution of the node.

GetFailExceptions() - This method aggregates all the exceptions on the failure path of the current node and returns them as an IEnumerable<Exception>.
This includes any exception from nodes that contributed to a failure status of the current node. Note that if a PipelineNode is set to ContinueOnFailure and some of its nodes succeed, the PipelineNode will have a status of SucceededWithErrors and, as such, will not have failures added to this collection.
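
The "failure path" idea can be sketched as a recursive walk over a result tree that only descends into results whose status is Failed. This is an illustration under stated assumptions and not the package's implementation. `SketchStatus` and `SketchResult` are hypothetical types that mirror the status names and result members described in this page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical mirror of the statuses listed under NodeResultStatus.
public enum SketchStatus { NotRun, SucceededWithErrors, Succeeded, Failed }

// Hypothetical stand-in for a node result with child results.
public sealed class SketchResult
{
    public SketchStatus Status { get; set; }
    public Exception Exception { get; set; }
    public List<SketchResult> ChildResults { get; } = new List<SketchResult>();

    // Collects exceptions only along branches whose status is Failed.
    public IEnumerable<Exception> GetFailExceptions()
    {
        if (Status != SketchStatus.Failed)
        {
            yield break; // a node that did not fail contributes nothing
        }

        if (Exception != null)
        {
            yield return Exception;
        }

        foreach (var ex in ChildResults.SelectMany(child => child.GetFailExceptions()))
        {
            yield return ex;
        }
    }
}
```

Under this sketch, a parent whose status is SucceededWithErrors yields no exceptions even when one of its children failed, which matches the caveat above.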

NodeResultStatus

Each node returns a result that contains a status when run. The status will be one of the following:

  • NotRun - Node has not been run
  • SucceededWithErrors - Node is flagged as succeeded, but some error occurred. Typically indicates that a subnode failed but ContinueOnFailure was set to true.
  • Succeeded - Node succeeded
  • Failed - Node reported a failure or an exception was thrown during the execution of the node.

NodeRunStatus

Represents the run status of the node. The status will be one of the following:

  • NotRun - The node has not been run
  • Running - The node is in process
  • Completed - The node has completed
  • Faulted - The node faulted (threw an exception)

This package is based on the Banzai package. The original repository has reached end of life, so the code has been moved here.