pipeline.flowControl
Flow control constructs for pipelines.
This module provides controller classes that enable flow control within a single pipeline,
such as loops and conditional execution. Unlike PipelineRunner, which orchestrates multiple
pipelines, these controllers work within a single PipeLine to control module execution flow.
Classes:
ForEachController: Execute inner modules for each item in an iteration list.
Notes
For if/else branching, use ``pipeline.pipeline.ConditionalLink`` (or
``pipeline.pipeline.Link.when``) instead of a controller. Conditional
routing is a property of edges (links), not nodes (modules).
Future:
WhileController: Loop while a condition is true.
Classes
- class pipeline.flowControl.FlowController
Base class for all flow control constructs within a pipeline.
Flow controllers are special modules that control execution flow rather than processing data directly. They contain inner modules and manage their execution based on specific control logic (iteration, conditions, etc.).
Unlike regular modules:
They don’t have business logic in execute() - the pipeline handles execution
They contain inner modules and links
They have special port semantics (forwarded, aggregated)
Attributes
- _is_controller : ClassVar[bool]
Marker flag for Pipeline to detect controller modules.
- _controller_type : ClassVar[str]
Type of controller (e.g., “for_each”, “if”, “while”).
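A minimal sketch of how a class-level marker flag of this kind could be used for detection. The classes below are hypothetical stand-ins, not the real PipeModule or FlowController; only the attribute names `_is_controller` and `_controller_type` come from this documentation, and the actual detection logic in Pipeline may differ.

```python
from typing import ClassVar


class StubModule:
    """Hypothetical stand-in for a regular pipeline module."""

    _is_controller: ClassVar[bool] = False

    def __init__(self, mname: str) -> None:
        self.mname = mname


class StubController(StubModule):
    """Hypothetical stand-in for a flow controller."""

    _is_controller: ClassVar[bool] = True
    _controller_type: ClassVar[str] = "for_each"


def split_modules(modules):
    """Separate controllers from regular modules using the class-level marker."""
    controllers = [m for m in modules if getattr(m, "_is_controller", False)]
    regular = [m for m in modules if not getattr(m, "_is_controller", False)]
    return controllers, regular


controllers, regular = split_modules([StubModule("reader"), StubController("loop")])
```

Because the flag lives on the class rather than the instance, a scheduler can branch on it without importing or type-checking against every concrete controller class.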
- Inherits from:
PipeModule
Methods:
- __init__(mname: str, auto_run: bool = True)
Initialize the FlowController.
Parameters
- mname : str
Unique name for this controller.
- auto_run : bool, default True
Whether to execute automatically when inputs are ready.
- get_cal_params() -> dict[str, Any]
Get calculation parameters for serialization.
Excludes inner_modules and inner_links properties since they are serialized separately in the ControllerData structure.
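The exclusion described above can be sketched as a simple key filter. This is an illustration only: `filter_cal_params` and the sample dictionary are hypothetical, and the real method may collect its parameters differently.

```python
from typing import Any

# Keys left out of the calculation parameters (names taken from the text above).
EXCLUDED_KEYS = {"inner_modules", "inner_links"}


def filter_cal_params(params: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of params without the separately serialized inner structure."""
    return {k: v for k, v in params.items() if k not in EXCLUDED_KEYS}


# Hypothetical raw parameters of a controller.
raw = {"auto_run": True, "inner_modules": ["reader", "writer"], "inner_links": []}
cal_params = filter_cal_params(raw)
```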
- execute() -> Any
Execute is called after all iterations/conditions are handled.
For controllers, this typically aggregates results. The actual iteration/condition logic is handled by the Pipeline.
Properties:
- inner_modules
Get the list of inner modules.
- inner_links
Get the list of inner links.
- class pipeline.flowControl.ForEachController
Execute inner modules for each item in an iteration list.
This controller enables loop semantics within a pipeline. It:
1. Takes an iteration list via the InputIterData port
2. For each item, applies bindings to inner module attributes
3. Executes inner modules in order
4. Aggregates specified outputs from all iterations into lists
Design Philosophy:
- The controller manages iteration logic and output aggregation
- External modules can link directly to inner modules for:
  - Static input data (data that doesn't change between iterations)
  - Direct output data (to get the final/last iteration's result)
- Use iteration_bindings for data that changes per iteration
- Use output_ports to explicitly specify which outputs to aggregate into lists
- By default (output_ports=None), no outputs are aggregated on the controller
Output Access Patterns:
- **Need aggregated list of all results?** → Specify output_ports and connect to controller
Examples
>>> # Create inner modules
>>> reader = GdimTableReader(mname="reader")
>>> writer = GdimTableWriter(mname="writer")
>>>
>>> # Define inner data flow
>>> inner_links = [Link(reader, writer, "OutputTables", "InputData")]
>>>
>>> # Create controller - only aggregate writer's output
>>> foreach = ForEachController(
...     mname="write_projects",
...     links=inner_links,
...     iteration_bindings={
...         "reader.proj_id": "proj_id",
...         "writer.proj_id": "proj_id",
...     },
...     output_ports=["writer.OutputResult"],  # Only aggregate this port
... )
>>>
>>> # Use in pipeline
>>> pipeline.modules.add(foreach)
>>> config_source.OutputConfigs >> foreach.InputIterData  # Iteration data
>>> static_source.OutputTemplate >> reader.InputTemplate  # Static input to inner module
>>> reader.OutputDebugInfo >> debug_collector.InputData  # Direct output from inner module
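The loop semantics described for this controller (bind, execute in order, aggregate) can be sketched in plain Python without the library. Everything here is a hypothetical stand-in: `StubModule` and `run_for_each` are not part of pipeline.flowControl, and the real iteration is driven by the Pipeline rather than by a free function.

```python
from typing import Any, Callable


class StubModule:
    """Hypothetical inner module: holds attributes and runs a callable."""

    def __init__(self, mname: str, run: Callable[["StubModule"], Any]) -> None:
        self.mname = mname
        self._run = run
        self.output: Any = None

    def execute(self) -> None:
        self.output = self._run(self)


def run_for_each(items, modules, iteration_bindings, output_module):
    """Apply bindings, run modules in order, and collect one output per item."""
    by_name = {m.mname: m for m in modules}
    aggregated = []
    for item in items:
        # Binding keys have the form "module_name.attribute_name".
        for target, item_key in iteration_bindings.items():
            module_name, attribute = target.split(".", 1)
            setattr(by_name[module_name], attribute, item[item_key])
        for module in modules:
            module.execute()
        aggregated.append(by_name[output_module].output)
    return aggregated


reader = StubModule("reader", lambda m: f"table-{m.proj_id}")
writer = StubModule("writer", lambda m: f"wrote-{m.proj_id}")
results = run_for_each(
    items=[{"proj_id": 1}, {"proj_id": 2}],
    modules=[reader, writer],
    iteration_bindings={"reader.proj_id": "proj_id", "writer.proj_id": "proj_id"},
    output_module="writer",
)
```

After the loop, `results` holds one entry per item, while `reader.output` holds only the value from the last iteration, which mirrors the difference between the controller's aggregated output and a direct link to an inner module.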
- Inherits from:
FlowController
Methods:
- __init__(mname: str = 'ForEachController', links: list[Link] | Link | None = None, iteration_bindings: dict[str, str] | None = None, standalone_modules: list[PipeModule] | None = None, auto_run: bool = True, output_ports: list[str] | None = None)
Initialize the ForEachController.
Parameters
- mname : str
Unique name for this controller.
- links : list[Link] | Link, optional
Links between inner modules. A single ``Link`` is accepted and wrapped automatically. Modules are extracted from links automatically.
- iteration_bindings : dict[str, str], optional
Maps “module_name.attribute_name” → “item_key”. For each iteration, module.attribute = item[item_key].
Examples
If ``output_ports`` is None (default), no output ports will be created on the controller. Use this to specify exactly which outputs you want to aggregate.
**Note**: Controller outputs are always aggregated as lists across iterations. If you need only the last iteration's result, connect directly to the inner module's output port instead of using the controller's aggregated output.
Output Port Naming
Controller output ports are named based on inner module output ports:
1. **No collision**: Port keeps its original name
   - Inner: ``writer.OutputResult`` → Controller: ``OutputResult``
2. **Name collision**: Port name gets module suffix
   - Inner: ``writer.OutputResult`` + ``reader.OutputResult``
   - Controller: ``OutputResult_writer`` + ``OutputResult_reader``
3. **Reserved ports**: ``OutputAttributes`` is never aggregated
4. **Connected ports**: Ports with inner links are not aggregated
   - Only unconnected output ports become controller outputs
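The naming rules above can be sketched as a small pure function. `controller_port_names` is a hypothetical helper written for illustration, and it assumes that collisions are counted only among ports that survive the reserved and connected filters; the library may resolve this differently.

```python
from collections import Counter

# Port name that is never aggregated (taken from the rules above).
RESERVED_PORTS = {"OutputAttributes"}


def controller_port_names(inner_ports, connected=frozenset()):
    """Map "module.port" strings to controller port names.

    inner_ports: iterable of "module_name.port_name" strings.
    connected:   set of "module_name.port_name" strings that have inner links.
    """
    candidates = []
    for ref in inner_ports:
        module_name, port = ref.split(".", 1)
        if port in RESERVED_PORTS or ref in connected:
            continue  # reserved, or already consumed inside the controller
        candidates.append((ref, module_name, port))

    # A port name shared by several modules gets the module name as a suffix.
    counts = Counter(port for _, _, port in candidates)
    return {
        ref: port if counts[port] == 1 else f"{port}_{module_name}"
        for ref, module_name, port in candidates
    }


names = controller_port_names(
    [
        "reader.OutputTables",
        "reader.OutputResult",
        "writer.OutputResult",
        "writer.OutputAttributes",
    ],
    connected={"reader.OutputTables"},
)
single = controller_port_names(["writer.OutputResult"])
```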
- reset_inner_modules_for_iteration() -> None
Clear per-iteration state on inner modules before each foreach iteration.
Resets ``executed``/``executed_time`` and clears port data on inner links so join nodes (e.g. merge modules with two parents) do not see outputs from the previous iteration. Ports targeted by ``iteration_bindings`` and ports wired from outside the controller are left untouched.
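A rough sketch of the reset described above, under simplifying assumptions: the stub module stores port data in a plain dict, and the set of ports fed by inner links is passed in explicitly. Both are hypothetical; only the `executed` and `executed_time` names come from this documentation.

```python
class StubModule:
    """Hypothetical inner module with run state and a dict of port data."""

    def __init__(self, mname):
        self.mname = mname
        self.executed = False
        self.executed_time = None
        self.ports = {}


def reset_for_iteration(modules, inner_link_ports):
    """Clear run state and the data on ports that are fed by inner links.

    inner_link_ports: set of (module_name, port_name) pairs on inner links.
    Ports outside this set (bound or externally wired) are left untouched.
    """
    for module in modules:
        module.executed = False
        module.executed_time = None
        for port_name in list(module.ports):
            if (module.mname, port_name) in inner_link_ports:
                module.ports[port_name] = None


# A join node that still carries data from the previous iteration.
merge = StubModule("merge")
merge.executed, merge.executed_time = True, 0.12
merge.ports = {"InputLeft": "old-left", "InputTemplate": "static"}
reset_for_iteration([merge], {("merge", "InputLeft")})
```

Here `InputTemplate` plays the role of an externally wired static input, so its value survives the reset while the stale `InputLeft` value is cleared.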
- execute() -> dict[str, Any]
Aggregate results from all iterations as lists.
This method is called by Pipeline after all iterations complete. All outputs are aggregated as lists across iterations.
Returns
- dict[str, Any]
Aggregated output data for each output port, as lists.
Notes
If you need only the last iteration's result, connect directly to the inner module's output port instead of using the controller's output.
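To illustrate the difference between the aggregated output and the last iteration's result, here is a minimal sketch. The function `aggregate_iterations` and the per-iteration dictionaries are assumptions made for the example; the real method reads its data from the controller's internal state.

```python
from typing import Any


def aggregate_iterations(iteration_results: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Turn a list of per-iteration port dicts into one list per port."""
    aggregated: dict[str, list[Any]] = {}
    for result in iteration_results:
        for port, value in result.items():
            aggregated.setdefault(port, []).append(value)
    return aggregated


# Hypothetical per-iteration outputs for a single aggregated port.
per_iteration = [{"OutputResult": "ok-1"}, {"OutputResult": "ok-2"}]
aggregated = aggregate_iterations(per_iteration)

# What a direct link to the inner module would see after the loop.
last_only = per_iteration[-1]["OutputResult"]
```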
- dump_module_params()
Dump controller parameters including inner modules and links.
Returns
- ControllerData
Serialized controller data including inner structure.
- restore_module_params(module_data) -> None
Restore controller from serialized data.
Parameters
- module_data : ControllerData
Serialized controller data.
Properties:
- iteration_bindings
Get the iteration bindings configuration.
- current_iteration
Get the current iteration index.
- total_iterations
Get the total number of iterations.
- iteration_results
Get results from all iterations.
Attributes:
- InputIterData: PortReference[PortTypeHint.IterationData]