pipeline.runner
PipelineRunner: Orchestrator for executing multiple pipelines with control flow.
This module provides the PipelineRunner class which enables:
Sequential execution of multiple pipelines
Conditional branching (if/else)
Loops (while, for)
Inter-pipeline communication via port connections
Workflow visualization
Save/load complete workflows to .pipes files
Classes
- class pipeline.runner.FlowNodeType
Types of flow control nodes
- Inherits from:
str, Enum
Attributes:
- RUN = 'run'
- IF = 'if'
- WHILE = 'while'
- FOR = 'for'
- PARALLEL = 'parallel'
- class pipeline.runner.ConditionResult
Result of condition evaluation
- Inherits from:
BaseModel
Attributes:
- passed: bool = <ast.Call object at 0x0000028E223EB430>
- message: str = <ast.Call object at 0x0000028E223EB2E0>
- data: dict[(str, Any)] = <ast.Call object at 0x0000028E223EB070>
- class pipeline.runner.PipelinePortReference
Reference to a module port within a specific pipeline.
Enables IDE-friendly inter-pipeline connections using >> operator. Usage: runner["pipe1"]["mod1"].OutputPort >> runner["pipe2"]["mod2"].InputPortMethods:
- __init__(pipeline_name: str, port_ref: PortReference, runner: PipelineRunner | None = None)
- class pipeline.runner.PipelineConnection
Connection between pipelines via module ports
- Inherits from:
BaseModel
Attributes:
- source_pipeline: str = <ast.Call object at 0x0000028E22388790>
- source_module: str = <ast.Call object at 0x0000028E22388640>
- source_port: str = <ast.Call object at 0x0000028E223884F0>
- target_pipeline: str = <ast.Call object at 0x0000028E22388490>
- target_module: str = <ast.Call object at 0x0000028E22388250>
- target_port: str = <ast.Call object at 0x0000028E223880A0>
- class pipeline.runner.FlowNode
Base class for flow control nodes
- Inherits from:
BaseModel
Methods:
- execute(context: RunnerContext) str | None
Execute this node and return next node_id to execute
Attributes:
- node_id: str = <ast.Call object at 0x0000028E2238BEB0>
- node_type: FlowNodeType = <ast.Call object at 0x0000028E2238B7C0>
- next_node: str | None = <ast.Call object at 0x0000028E2238B970>
- class pipeline.runner.RunNode
Node that executes a pipeline
- Inherits from:
FlowNode
Methods:
- execute(context: RunnerContext) str | None
Execute the pipeline and return next node
Attributes:
- node_type: FlowNodeType = <ast.Call object at 0x0000028E2238B5B0>
- pipeline_name: str = <ast.Call object at 0x0000028E22389C00>
- class pipeline.runner.IfNode
Conditional branching node
- Inherits from:
FlowNode
Methods:
- execute(context: RunnerContext) str | None
Evaluate condition and return appropriate branch
Attributes:
- node_type: FlowNodeType = <ast.Call object at 0x0000028E2238B310>
- condition: str = <ast.Call object at 0x0000028E2238B4F0>
- true_branch: str = <ast.Call object at 0x0000028E2176FFD0>
- false_branch: str | None = <ast.Call object at 0x0000028E2176FE20>
- class pipeline.runner.WhileNode
While loop node
- Inherits from:
FlowNode
Methods:
- execute(context: RunnerContext) str | None
Execute while loop
Attributes:
- node_type: FlowNodeType = <ast.Call object at 0x0000028E2176F610>
- condition: str = <ast.Call object at 0x0000028E2176F430>
- body_node: str = <ast.Call object at 0x0000028E2176F2E0>
- max_iterations: int = <ast.Call object at 0x0000028E2176F190>
- class pipeline.runner.ForNode
For loop node (fixed iteration count)
- Inherits from:
FlowNode
Methods:
- execute(context: RunnerContext) str | None
Execute for loop
Attributes:
- node_type: FlowNodeType = <ast.Call object at 0x0000028E217496F0>
- iterations: int | str = <ast.Call object at 0x0000028E217498D0>
- body_node: str = <ast.Call object at 0x0000028E21749A20>
- iteration_var: str = <ast.Call object at 0x0000028E21749C00>
- class pipeline.runner.RunnerContext
Execution context for PipelineRunner
Methods:
- __init__(runner: PipelineRunner)
- evaluate_condition(condition: str) ConditionResult
Evaluate a condition string
- class pipeline.runner.PipelineRunnerResult
Results from workflow execution
- Inherits from:
BaseModel
Methods:
Attributes:
- success: bool = <ast.Call object at 0x0000028E2171BCD0>
- pipeline_results: dict[(str, Any)] = <ast.Call object at 0x0000028E2171B340>
- execution_trace: list[str] = <ast.Call object at 0x0000028E2171AAA0>
- iteration_counts: dict[(str, int)] = <ast.Call object at 0x0000028E2171B2B0>
- total_duration: float = <ast.Call object at 0x0000028E2171ABC0>
- error_message: str = <ast.Call object at 0x0000028E2171AD40>
- class pipeline.runner.PipelineRunnerData
Complete workflow data for serialization.
Includes both runner configuration and all pipeline definitions.
- Inherits from:
BaseModel
Attributes:
- format_version: str = <ast.Call object at 0x0000028E2171BC10>
- runner_name: str = <ast.Call object at 0x0000028E21712DA0>
- runner_workspace: str | None = <ast.Call object at 0x0000028E217101C0>
- pipelines: dict[(str, dict)] = <ast.Call object at 0x0000028E217129B0>
- connections: list[dict] = <ast.Call object at 0x0000028E21713E80>
- flow: dict = <ast.Call object at 0x0000028E21711180>
- class pipeline.runner.PipelineRunner
Runner for executing multiple pipelines with control flow.
Supports:
Sequential execution
Conditional branching (if/else)
Loops (while, for)
Inter-pipeline communication via port connections (using >> operator)
Workflow visualization
Examples
>>> runner = PipelineRunner("MyWorkflow") >>> runner.add_pipeline("preprocess", preprocess_pipe) >>> runner.add_pipeline("analyze", analyze_pipe) >>> >>> # Connect using >> operator >>> runner["preprocess"]["module1"].OutputPort >> \ >>> runner["analyze"]["module2"].InputPort >>> >>> # Define flow >>> runner.add_run_node("start", "preprocess", next_node="loop") >>> runner.add_while_node("loop", "iterations['preprocess'] < 10", "start") >>> >>> # Visualize >>> runner.draw_workflow("my_workflow.svg") >>> >>> # Run >>> result = runner.run() >>> >>> # Save everything >>> runner.save("workflow.pipes")
Methods:
- __init__(name: str = 'workflow', workspace: str | Path | None = None)
Initialize PipelineRunner.
Parameters
- namestr, default: “workflow”
Name of the workflow
- workspacestr | Path | None, default: None
Working directory for saving workflow output files
- add_pipeline(name: str, pipeline: PipeLine) PipelineRunner
Add a pipeline to the runner.
Parameters
- namestr
Name to identify this pipeline in the workflow
- pipelinePipeLine
Pipeline instance to add
Returns
- Any
PipelineRunner Self for method chaining
- connect(source_pipeline: str, source_module: str, source_port: str, target_pipeline: str, target_module: str, target_port: str) PipelineRunner
Connect a port from one pipeline to another (alternative to >> operator).
Parameters
- source_pipelinestr
Name of source pipeline
- source_modulestr
Name of module in source pipeline
- source_portstr
Name of output port in source module
- target_pipelinestr
Name of target pipeline
- target_modulestr
Name of module in target pipeline
- target_portstr
Name of input port in target module
Returns
- Any
PipelineRunner Self for method chaining
- add_run_node(node_id: str, pipeline_name: str, next_node: str | None = None) PipelineRunner
Add a node that runs a pipeline.
Parameters
- node_idstr
Unique identifier for this node
- pipeline_namestr
Name of pipeline to execute
- next_nodestr | None, default: None
Next node to execute after this one
Returns
- Any
PipelineRunner Self for method chaining
- add_if_node(node_id: str, condition: str, true_branch: str, false_branch: str | None = None, next_node: str | None = None) PipelineRunner
Add a conditional branching node.
Parameters
- node_idstr
Unique identifier for this node
- conditionstr
Python expression to evaluate (has access to results, shared, iterations, loops)
- true_branchstr
Node ID to execute if condition is True
- false_branchstr | None, default: None
Node ID to execute if condition is False
- next_nodestr | None, default: None
Fallback next node if no branch is taken
Returns
- Any
PipelineRunner Self for method chaining
Examples
>>> runner.add_if_node("check", "results['validate'].success", "process", "error")
- add_while_node(node_id: str, condition: str, body_node: str, max_iterations: int = 100, next_node: str | None = None) PipelineRunner
Add a while loop node.
Parameters
- node_idstr
Unique identifier for this node
- conditionstr
Loop condition expression
- body_nodestr
First node in loop body
- max_iterationsint, default: 100
Safety limit for maximum iterations
- next_nodestr | None, default: None
Node to execute after loop completes
Returns
- Any
PipelineRunner Self for method chaining
Examples
>>> runner.add_while_node("loop", "iterations['process'] < 10", "body", max_iterations=20)
- add_for_node(node_id: str, iterations: int | str, body_node: str, iteration_var: str = 'i', next_node: str | None = None) PipelineRunner
Add a for loop node.
Parameters
- node_idstr
Unique identifier for this node
- iterationsint | str
Number of iterations (can be expression string)
- body_nodestr
First node in loop body
- iteration_varstr, default: “i”
Variable name for iteration counter (accessible in conditions)
- next_nodestr | None, default: None
Node to execute after loop completes
Returns
- Any
PipelineRunner Self for method chaining
Examples
>>> runner.add_for_node("batch_loop", iterations=5, body_node="process", iteration_var="batch_num")
- run() PipelineRunnerResult
Execute the workflow.
Returns
- Any
PipelineRunnerResult Result object containing execution details and pipeline results
- save(file: str | Path) None
Save complete workflow to a single .pipes file in JSON format.
Includes all pipeline definitions and runner configuration in one file.
Parameters
- filestr | Path
Path to save the .pipes file (must have .pipes extension)
- load(file: str | Path, silent: bool = False) PipelineRunner
Load complete workflow from a single .pipes file or URL.
Reconstructs all pipelines and runner configuration.
Parameters
- filestr | Path
Path to the .pipes file (must have .pipes extension) or URL to download from
- silentbool, default: False
If True, suppresses printed messages (default: False)
Returns
- Any
PipelineRunner Self for method chaining
- draw_workflow(filename: str | None = None, format: str = 'svg', show_connections: bool = True, show_flow_control: bool = True, layout: str = 'TB', view: bool = False) str | None
Draw a visual representation of the workflow showing pipelines and their connections.
Similar to Pipeline.draw_pipeline(), but shows the meta-level workflow structure.
Parameters
- formatstr, default: ‘svg’
Output format (‘png’, ‘svg’, ‘pdf’, ‘dot’, etc.)
- show_connectionsbool, default: True
Whether to show inter-pipeline port connections
- show_flow_controlbool, default: True
Whether to show flow control nodes (if/while/for)
- layoutstr, default: ‘TB’
Graph layout direction (‘TB’=top-bottom, ‘LR’=left-right, ‘BT’=bottom-top, ‘RL’=right-left)
- viewbool, default: False
Whether to automatically open the generated file
Returns
- Any
str | None Path to the generated file, or None if visualization is not available
Examples
>>> runner.draw_workflow('my_workflow', format='svg', view=True) >>> runner.draw_workflow(show_flow_control=True, layout='LR')
- class pipeline.runner.PipelineProxy
Proxy to enable runner[“pipeline”][“module”].Port syntax.
Wraps module access to return PipelinePortReference objects.
Methods:
- __init__(pipeline_name: str, pipeline: PipeLine, runner: PipelineRunner)
- class pipeline.runner.ModuleProxy
Proxy to enable runner[“pipeline”][“module”].Port syntax.
Methods:
- __init__(pipeline_name: str, module: PipeModule, runner: PipelineRunner)