pipeline.runner

PipelineRunner: Orchestrator for executing multiple pipelines with control flow.

This module provides the PipelineRunner class which enables:

  • Sequential execution of multiple pipelines

  • Conditional branching (if/else)

  • Loops (while, for)

  • Inter-pipeline communication via port connections

  • Workflow visualization

  • Save/load complete workflows to .pipes files

Classes

class pipeline.runner.FlowNodeType

Types of flow control nodes

Inherits from:

str, Enum

Attributes:

RUN = 'run'
IF = 'if'
WHILE = 'while'
FOR = 'for'
PARALLEL = 'parallel'
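
Because FlowNodeType inherits from both str and Enum, its members should behave like plain strings when compared or serialized. The sketch below re-declares a stand-in enum with the documented members purely for illustration; it is not imported from pipeline.runner, and the real class may define more than is shown here.

```python
import json
from enum import Enum


class FlowNodeType(str, Enum):
    """Stand-in mirroring the documented members (not the library class)."""
    RUN = "run"
    IF = "if"
    WHILE = "while"
    FOR = "for"
    PARALLEL = "parallel"


# Members compare equal to their string values.
is_equal = FlowNodeType.IF == "if"

# A raw string (for example, one read from a file) maps back to a member.
looked_up = FlowNodeType("while")

# The str mixin lets the standard json module encode members directly.
encoded = json.dumps({"node_type": FlowNodeType.RUN})
```

This is the usual reason for mixing str into an Enum: node types can be written to and read from JSON without a custom encoder.
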
class pipeline.runner.ConditionResult

Result of condition evaluation

Inherits from:

BaseModel

Attributes:

passed: bool
message: str
data: dict[str, Any]
class pipeline.runner.PipelinePortReference

Reference to a module port within a specific pipeline.

Enables IDE-friendly inter-pipeline connections using >> operator.

Usage:
    runner["pipe1"]["mod1"].OutputPort >> runner["pipe2"]["mod2"].InputPort

Methods:

__init__(pipeline_name: str, port_ref: PortReference, runner: PipelineRunner | None = None)
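
The >> syntax described above can be supported in Python by defining __rshift__ on the port reference object. The following is a minimal sketch of that technique, assuming a hypothetical PortRef class and a plain list as the connection registry; how PipelinePortReference actually records connections is not stated in this reference.

```python
class PortRef:
    """Hypothetical stand-in for a pipeline-qualified port reference."""

    def __init__(self, pipeline: str, module: str, port: str, registry: list):
        self.path = f"{pipeline}.{module}.{port}"
        self.registry = registry  # shared list that collects connections

    def __rshift__(self, target: "PortRef") -> "PortRef":
        # `source >> target` calls source.__rshift__(target): record the
        # link, then return the target so the expression has a useful value.
        self.registry.append((self.path, target.path))
        return target


connections: list = []
source = PortRef("pipe1", "mod1", "OutputPort", connections)
target = PortRef("pipe2", "mod2", "InputPort", connections)
source >> target
```
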
class pipeline.runner.PipelineConnection

Connection between pipelines via module ports

Inherits from:

BaseModel

Attributes:

source_pipeline: str
source_module: str
source_port: str
target_pipeline: str
target_module: str
target_port: str
class pipeline.runner.FlowNode

Base class for flow control nodes

Inherits from:

BaseModel

Methods:

execute(context: RunnerContext) -> str | None

Execute this node and return next node_id to execute

Attributes:

node_id: str
node_type: FlowNodeType
next_node: str | None
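
Since execute() returns the node_id of the next node (or None), a workflow can be driven by a simple loop that keeps following the returned id. The sketch below illustrates that contract with hypothetical closures in place of FlowNode subclasses; the names make_run_node, nodes and trace are invented for this example and are not part of pipeline.runner.

```python
from typing import Callable, Optional


def make_run_node(name: str, next_node: Optional[str],
                  trace: list) -> Callable[[], Optional[str]]:
    """Build a node whose execute step logs its name and returns the next id."""
    def execute() -> Optional[str]:
        trace.append(name)      # stands in for running the pipeline
        return next_node        # None means "no further node"
    return execute


trace: list = []
nodes = {
    "start": make_run_node("preprocess", "second", trace),
    "second": make_run_node("analyze", None, trace),
}

# Driver loop: follow the returned node ids until a node returns None.
current: Optional[str] = "start"
while current is not None:
    current = nodes[current]()
```
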
class pipeline.runner.RunNode

Node that executes a pipeline

Inherits from:

FlowNode

Methods:

execute(context: RunnerContext) -> str | None

Execute the pipeline and return next node

Attributes:

node_type: FlowNodeType
pipeline_name: str
class pipeline.runner.IfNode

Conditional branching node

Inherits from:

FlowNode

Methods:

execute(context: RunnerContext) -> str | None

Evaluate condition and return appropriate branch

Attributes:

node_type: FlowNodeType
condition: str
true_branch: str
false_branch: str | None
class pipeline.runner.WhileNode

While loop node

Inherits from:

FlowNode

Methods:

execute(context: RunnerContext) -> str | None

Execute while loop

Attributes:

node_type: FlowNodeType
condition: str
body_node: str
max_iterations: int
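
The max_iterations attribute acts as a safety limit on the loop. A minimal sketch of that idea follows, assuming the cap simply stops the loop once reached; whether WhileNode stops silently, logs, or raises when the limit is hit is not stated here, and run_while is a hypothetical helper.

```python
from typing import Callable


def run_while(condition: Callable[[], bool], body: Callable[[], None],
              max_iterations: int = 100) -> int:
    """Run body while condition() holds, at most max_iterations times."""
    count = 0
    while count < max_iterations and condition():
        body()
        count += 1
    return count


state = {"n": 0}


def step() -> None:
    state["n"] += 1


# The condition ends this loop after three passes.
finished = run_while(lambda: state["n"] < 3, step)

# The condition never becomes false, so the cap ends this loop.
capped = run_while(lambda: True, step, max_iterations=5)
```
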
class pipeline.runner.ForNode

For loop node (fixed iteration count)

Inherits from:

FlowNode

Methods:

execute(context: RunnerContext) -> str | None

Execute for loop

Attributes:

node_type: FlowNodeType
iterations: int | str
body_node: str
iteration_var: str
class pipeline.runner.RunnerContext

Execution context for PipelineRunner

Methods:

__init__(runner: PipelineRunner)
evaluate_condition(condition: str) -> ConditionResult

Evaluate a condition string

evaluate_expression(expression: str) -> Any

Evaluate an expression and return the result
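
Condition strings elsewhere in this reference are described as Python expressions with access to names such as results, shared and iterations. One common way to evaluate such strings is eval() with an explicit namespace, sketched below. This is an assumption for illustration: the reference does not say how RunnerContext evaluates expressions, and the standalone evaluate_expression function here is hypothetical.

```python
from typing import Any


def evaluate_expression(expression: str, names: dict[str, Any]) -> Any:
    """Evaluate an expression string against an explicit set of names."""
    # An empty __builtins__ hides open(), __import__() and similar helpers.
    # This reduces accidents but is not a security sandbox.
    return eval(expression, {"__builtins__": {}}, dict(names))


names = {
    "iterations": {"preprocess": 3},
    "shared": {"threshold": 0.5},
}

passed = bool(evaluate_expression("iterations['preprocess'] < 10", names))
doubled = evaluate_expression("shared['threshold'] * 2", names)
```

Strings evaluated this way should come only from trusted workflow definitions.
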

class pipeline.runner.PipelineRunnerResult

Results from workflow execution

Inherits from:

BaseModel

Methods:

get_pipeline_result(pipeline_name: str) -> PipelineResult | None

Get result from specific pipeline

get_final_outputs() -> dict[str, Any]

Get outputs from the last executed pipeline

Attributes:

success: bool
pipeline_results: dict[str, Any]
execution_trace: list[str]
iteration_counts: dict[str, int]
total_duration: float
error_message: str
class pipeline.runner.PipelineRunnerData

Complete workflow data for serialization.

Includes both runner configuration and all pipeline definitions.

Inherits from:

BaseModel

Attributes:

format_version: str
runner_name: str
runner_workspace: str | None
pipelines: dict[str, dict]
connections: list[dict]
flow: dict
class pipeline.runner.PipelineRunner

Runner for executing multiple pipelines with control flow.

Supports:

  • Sequential execution

  • Conditional branching (if/else)

  • Loops (while, for)

  • Inter-pipeline communication via port connections (using >> operator)

  • Workflow visualization

Examples

>>> runner = PipelineRunner("MyWorkflow")
>>> runner.add_pipeline("preprocess", preprocess_pipe)
>>> runner.add_pipeline("analyze", analyze_pipe)
>>>
>>> # Connect using >> operator
>>> runner["preprocess"]["module1"].OutputPort >> \
...     runner["analyze"]["module2"].InputPort
>>>
>>> # Define flow
>>> runner.add_run_node("start", "preprocess", next_node="loop")
>>> runner.add_while_node("loop", "iterations['preprocess'] < 10", "start")
>>>
>>> # Visualize
>>> runner.draw_workflow("my_workflow.svg")
>>>
>>> # Run
>>> result = runner.run()
>>>
>>> # Save everything
>>> runner.save("workflow.pipes")

Methods:

__init__(name: str = 'workflow', workspace: str | Path | None = None)

Initialize PipelineRunner.

Parameters

name : str, default: "workflow"

Name of the workflow

workspace : str | Path | None, default: None

Working directory for saving workflow output files

add_pipeline(name: str, pipeline: PipeLine) -> PipelineRunner

Add a pipeline to the runner.

Parameters

name : str

Name to identify this pipeline in the workflow

pipeline : PipeLine

Pipeline instance to add

Returns

PipelineRunner

Self for method chaining
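
Returning self is what makes method chaining possible: each call hands back the same object, so the next call can be attached directly. The sketch below shows the pattern with a hypothetical ChainDemo class and placeholder pipeline objects; it does not use the real PipelineRunner.

```python
class ChainDemo:
    """Hypothetical builder that returns itself from each add_* call."""

    def __init__(self) -> None:
        self.pipelines: dict = {}

    def add_pipeline(self, name: str, pipeline: object) -> "ChainDemo":
        self.pipelines[name] = pipeline
        return self  # returning self enables chained calls


# object() stands in for real pipeline instances.
demo = (
    ChainDemo()
    .add_pipeline("preprocess", object())
    .add_pipeline("analyze", object())
)
names = list(demo.pipelines)
```
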
connect(source_pipeline: str, source_module: str, source_port: str, target_pipeline: str, target_module: str, target_port: str) -> PipelineRunner

Connect a port from one pipeline to another (alternative to >> operator).

Parameters

source_pipeline : str

Name of source pipeline

source_module : str

Name of module in source pipeline

source_port : str

Name of output port in source module

target_pipeline : str

Name of target pipeline

target_module : str

Name of module in target pipeline

target_port : str

Name of input port in target module

Returns

PipelineRunner

Self for method chaining
add_run_node(node_id: str, pipeline_name: str, next_node: str | None = None) -> PipelineRunner

Add a node that runs a pipeline.

Parameters

node_id : str

Unique identifier for this node

pipeline_name : str

Name of pipeline to execute

next_node : str | None, default: None

Next node to execute after this one

Returns

PipelineRunner

Self for method chaining
add_if_node(node_id: str, condition: str, true_branch: str, false_branch: str | None = None, next_node: str | None = None) -> PipelineRunner

Add a conditional branching node.

Parameters

node_id : str

Unique identifier for this node

condition : str

Python expression to evaluate (has access to results, shared, iterations, loops)

true_branch : str

Node ID to execute if condition is True

false_branch : str | None, default: None

Node ID to execute if condition is False

next_node : str | None, default: None

Fallback next node if no branch is taken

Returns

PipelineRunner

Self for method chaining

Examples

>>> runner.add_if_node("check", "results['validate'].success", "process", "error")
add_while_node(node_id: str, condition: str, body_node: str, max_iterations: int = 100, next_node: str | None = None) -> PipelineRunner

Add a while loop node.

Parameters

node_id : str

Unique identifier for this node

condition : str

Loop condition expression

body_node : str

First node in loop body

max_iterations : int, default: 100

Safety limit for maximum iterations

next_node : str | None, default: None

Node to execute after loop completes

Returns

PipelineRunner

Self for method chaining

Examples

>>> runner.add_while_node("loop", "iterations['process'] < 10", "body", max_iterations=20)
add_for_node(node_id: str, iterations: int | str, body_node: str, iteration_var: str = 'i', next_node: str | None = None) -> PipelineRunner

Add a for loop node.

Parameters

node_id : str

Unique identifier for this node

iterations : int | str

Number of iterations (can be expression string)

body_node : str

First node in loop body

iteration_var : str, default: "i"

Variable name for iteration counter (accessible in conditions)

next_node : str | None, default: None

Node to execute after loop completes

Returns

PipelineRunner

Self for method chaining

Examples

>>> runner.add_for_node("batch_loop", iterations=5, body_node="process", iteration_var="batch_num")
run() -> PipelineRunnerResult

Execute the workflow.

Returns

PipelineRunnerResult

Result object containing execution details and pipeline results
save(file: str | Path) -> None

Save complete workflow to a single .pipes file in JSON format.

Includes all pipeline definitions and runner configuration in one file.

Parameters

file : str | Path

Path to save the .pipes file (must have .pipes extension)
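
As a rough picture of what saving to a JSON-formatted .pipes file involves, the sketch below checks the extension and writes a dictionary whose keys follow the PipelineRunnerData attributes. The save_workflow helper, the "1.0" version string and the empty pipelines, connections and flow values are placeholders invented for this example; the actual file layout and error behaviour of save() are not specified in this reference.

```python
import json
import tempfile
from pathlib import Path


def save_workflow(data: dict, file) -> Path:
    """Write workflow data as JSON, insisting on a .pipes extension."""
    path = Path(file)
    if path.suffix != ".pipes":
        raise ValueError(f"expected a .pipes file, got {path.name!r}")
    path.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return path


# Keys mirror the documented PipelineRunnerData attributes; values are placeholders.
workflow = {
    "format_version": "1.0",
    "runner_name": "MyWorkflow",
    "runner_workspace": None,
    "pipelines": {},
    "connections": [],
    "flow": {},
}

with tempfile.TemporaryDirectory() as tmp:
    saved = save_workflow(workflow, Path(tmp) / "workflow.pipes")
    # Reading the file back yields the same structure.
    restored = json.loads(saved.read_text(encoding="utf-8"))
```
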

load(file: str | Path, silent: bool = False) -> PipelineRunner

Load complete workflow from a single .pipes file or URL.

Reconstructs all pipelines and runner configuration.

Parameters

file : str | Path

Path to the .pipes file (must have .pipes extension) or URL to download from

silent : bool, default: False

If True, suppress printed messages

Returns

PipelineRunner

Self for method chaining
draw_workflow(filename: str | None = None, format: str = 'svg', show_connections: bool = True, show_flow_control: bool = True, layout: str = 'TB', view: bool = False) -> str | None

Draw a visual representation of the workflow showing pipelines and their connections.

Similar to Pipeline.draw_pipeline(), but shows the meta-level workflow structure.

Parameters

format : str, default: 'svg'

Output format ('png', 'svg', 'pdf', 'dot', etc.)

show_connections : bool, default: True

Whether to show inter-pipeline port connections

show_flow_control : bool, default: True

Whether to show flow control nodes (if/while/for)

layout : str, default: 'TB'

Graph layout direction ('TB'=top-bottom, 'LR'=left-right, 'BT'=bottom-top, 'RL'=right-left)

view : bool, default: False

Whether to automatically open the generated file

Returns

str | None

Path to the generated file, or None if visualization is not available

Examples

>>> runner.draw_workflow('my_workflow', format='svg', view=True)
>>> runner.draw_workflow(show_flow_control=True, layout='LR')
class pipeline.runner.PipelineProxy

Proxy to enable runner["pipeline"]["module"].Port syntax.

Wraps module access to return PipelinePortReference objects.

Methods:

__init__(pipeline_name: str, pipeline: PipeLine, runner: PipelineRunner)
class pipeline.runner.ModuleProxy

Proxy to enable runner["pipeline"]["module"].Port syntax.

Methods:

__init__(pipeline_name: str, module: PipeModule, runner: PipelineRunner)