pipeline.flowControl

Flow control constructs for pipelines.

This module provides controller classes that enable flow control within a single pipeline,
such as loops and conditional execution. Unlike PipelineRunner which orchestrates multiple
pipelines, these controllers work within a single Pipeline to control module execution flow.

Classes:
    ForEachController: Execute inner modules for each item in an iteration list.

Notes

For if/else branching, use ``pipeline.pipeline.ConditionalLink`` (or
``pipeline.pipeline.Link.when``) instead of a controller. Conditional
routing is a property of edges (links), not nodes (modules).
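
As a rough illustration of why branching sits on edges rather than nodes, the sketch below attaches an optional predicate to each edge and picks the targets whose predicate accepts the value. ``ToyLink`` and ``route`` are hypothetical names invented for this example; they do not reflect the actual signatures of ``ConditionalLink`` or ``Link.when``.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ToyLink:
    """Hypothetical edge for illustration; not the library's Link class."""

    source: str
    target: str
    condition: Optional[Callable[[Any], bool]] = None  # None means "always follow"

    def allows(self, value: Any) -> bool:
        # An unconditional edge always passes data through.
        return self.condition is None or self.condition(value)


def route(links: list[ToyLink], source: str, value: Any) -> list[str]:
    """Return the targets reachable from `source` for this value."""
    return [
        link.target
        for link in links
        if link.source == source and link.allows(value)
    ]


links = [
    ToyLink("check", "on_pass", condition=lambda v: v >= 0),
    ToyLink("check", "on_fail", condition=lambda v: v < 0),
    ToyLink("check", "audit"),  # unconditional edge
]

positive_targets = route(links, "check", 5)
negative_targets = route(links, "check", -1)
```

In this sketch the "check" node itself knows nothing about branching; adding or removing a branch only changes the list of edges.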

Future:
    WhileController: Loop while a condition is true.

Classes

class pipeline.flowControl.FlowController

Base class for all flow control constructs within a pipeline.

Flow controllers are special modules that control execution flow rather than processing data directly. They contain inner modules and manage their execution based on specific control logic (iteration, conditions, etc.).

Unlike regular modules:

  • They don’t have business logic in execute() - the pipeline handles execution

  • They contain inner modules and links

  • They have special port semantics (forwarded, aggregated)

Attributes

_is_controller : ClassVar[bool]

Marker flag for Pipeline to detect controller modules.

_controller_type : ClassVar[str]

Type of controller (e.g., “for_each”, “if”, “while”).
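
One way a pipeline could use these two class-level markers is sketched below. The classes ``ToyModule``, ``ToyFlowController`` and ``ToyForEach`` and the function ``describe`` are hypothetical stand-ins; only the attribute names ``_is_controller`` and ``_controller_type`` come from the description above, and the real detection logic may differ.

```python
from typing import ClassVar


class ToyModule:
    """Hypothetical stand-in for PipeModule."""

    _is_controller: ClassVar[bool] = False

    def __init__(self, mname: str) -> None:
        self.mname = mname


class ToyFlowController(ToyModule):
    """Hypothetical stand-in for FlowController."""

    _is_controller: ClassVar[bool] = True
    _controller_type: ClassVar[str] = "base"


class ToyForEach(ToyFlowController):
    _controller_type: ClassVar[str] = "for_each"


def describe(module: ToyModule) -> str:
    # getattr with a default also tolerates classes that never define the flag.
    if getattr(module, "_is_controller", False):
        return f"controller:{module._controller_type}"
    return "module"


labels = [describe(ToyModule("reader")), describe(ToyForEach("loop"))]
```

Checking a class-level flag instead of using ``isinstance`` keeps the dispatch independent of import order, which may be one reason for the marker; that motivation is a guess, not something the source states.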

Inherits from:

PipeModule

Methods:

__init__(mname: str, auto_run: bool = True)

Initialize the FlowController.

Parameters

mname : str

Unique name for this controller.

auto_run : bool, default True

Whether to execute automatically when inputs are ready.

get_cal_params() -> dict[str, Any]

Get calculation parameters for serialization.

Excludes inner_modules and inner_links properties since they are serialized separately in the ControllerData structure.
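
The exclusion described above amounts to filtering two keys out of the parameter dictionary. The following is a minimal sketch under that assumption; ``filter_cal_params`` and the sample ``params`` dictionary are hypothetical and are not taken from the library.

```python
from typing import Any

# Keys assumed to be serialized elsewhere (in the ControllerData structure).
EXCLUDED_KEYS = {"inner_modules", "inner_links"}


def filter_cal_params(params: dict[str, Any]) -> dict[str, Any]:
    """Drop the inner-structure entries from a parameter dictionary."""
    return {key: value for key, value in params.items() if key not in EXCLUDED_KEYS}


# Hypothetical parameter dictionary for illustration only.
params = {"auto_run": True, "inner_modules": ["reader"], "inner_links": []}
filtered = filter_cal_params(params)
```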

execute() -> Any

Execute is called after all iterations/conditions are handled.

For controllers, this typically aggregates results. The actual iteration/condition logic is handled by the Pipeline.

Properties:

inner_modules

Get the list of inner modules.

inner_links

Get the list of inner links.

class pipeline.flowControl.ForEachController

Execute inner modules for each item in an iteration list.

This controller enables loop semantics within a pipeline. It:

1. Takes an iteration list via the InputIterData port
2. For each item, applies bindings to inner module attributes
3. Executes inner modules in order
4. Aggregates specified outputs from all iterations into lists
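
The four steps above can be sketched as a plain loop. This is a minimal illustration, not the library's implementation: ``ToyStep`` and ``run_for_each`` are hypothetical names, and for simplicity the aggregated results are keyed by the full "module.port" string rather than by a controller port name.

```python
from typing import Any


class ToyStep:
    """Hypothetical inner module that reports its name and bound proj_id."""

    def __init__(self, mname: str) -> None:
        self.mname = mname
        self.proj_id = None

    def execute(self) -> dict[str, Any]:
        return {"OutputResult": f"{self.mname}:{self.proj_id}"}


def run_for_each(items, modules, iteration_bindings, output_ports):
    by_name = {module.mname: module for module in modules}
    aggregated = {port: [] for port in output_ports}
    for item in items:  # step 1: walk the iteration list
        # Step 2: "module.attribute" -> item key, applied via setattr.
        for target, item_key in iteration_bindings.items():
            module_name, attribute = target.split(".", 1)
            setattr(by_name[module_name], attribute, item[item_key])
        # Step 3: execute inner modules in the given order.
        outputs = {module.mname: module.execute() for module in modules}
        # Step 4: append the selected outputs to per-port lists.
        for port in output_ports:
            module_name, port_name = port.split(".", 1)
            aggregated[port].append(outputs[module_name][port_name])
    return aggregated


result = run_for_each(
    items=[{"proj_id": 1}, {"proj_id": 2}],
    modules=[ToyStep("reader"), ToyStep("writer")],
    iteration_bindings={"reader.proj_id": "proj_id", "writer.proj_id": "proj_id"},
    output_ports=["writer.OutputResult"],
)
```

Here ``result`` holds one list per requested port with one entry per iteration, while the reader's output is never collected because it was not listed in ``output_ports``.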

Design Philosophy:
- The controller manages iteration logic and output aggregation
- External modules can link directly to inner modules for:
  - Static input data (data that doesn't change between iterations)
  - Direct output data (to get the final/last iteration's result)
- Use iteration_bindings for data that changes per iteration
- Use output_ports to explicitly specify which outputs to aggregate into lists
- By default (output_ports=None), no outputs are aggregated on the controller

Output Access Patterns:
- **Need aggregated list of all results?** → Specify output_ports and connect to controller

Examples

>>> # Create inner modules
>>> reader = GdimTableReader(mname="reader")
>>> writer = GdimTableWriter(mname="writer")
>>>
>>> # Define inner data flow
>>> inner_links = [Link(reader, writer, "OutputTables", "InputData")]
>>>
>>> # Create controller - only aggregate writer's output
>>> foreach = ForEachController(
...     mname="write_projects",
...     links=inner_links,
...     iteration_bindings={
...         "reader.proj_id": "proj_id",
...         "writer.proj_id": "proj_id",
...     },
...     output_ports=["writer.OutputResult"],  # Only aggregate this port
... )
>>>
>>> # Use in pipeline
>>> pipeline.modules.add(foreach)
>>> config_source.OutputConfigs >> foreach.InputIterData  # Iteration data
>>> static_source.OutputTemplate >> reader.InputTemplate  # Static input to inner module
>>> reader.OutputDebugInfo >> debug_collector.InputData   # Direct output from inner module

Inherits from:

FlowController

Methods:

__init__(mname: str = 'ForEachController', links: list[Link] | Link | None = None, iteration_bindings: dict[str, str] | None = None, standalone_modules: list[PipeModule] | None = None, auto_run: bool = True, output_ports: list[str] | None = None)

Initialize the ForEachController.

Parameters

mname : str

Unique name for this controller.

links : list[Link] | Link, optional

Links between inner modules. A single ``Link`` is accepted and wrapped automatically. Modules are extracted from links automatically.

iteration_bindings : dict[str, str], optional

Maps “module_name.attribute_name” → “item_key”. For each iteration, module.attribute = item[item_key].

output_ports : list[str], optional

Inner module output ports to aggregate on the controller, written as “module_name.port_name”. If None (default), no output ports are created on the controller. Use this to specify exactly which outputs you want to aggregate.

**Note**: Controller outputs are always aggregated as lists across iterations. If you need only the last iteration's result, connect directly to the inner module's output port instead of using the controller's aggregated output.

Output Port Naming

Controller output ports are named based on inner module output ports:

1. **No collision**: Port keeps its original name
   - Inner: ``writer.OutputResult`` → Controller: ``OutputResult``
2. **Name collision**: Port name gets module suffix
   - Inner: ``writer.OutputResult`` + ``reader.OutputResult``
   - Controller: ``OutputResult_writer`` + ``OutputResult_reader``
3. **Reserved ports**: ``OutputAttributes`` is never aggregated
4. **Connected ports**: Ports with inner links are not aggregated
   - Only unconnected output ports become controller outputs
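
The naming rules above can be expressed as a small pure function. This is a sketch under stated assumptions: ``controller_port_names`` is a hypothetical helper, the set of connected ports is passed in explicitly, and collisions are counted only among ports that survive the reserved and connected filters (the source does not say which order the library uses).

```python
from collections import Counter

RESERVED_PORTS = {"OutputAttributes"}


def controller_port_names(output_ports: list[str], connected: set[str]) -> dict[str, str]:
    """Map each "module.port" entry to its controller port name."""
    candidates = []
    for full_name in output_ports:
        module_name, port_name = full_name.split(".", 1)
        # Rules 3 and 4: skip reserved ports and ports wired by inner links.
        if port_name in RESERVED_PORTS or full_name in connected:
            continue
        candidates.append((full_name, module_name, port_name))
    # Rules 1 and 2: add a module suffix only when the bare name is shared.
    counts = Counter(port_name for _, _, port_name in candidates)
    return {
        full_name: port_name if counts[port_name] == 1 else f"{port_name}_{module_name}"
        for full_name, module_name, port_name in candidates
    }


single = controller_port_names(["writer.OutputResult"], connected=set())
mixed = controller_port_names(
    [
        "writer.OutputResult",
        "reader.OutputResult",
        "reader.OutputAttributes",
        "reader.OutputTables",
    ],
    connected={"reader.OutputTables"},
)
```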

reset_inner_modules_for_iteration() -> None

Clear per-iteration state on inner modules before each foreach iteration.

Resets ``executed`` / ``executed_time`` and clears port data on inner links so join nodes (e.g. merge modules with two parents) do not see outputs from the previous iteration. Ports targeted by ``iteration_bindings`` and ports wired from outside the controller are left untouched.
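
A rough sketch of this reset is shown below. ``ToyInner`` and ``reset_for_iteration`` are hypothetical; the sketch assumes port data lives in a per-module dictionary, that clearing means setting the value to None, and that the caller supplies which ports are fed by inner links and which are protected (bound or externally wired).

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ToyInner:
    """Hypothetical inner module with run flags and per-port data."""

    mname: str
    executed: bool = False
    executed_time: Optional[float] = None
    ports: dict[str, Any] = field(default_factory=dict)


def reset_for_iteration(modules, inner_link_ports, protected_ports):
    """Clear run flags and inner-link port data, skipping protected ports.

    Both port arguments are sets of (module_name, port_name) tuples.
    """
    for module in modules:
        module.executed = False
        module.executed_time = None
        for port_name in module.ports:
            key = (module.mname, port_name)
            if key in inner_link_ports and key not in protected_ports:
                module.ports[port_name] = None  # drop the previous iteration's data


merge = ToyInner(
    "merge",
    executed=True,
    executed_time=1.5,
    ports={"InputA": "old", "InputTemplate": "static"},
)
reset_for_iteration(
    [merge],
    inner_link_ports={("merge", "InputA")},
    protected_ports={("merge", "InputTemplate")},
)
```

After the call, the stale value on ``InputA`` is gone while the externally supplied ``InputTemplate`` survives, which is the behavior a two-parent merge module needs between iterations.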

get_cal_params() -> dict[str, Any]

Get calculation parameters for serialization.

execute() -> dict[str, Any]

Aggregate results from all iterations as lists.

This method is called by Pipeline after all iterations complete. All outputs are aggregated as lists across iterations.

Returns

dict[str, Any]
    Aggregated output data for each output port as lists.

Notes

If you need only the last iteration's result, connect directly to the
inner module's output port instead of using the controller's output.

reset_iteration_state() -> None

Reset iteration state for a new run.

dump_module_params()

Dump controller parameters including inner modules and links.

Returns

ControllerData
    Serialized controller data including inner structure.

restore_module_params(module_data) -> None

Restore controller from serialized data.

Parameters

module_data : ControllerData

Serialized controller data.

Properties:

iteration_bindings

Get the iteration bindings configuration.

current_iteration

Get the current iteration index.

total_iterations

Get the total number of iterations.

iteration_results

Get results from all iterations.

Attributes:

InputIterData: PortReference[PortTypeHint.IterationData]