pipeline.pipeline

Classes

class pipeline.pipeline.GdimState

Runtime state for GDIM platform interaction.

Bundles all attributes needed for a pipeline to communicate with the GDIM data-management platform so they travel together as a single unit (e.g. when a sub-pipeline is created, the whole state is propagated in one assignment).

Attributes

token : str | None

Authentication token for the GDIM platform.

proj_id : str | None

Project ID on the GDIM platform.

tpl_id : str | None

Data template ID on the GDIM platform.

host : str | None

GDIM platform host URL. ``None`` means the default host is used.

template : GdimTemplate | None

Cached GDIM template fetched from the platform. Populated lazily during pipeline execution; do not set this manually.

proj_list : dict[str, GdimProjectInfo] | None

Cached list of GDIM projects. Populated lazily during pipeline execution; do not set this manually.

Inherits from:

BaseModel

Attributes:

model_config
token: str | None = None
proj_id: str | None = None
tpl_id: str | None = None
host: str | None = None
template: GdimTemplate | None = None
proj_list: dict[str, GdimProjectInfo] | None = None

class pipeline.pipeline.PipeLine

Pipeline system that connects modules together and adds a messaging system.

The Pipeline simplifies the configuration and execution of interconnected modules. It provides several features:

  1. Module management: Register, configure, and execute processing modules

  2. Parameter binding: Create high-level attributes that control module parameters

  3. Multi-binding attributes: Create a single attribute that controls parameters in multiple modules

Attributes

app_name : str

Name of the pipeline application. Always set: if not provided at construction time, a random ``pipeline_<hex8>`` identifier is assigned. It can be overwritten by :meth:`restore_pipeline` when loading from a ``.pipe`` file that has an ``app_name`` field.

.. tip:: Set a meaningful name so log records and Logfire spans are easy to find::

    pipeline = PipeLine(app_name="borehole_report")

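
The default name could be produced along the lines of the sketch below. It assumes ``<hex8>`` stands for eight random lowercase hexadecimal characters; the helper ``default_app_name`` is hypothetical and not part of the library.

```python
import re
import secrets


def default_app_name() -> str:
    """Hypothetical helper: return an identifier of the form pipeline_<hex8>."""
    # Assumption: <hex8> means 8 hex characters; token_hex(4) renders 4 random bytes as 8 hex chars.
    return f"pipeline_{secrets.token_hex(4)}"
```

Every generated name matches ``pipeline_[0-9a-f]{8}``, so it is unique enough to tell runs apart but says nothing about what the pipeline does, which is why an explicit ``app_name`` is recommended.
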
app_title : str | None

Title of the pipeline application.

app_version : str | None

Version of the pipeline application.

gdim_state : GdimState

Runtime state for GDIM platform interaction (token, proj_id, tpl_id, host, cached template and project list). Use ``update_gdim_state()`` to set credentials before calling ``run()``.

llm_key : str | None

API key for the LLM.

local_functions_path : str | Path | None

Path to a local functions file for custom operations.

local_func_require_gdim_proj_list : bool

If True, a pipeline that contains PythonCoder modules is treated as requiring gdim_proj_list.

workspace : str | Path | None

Working directory for saving pipeline output files.

data_to_save : dict[str, tuple[str, str, str | None]]

Dictionary mapping data keys to their save information; each value is a ``(data_type, name, module_name)`` tuple.

hide_output_files : dict[str, str] | None

Controls which module output files are excluded from the results when the pipeline is run by gdi-server.

steps_manager : StepsManager | None

Manager for the ordered list of checkpoint steps in this pipeline.

current_step : str | None

Name of the step that was most recently executed via ``run(use_steps=True)``.

last_step : bool

True after the final step has been executed.

executed_steps : list[str]

Ordered history of step names that have been executed.

verbose : bool

When ``True``, structured log records are emitted for every module execution (start, success, failure), including input/output data previews. When ``False`` (default), only exceptions are logged. Console output is configured automatically when ``verbose=True``. ``run_trace_file`` capture and the companion HTML report also require ``verbose=True`` for the current run (see ``run_trace_file`` below).

To also send traces to the Logfire cloud UI, set the ``LOGFIRE_TOKEN`` environment variable and call :func:`~gdi.pipeline.observability.configure_logging` once before running::

    from gdi.pipeline.observability import configure_logging

    configure_logging()
    pipeline.run(verbose=True)

run_trace_file : str | Path | None

Path to a **local debug trace file** (JSON) written at the end of each ``run()`` call when ``verbose=True``. The file contains the same structured records emitted during that run, serialised as a **JSON array** so any JSON viewer can open it directly.

**Requires ``verbose=True``** for the run (on the instance or via ``run(verbose=True)``). If ``verbose=False``, no trace collector is attached, the JSON file is **not** updated for that run, and the HTML step may fail (no file yet) or reflect a **previous** verbose run's JSON if one is still on disk.

Each successful trace capture **overwrites** the previous JSON: it is a snapshot of the last verbose run, not a persistent log.

If only a filename is given (no directory), the file is placed inside ``workspace`` (or the current working directory when ``workspace`` is not set). Example::

    pipeline = PipeLine(
        app_name="my_pipeline",
        workspace="./output",
        verbose=True,
        run_trace_file="run_trace.json",   # → ./output/run_trace.json
    )
    pipeline.run()  # writes run_trace.json + run_trace.html

When ``verbose=True`` and ``run_trace_file`` is set, an HTML viewer is also rendered automatically after each ``run()`` (same stem, ``.html`` extension), including after a failed run. No separate flag is needed.

.. warning::
    This is intended for **local development only**. Do not use it as a production logging solution: it provides no rotation, no concurrency safety, and is overwritten on every run.

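
The path rules and overwrite semantics described for ``run_trace_file`` can be sketched as follows. This is a hypothetical re-implementation for illustration only: ``resolve_trace_paths`` and ``write_trace_snapshot`` are not library functions, and "no directory" is assumed to mean a relative path whose parent is ``.``.

```python
import json
from pathlib import Path


def resolve_trace_paths(run_trace_file, workspace=None):
    """Hypothetical helper: return (json_path, html_path) for a trace file.

    A bare filename goes into ``workspace`` (or the current working directory
    when ``workspace`` is None); a path with a directory is used as given.
    """
    path = Path(run_trace_file)
    # Assumption: "no directory" = relative path whose parent is ".".
    if not path.is_absolute() and path.parent == Path("."):
        base = Path(workspace) if workspace is not None else Path.cwd()
        path = base / path
    # The HTML viewer shares the JSON file's stem, with an .html extension.
    return path, path.with_suffix(".html")


def write_trace_snapshot(records, json_path):
    """Hypothetical helper: overwrite json_path with the records as one JSON array."""
    json_path = Path(json_path)
    json_path.parent.mkdir(parents=True, exist_ok=True)
    # Mode "w" truncates the file, so each verbose run replaces the previous snapshot.
    with json_path.open("w", encoding="utf-8") as fh:
        json.dump(list(records), fh, ensure_ascii=False, indent=2)
```

Writing in truncate mode is what makes the file a snapshot rather than a log: nothing from an earlier run survives a later verbose run.
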
Inherits from:

PipelineVisualizationMixin, PipelineLoggerMixin, PipelineRequirementsMixin

Methods:

__init__(app_name: str | None = None, app_title: str | None = None, description: str | None = None, app_version: str | None = None, workspace: str | Path | None = None, local_functions_path: str | Path | None = None, verbose: bool = False, run_trace_file: str | Path | None = None) -> None

Initialize the PipeLine object.

Parameters

app_name : str | None, default: None

The name of the app.

app_title : str | None, default: None

The title of the app.

description : str | None, default: None

The description of the pipeline.

app_version : str | None, default: None

The version of the app.

workspace : str | Path | None, default: None

Working directory for saving pipeline output files.

local_functions_path : str | Path | None, default: None

Path to a local functions file for custom operations.

verbose : bool, default: False

When True, structured log records are emitted for every module execution. Also enables run_trace_file capture for that run.

run_trace_file : str | Path | None, default: None

Path to the local debug trace JSON file. See the class-level docstring for path resolution and examples. JSON and HTML are written only when verbose=True for the run (constructor default or run(verbose=True)). Can be overridden per-run via run(run_trace_file=...).

update_gdim_state(token: str | None = _UNSET, proj_id: str | None = _UNSET, tpl_id: str | None = _UNSET, host: str | None = _UNSET, template: GdimTemplate | None = _UNSET, proj_list: dict[str, GdimProjectInfo] | None = _UNSET) -> None

Update GDIM platform connection state.

This is the preferred way to configure GDIM credentials before calling ``run()``. When any credential field (``token``, ``proj_id``, ``tpl_id``, ``host``) changes its value, the cached ``template`` and ``proj_list`` are cleared automatically so they will be re-fetched on the next run.

Examples

>>> pipeline.update_gdim_state(token=log_in(), proj_id="1967572096104902658")
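
The cache-invalidation rule above can be illustrated with a small stand-alone model. ``GdimStateSketch`` and this free-function ``update_gdim_state`` are hypothetical stand-ins, not the library's classes; the sketch also assumes that caches passed explicitly in the same call are kept rather than cleared, which the documentation does not specify.

```python
from dataclasses import dataclass
from typing import Any, Optional

_UNSET = object()  # sentinel: "argument not passed", distinct from an explicit None
_CREDENTIAL_FIELDS = ("token", "proj_id", "tpl_id", "host")


@dataclass
class GdimStateSketch:
    """Hypothetical stand-in for GdimState, used only for illustration."""
    token: Optional[str] = None
    proj_id: Optional[str] = None
    tpl_id: Optional[str] = None
    host: Optional[str] = None
    template: Any = None
    proj_list: Optional[dict] = None


def update_gdim_state(state, token=_UNSET, proj_id=_UNSET, tpl_id=_UNSET,
                      host=_UNSET, template=_UNSET, proj_list=_UNSET):
    updates = dict(token=token, proj_id=proj_id, tpl_id=tpl_id, host=host,
                   template=template, proj_list=proj_list)
    # Keep only the arguments the caller actually passed (None is a valid value).
    updates = {k: v for k, v in updates.items() if v is not _UNSET}
    credentials_changed = any(
        k in _CREDENTIAL_FIELDS and getattr(state, k) != v for k, v in updates.items()
    )
    for key, value in updates.items():
        setattr(state, key, value)
    if credentials_changed:
        # Drop stale caches so they are re-fetched on the next run.
        # Assumption: caches supplied in this same call are kept.
        if "template" not in updates:
            state.template = None
        if "proj_list" not in updates:
            state.proj_list = None
```

The sentinel is what lets ``host=None`` mean "reset to the default host" while an omitted ``host`` means "leave unchanged". Passing a credential equal to its current value does not clear the caches.
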

Add several links to the network.

Get a link by its name.

Remove a link from the pipeline; the modules it connects are not removed.

link : Link or str

If it's a str, it should be the name of the link.

Get all the links below the module.

Get all the links above the module.

run(from_module: PipeModule | str | None = None, return_results: bool | dict[str, str | list[str]] = True, verbose: bool | None = None, run_trace_file: str | Path | None = _UNSET, use_steps: bool = False) -> PipelineResult

Run the pipeline from start modules, a specified module, or the next checkpoint step.

Parameters

from_module : PipeModule | str | None, default: None

If None, run from all start modules. If a module name/object, run from that module. Ignored when ``use_steps=True``.

Returns

PipelineResult
    Execution results. When ``use_steps=True``, also contains
    ``current_step``, ``last_step``, and ``next_step``.

add_module(module: PipeModule) -> None

Add a module to the pipeline.

get_module(mname: str) -> PipeModule | None

Get a module by its name.

remove_module(module: PipeModule | str, with_modules_below: bool = False) -> None

Remove a module (optionally together with every module below it) and all related links.

Parameters

module : PipeModule or str

If it's a str, it should be the name of the module.

with_modules_below : bool, default: False

If True, the module and all modules below it are removed.
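
One way to picture ``with_modules_below`` is as a downstream graph walk. The sketch below is a hypothetical model, not the library's implementation: it assumes links can be represented as ``(upstream, downstream)`` module-name pairs and that "below" means reachable by following links downstream.

```python
from collections import deque


def modules_to_remove(start, links, with_modules_below=False):
    """Hypothetical helper: names of the modules a removal would affect.

    ``links`` is assumed to be a list of (upstream, downstream) name pairs.
    """
    if not with_modules_below:
        return {start}
    downstream = {}
    for up, down in links:
        downstream.setdefault(up, []).append(down)
    # Breadth-first walk along links in the downstream direction.
    seen = {start}
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for nxt in downstream.get(current, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def remove_modules(start, links, with_modules_below=False):
    """Return (removed, remaining_links); every link touching a removed module is dropped."""
    removed = modules_to_remove(start, links, with_modules_below)
    remaining = [(u, d) for u, d in links if u not in removed and d not in removed]
    return removed, remaining
```

Note that a module with another upstream parent (``X`` feeding ``C`` below) is still removed when it lies below the target, because removal follows reachability, not exclusive ownership.
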

save_pipeline(file: str | Path | None = None) -> PipelineData | None

Save the pipeline data to a .pipe file and return a PipelineData object.

Returns

PipelineData | None
    The pipeline data, or None if the pipeline is empty.

restore_pipeline(file: str | Path | PipelineData, reset_to_auto_run: bool = True, silent: bool = False, local_functions_path: str | Path | None = None) -> None

Restore the pipeline from a .pipe file (JSON format), a URL, or a PipelineData object.

Parameters

file : str | Path | PipelineData

Path to a .pipe file, a URL, or a PipelineData object to restore from.

reset_to_auto_run : bool, default: True

If True, all modules will be set to auto_run=True regardless of their saved state. If False, modules will restore their auto_run state from the .pipe file.

silent : bool, default: False

If True, suppress printed messages.

local_functions_path : str | Path | None, default: None

The path to the local functions file. If None, the path saved in the pipeline data will be used.

add_step(step: PipelineStep, step_index: int | None = None) -> None

Add a checkpoint step to the pipeline.

Parameters

step : PipelineStep

The step definition to add.

step_index : int | None, default: None

Insert position in the step list. Appended at the end when ``None``.

remove_step(step_name: str | int) -> None

Remove a step definition from the pipeline.

Parameters

step_name : str | int

Name or index of the step to remove.

get_steps() -> list[PipelineStep]

Get all defined execution steps.

Returns

list[PipelineStep]
    List of execution steps in order.

run_single_step(step: str | int) -> PipelineResult

Run a single step by its name or index.

reset_steps() -> None

Reset step-execution state so the next ``run(use_steps=True)`` starts from step 0.
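
The step bookkeeping documented for ``add_step``, ``remove_step``, ``run(use_steps=True)`` and ``reset_steps`` can be modelled with a small cursor class. ``StepCursor`` is hypothetical and tracks only step names; in particular, raising ``RuntimeError`` when every step has already run is an assumption, since the documentation does not say what happens then.

```python
from typing import Optional


class StepCursor:
    """Hypothetical model of current_step / last_step / executed_steps bookkeeping."""

    def __init__(self):
        self.steps: list[str] = []
        self.reset()

    def reset(self):
        # Mirrors reset_steps(): the next stepped run starts from step 0.
        self._next = 0
        self.current_step: Optional[str] = None
        self.last_step = False
        self.executed_steps: list[str] = []

    def add_step(self, name, step_index=None):
        # Appended at the end when step_index is None, inserted at that position otherwise.
        if step_index is None:
            self.steps.append(name)
        else:
            self.steps.insert(step_index, name)

    def remove_step(self, step):
        # Accept either a step name or its index.
        index = step if isinstance(step, int) else self.steps.index(step)
        del self.steps[index]

    def run_next(self):
        """Advance one step, as run(use_steps=True) does; return the step bookkeeping."""
        if self._next >= len(self.steps):
            # Assumption: running past the final step is treated as an error.
            raise RuntimeError("all steps have run; call reset() to start over")
        self.current_step = self.steps[self._next]
        self.executed_steps.append(self.current_step)
        self._next += 1
        self.last_step = self._next == len(self.steps)
        next_step = None if self.last_step else self.steps[self._next]
        return {"current_step": self.current_step,
                "last_step": self.last_step,
                "next_step": next_step}
```
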

set_all_modules_auto_run(auto_run: bool = True) -> None

Set ``auto_run`` to the given value (True by default) on all modules.

add_attribute(attr_name: str, module_name: str, param_name: str, attr_title: str | None = None, ui_schema_overrides: dict[str, Any] | None = None, ui_schema_function_name: str | Callable | None = None) -> None

Add an attribute to the pipeline.

Parameters

attr_name : str

The pipeline-level name of the attribute; it distinguishes this attribute from the other attributes in the pipeline.

module_name : str

The name of the module that owns this attribute.

param_name : str

The name of the parameter in the module.

attr_title : str, optional

The title of this attribute, shown to users. If None, attr_name is used.

ui_schema_overrides : dict[str, Any], optional

Dictionary of UIAttributeSchema parameters to override. Keys should be valid UIAttributeSchema field names (e.g., 'default', 'minimum', 'maximum', 'selections', 'units', 'required', 'visible', etc.). This is the preferred way to customize attribute properties. If None, the original UI schema from the module is used as-is.

ui_schema_function_name : str | Callable, optional

A function (or the name of a function) that generates the UI schema dynamically. When provided, both ui_schema_overrides and the module's predefined ui_schema are ignored.

Examples

Basic usage with default UI schema:

>>> pipeline.add_attribute(
...     attr_name="pile_length",
...     module_name="PileModule",
...     param_name="length",
...     attr_title="桩长"  # "pile length"
... )

Override default value and range using UI schema:

>>> pipeline.add_attribute(
...     attr_name="pile_diameter",
...     module_name="PileModule",
...     param_name="diameter",
...     ui_schema_overrides={
...         "default": 1.2,
...         "minimum": 0.5,
...         "maximum": 3.0,
...         "units": Units.m
...     }
... )

Customize selections using UI schema:

>>> pipeline.add_attribute(
...     attr_name="soil_type",
...     module_name="SoilModule",
...     param_name="type",
...     ui_schema_overrides={
...         "selections": ["clay", "sand", "gravel"],
...         "selections_name": ["黏土", "砂土", "砾石"],  # clay, sand, gravel
...         "default": "clay"
...     }
... )

Set UI properties using UI schema:

>>> pipeline.add_attribute(
...     attr_name="advanced_param",
...     module_name="AdvancedModule",
...     param_name="param",
...     ui_schema_overrides={
...         "visible": False,  # Hide from UI
...         "readonly": True,  # Make read-only
...     }
... )

Using inline ui_schema_function_name for dynamic schema:

>>> def dynamic_soil_schema(pipeline):
...     data_module = pipeline.get_module("DataReader")
...     soil_types = data_module["OutputSoilTypes"] if data_module else []
...     return StringAttributeSchema(
...         title="土层类型",  # "soil layer type"
...         selections=soil_types,
...         depends_on=["InputSoilData"],
...     )
...
>>> pipeline.add_attribute(
...     attr_name="soil_type",
...     module_name="SoilModule",
...     param_name="type",
...     ui_schema_function_name=dynamic_soil_schema
... )

Notes

- The module must define ``ui_schema`` (or the legacy ``values_range``), and it must contain the specified parameter
- The original RangeModel/NestedRangeModel will be converted to UIAttributeSchema automatically
- Invalid field names for the UI schema type will raise ValueError
- When ``ui_schema_function_name`` is provided, the function is called to generate the schema,
  and both ``ui_schema_overrides`` and module's predefined ui_schema are ignored
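
The rule that invalid override keys raise ``ValueError`` can be sketched with dataclasses. ``NumberSchemaSketch`` and ``apply_overrides`` are hypothetical stand-ins; the real schema classes may be Pydantic models with different fields, so treat this only as an illustration of validating keys against the schema type before applying them.

```python
from dataclasses import dataclass, fields, replace
from typing import Any, Optional


@dataclass(frozen=True)
class NumberSchemaSketch:
    """Hypothetical stand-in for a numeric UIAttributeSchema."""
    title: str
    default: Optional[float] = None
    minimum: Optional[float] = None
    maximum: Optional[float] = None
    visible: bool = True
    readonly: bool = False


def apply_overrides(schema, overrides: Optional[dict[str, Any]]):
    """Return a copy of ``schema`` with ``overrides`` applied; unknown keys raise ValueError."""
    if not overrides:
        return schema  # None or {}: keep the module's schema as-is
    valid = {f.name for f in fields(schema)}
    unknown = sorted(set(overrides) - valid)
    if unknown:
        raise ValueError(f"invalid field(s) for {type(schema).__name__}: {unknown}")
    # replace() builds a new instance, so the module's original schema is untouched.
    return replace(schema, **overrides)
```

Here ``"selections"`` is rejected because this numeric stand-in has no such field, mirroring how a key valid for one schema type can be invalid for another.
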

get_attribute_value(attr_name: str) -> Any

Get the value of a pipeline attribute.

Parameters

attr_name : str

The name of the attribute to get.

Returns

Any
    The current value of the attribute.

set_attributes(**kwargs: Any) -> None

Set multiple attribute values for the pipeline at once.

Parameters

**kwargs : Any

Attribute names and the values to set; each keyword is an attribute name and its value is the value to assign.

Examples

>>> pipeline.set_attributes(token="abc123", proj_id=42, bore_number="ZK1")
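
Conceptually, a pipeline attribute is a named write-through binding to one module parameter. The hypothetical ``AttributeRegistry`` below models that idea with plain dictionaries; the error types it raises (``ValueError`` for duplicates or missing parameters, ``KeyError`` for unknown names) are assumptions for illustration, not documented library behaviour.

```python
class AttributeRegistry:
    """Hypothetical sketch: pipeline attributes as bindings to module parameters."""

    def __init__(self, modules):
        self.modules = modules   # {module_name: {param_name: value}}
        self._bindings = {}      # {attr_name: (module_name, param_name)}

    def add_attribute(self, attr_name, module_name, param_name):
        # Assumption: duplicate names and missing parameters are rejected up front.
        if attr_name in self._bindings:
            raise ValueError(f"attribute {attr_name!r} already exists")
        if param_name not in self.modules.get(module_name, {}):
            raise ValueError(f"{module_name!r} has no parameter {param_name!r}")
        self._bindings[attr_name] = (module_name, param_name)

    def get_attribute_value(self, attr_name):
        module_name, param_name = self._bindings[attr_name]
        return self.modules[module_name][param_name]

    def set_attributes(self, **kwargs):
        # Validate every name first so a typo does not leave a half-applied update.
        unknown = [name for name in kwargs if name not in self._bindings]
        if unknown:
            raise KeyError(f"unknown attribute(s): {unknown}")
        for name, value in kwargs.items():
            module_name, param_name = self._bindings[name]
            self.modules[module_name][param_name] = value

    def remove_attribute(self, attr_name):
        # Only the binding is dropped; the module parameter keeps its current value.
        del self._bindings[attr_name]
```
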

remove_attribute(attr_name: str) -> None

Remove an attribute from the pipeline.

Parameters

attr_name : str

The name of the attribute to remove.

get_return_results_info() -> list[ModuleResultInfo]

Get information about what results will be returned based on return_results_config.

This method provides metadata about the return results without executing the pipeline, which is useful for UI design tools that need to know what outputs to expect.

Returns

list[ModuleResultInfo]
    List of ModuleResultInfo objects containing module_name, port_name, and port_type
    for each result that will be returned when the pipeline runs.

get_attribute_info(update_attributes: list[str] | Literal['all'] | None = None) -> AttributesInfoResponse

Get detailed information about all attributes including their UI schemas.

This method dynamically fetches the latest UI schema from each module and applies any stored overrides, ensuring the information is always up-to-date with module changes while preserving pipeline-level customizations.

Examples

Default usage: auto-updates input-dependent attributes.

>>> info = pipeline.get_attribute_info()  # efficient: only updates attrs depending on input ports

Update specific attributes (e.g., after 'a' changed, update 'b', which depends on it):

>>> pipeline.set_attributes(a=new_value)
>>> info = pipeline.get_attribute_info(update_attributes=["b"])  # updates 'b' + input-dependent attrs

Force an update of all attributes (comprehensive):

>>> info = pipeline.get_attribute_info(update_attributes="all")  # updates everything (rarely needed)

Notes

Attributes whose schemas depend on input ports are automatically detected and updated in all modes. This optimization ensures input-port-dependent schemas stay fresh without manual tracking.

Use cases:
- None: default; handles input port changes automatically (most common)
- ["attr1", "attr2"]: when specific attributes changed, plus automatic update of input-dependent attributes
- "all": force an update of everything (rarely needed)

Typical workflow for dependent parameters:
1. User changes parameter 'a' via set_attributes()
2. Frontend knows 'b' depends on 'a' (from developer documentation)
3. Frontend calls get_attribute_info(update_attributes=["b"])
4. Module's update_ui_schema() recalculates the 'b' schema based on the new 'a' value
5. Input-dependent attributes are also automatically updated
6. Frontend gets updated schemas for both 'b' and input-dependent attributes
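
The three ``update_attributes`` modes reduce to a simple selection rule. The helper below is a hypothetical sketch of which attributes would be refreshed; it assumes the set of input-dependent attributes is already known, whereas the real method detects it automatically.

```python
from typing import Literal, Union


def attributes_to_refresh(all_attrs, input_dependent,
                          update_attributes: Union[list[str], Literal["all"], None] = None):
    """Hypothetical helper: names whose UI schema would be recalculated.

    all_attrs: every attribute name, in pipeline order.
    input_dependent: names whose schemas depend on input ports (assumed known here).
    """
    if update_attributes == "all":
        return list(all_attrs)
    requested = set(update_attributes or [])
    # Input-dependent attributes are refreshed in every mode.
    wanted = requested | set(input_dependent)
    return [name for name in all_attrs if name in wanted]
```
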

add_dict_attribute(attr_name: str, module_name: str, param_name: str, key_name: str, attr_title: str | None = None) -> None

Add an attribute to the pipeline that maps to a specific key in a dictionary parameter.

This method works with dictionary parameters in dict[str, Any] format. The parameter must have an ObjectAttributeSchema defined in the module's ui_schema, or a NestedRangeModel with vtype="dict" in the module's values_range (legacy support).

Parameters

attr_name : str

The name of the attribute at pipeline level.

module_name : str

The name of the module.

param_name : str

The name of the dictionary parameter in the module. This parameter MUST have an ObjectAttributeSchema in ui_schema or a NestedRangeModel with vtype="dict" in values_range (legacy).

key_name : str

The dictionary key to access within the parameter's dictionary. This key MUST be defined in ObjectAttributeSchema.properties or NestedRangeModel.fields (legacy).

attr_title : str, optional

The title of this attribute, shown to users. If None, the title from the field schema is used.

Examples

Module must define ObjectAttributeSchema in get_ui_schema():
ui_schema["data"] = ObjectAttributeSchema(
    title="Data Collection",
    properties={"x": StringAttributeSchema(title="X Value")}
)

Or legacy NestedRangeModel in set_cal_params():
values_range["data"] = NestedRangeModel(
    vtype="dict",
    title="Data Collection",
    fields={"x": RangeModel(vtype="str", title="X Value")}
)

Then use the method:
pipeline.add_dict_attribute("x_value", "my_module", "data", "x")

Notes

Requirements:
- The parameter must be defined in module.ui_schema with an ObjectAttributeSchema, OR
- The parameter must be defined in module.values_range with a NestedRangeModel (vtype="dict") [legacy]
- The key_name must be defined in the schema properties/fields
- The parameter must be a dictionary (dict[str, Any])

add_multi_binding_attribute(attr_name: str, bindings: list[tuple[str, str, str | None]], attr_title: str | None = None, ui_schema_overrides: dict[str, Any] | None = None) -> None

Add an attribute that binds to multiple parameters across different modules.

This method creates a single pipeline attribute that controls multiple module parameters. When the attribute value is set, all bound parameters will be updated with the same value. The parameters can be regular module parameters or keys in dictionary parameters.

Parameters

attr_name : str

The name of the attribute at pipeline level.

bindings : list[tuple[str, str, str | None]]

List of binding tuples, each containing:

  • module_name: name of the module

  • param_name: name of the parameter in the module

  • dict_key: key in the dictionary parameter (if the parameter is a dict, otherwise None)

attr_title : str, optional

The title of the attribute for display purposes. If None, attr_name is used.

ui_schema_overrides : dict[str, Any], optional

Dictionary of UIAttributeSchema parameters to override. Keys should be valid UIAttributeSchema field names (e.g., 'default', 'minimum', 'maximum', 'selections', 'units', 'required', 'visible', etc.). This allows customization of the UI schema derived from the first binding.

Examples

Bind a single attribute to parameters in multiple modules
pipeline.add_multi_binding_attribute(
    "shared_threshold",
    [
        ("detection_module", "threshold", None),                # Regular parameter
        ("filtering_module", "params", "min_threshold"),        # Dictionary parameter
    ],
    attr_title="Detection Threshold",
    ui_schema_overrides={
        "minimum": 0.0,
        "maximum": 1.0,
        "units": Units.m
    }
)

When the attribute is set, all bound parameters are updated
pipeline.set_attributes(shared_threshold=0.7)  # Updates all parameters to 0.7
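
The fan-out behaviour of a multi-binding attribute can be sketched with plain dictionaries. ``set_multi_binding`` is a hypothetical helper, and modules are modelled as ``{module_name: {param_name: value}}`` purely for illustration.

```python
def set_multi_binding(modules, bindings, value):
    """Hypothetical helper: write ``value`` to every (module_name, param_name, dict_key) binding.

    A dict_key of None targets the parameter itself; otherwise only that key
    inside the dictionary parameter is updated.
    """
    for module_name, param_name, dict_key in bindings:
        params = modules[module_name]
        if dict_key is None:
            params[param_name] = value
        else:
            params[param_name][dict_key] = value
```

Only the bound key of a dictionary parameter changes; its other keys keep their values.
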

save_data_to_db(name: str, data_type: Literal['port', 'pipeline', 'module'] = 'port', module_name: str | None = None) -> None

Mark data from various sources (output ports, pipeline attributes, module attributes) to be saved to the GDIM database when the pipeline runs on GDIM.

Parameters

name : str

  • For port: the output port name

  • For pipeline: the direct pipeline attribute name (e.g., 'workspace', 'app_name')

  • For module: the module parameter/attribute name

data_type : str, default: "port"

The type of data to save:

  • "port": Output port data (requires module_name)

  • "pipeline": Direct pipeline attribute value (e.g., workspace, app_name)

  • "module": Module attribute/parameter value (requires module_name)

module_name : str, optional

The name of the module. Required for "port" and "module" data types.

Examples

Add output port data
>>> pipeline.save_data_to_db("OutputData", "port", "MyModule")

Add pipeline attribute value
>>> pipeline.save_data_to_db("workspace", "pipeline")

Add module attribute/parameter value
>>> pipeline.save_data_to_db("threshold", "module", "FilterModule")

Key Naming Strategy
- Output ports: "module_name@port_name"
- Pipeline attributes: "pipeline@attr_name" (e.g., "pipeline@workspace")
- Module attributes: "module_name#attr_name"
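
The key naming strategy above maps directly to a small function. ``save_key`` is a hypothetical helper that only reproduces the documented key formats; it is not part of the library.

```python
from typing import Literal, Optional


def save_key(name: str,
             data_type: Literal["port", "pipeline", "module"] = "port",
             module_name: Optional[str] = None) -> str:
    """Hypothetical helper: build the documented storage key for a saved data item."""
    if data_type not in ("port", "pipeline", "module"):
        raise ValueError(f"unknown data_type: {data_type!r}")
    if data_type == "pipeline":
        return f"pipeline@{name}"
    if module_name is None:
        raise ValueError(f"module_name is required for data_type={data_type!r}")
    # Ports use "@" as separator, module attributes use "#".
    sep = "@" if data_type == "port" else "#"
    return f"{module_name}{sep}{name}"
```

The different separators keep a port and a module parameter with the same name on the same module from colliding.
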

get_data_to_save() -> list[PipelineRunDataItem]

Get structured data items from various sources that were marked for GDIM database storage.

This method retrieves the data from output ports, pipeline attributes, and module attributes that were previously marked for GDIM database storage with save_data_to_db(), and returns it as structured PipelineRunDataItem objects with full metadata.

Returns

list[PipelineRunDataItem]
    List of structured data items, each containing:
    - name: The data item name (port_name, attr_name, or param_name)
    - data_type: Source type ("port", "pipeline", "module")
    - module_name: Module name (None for "pipeline" data type)
    - value_type: Type of the data value (e.g., "TableData", "str", "int")
    - data: The actual serialized data content
    - timestamp: ISO format timestamp when data was captured

Notes

For output ports, if a module hasn't been executed, the data field will be None.

get_output_files() -> dict[str, dict[str, str | Path | list[str | Path] | GdimMinIOFile | list[GdimMinIOFile] | None]]

Get all file paths and file objects from output ports of modules with file-related port types.

This method checks all modules in the pipeline and collects file data from output ports that have PortType.FilePath, PortType.FilesPath, PortType.GdimFile, or PortType.GdimFiles. Useful for knowing what files were generated after running the pipeline or individual steps.

Returns

dict[str, dict[str, str | Path | list[str | Path] | GdimMinIOFile | list[GdimMinIOFile] | None]]
    Dictionary mapping module names to their file outputs.
    Format: {module_name: {port_name: file_data}}
    - For PortType.FilePath: file_data is a single str or Path
    - For PortType.FilesPath: file_data is a list[str | Path]
    - For PortType.GdimFile: file_data is a single GdimMinIOFile object
    - For PortType.GdimFiles: file_data is a list of GdimMinIOFile objects
    Only includes modules that have file-related output ports with actual file data.
    Excludes outputs specified in hide_output_files.

Examples

>>> files = pipeline.get_output_files()
>>> print(files)
{'report_module': {'OutputFile': '/path/to/report.pdf'},
 'batch_module': {'OutputFiles': ['/path/to/file1.csv', '/path/to/file2.csv']},
 'upload_module': {'GdimFile': GdimMinIOFile(fileId='123', originalFilename='data.xlsx', ...)},
 'batch_upload_module': {'GdimFiles': [GdimMinIOFile(...), GdimMinIOFile(...)]}}
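
The grouping and filtering performed by ``get_output_files()`` can be sketched as below. Everything here is hypothetical: ``PortType`` is a reduced stand-in enum, ``collect_output_files`` is not a library function, and ``hidden`` is modelled as a set of ``(module_name, port_name)`` pairs because the exact shape of ``hide_output_files`` (typed ``dict[str, str]``) is not documented here.

```python
from enum import Enum


class PortType(Enum):
    """Hypothetical subset of port types; the real PortType enum has more members."""
    FilePath = "FilePath"
    FilesPath = "FilesPath"
    GdimFile = "GdimFile"
    GdimFiles = "GdimFiles"
    TableData = "TableData"


FILE_PORT_TYPES = {PortType.FilePath, PortType.FilesPath, PortType.GdimFile, PortType.GdimFiles}


def collect_output_files(outputs, hidden=frozenset()):
    """Hypothetical helper: group file outputs as {module_name: {port_name: file_data}}.

    outputs: iterable of (module_name, port_name, port_type, data) tuples.
    hidden: (module_name, port_name) pairs to exclude (assumed shape).
    """
    result = {}
    for module_name, port_name, port_type, data in outputs:
        if port_type not in FILE_PORT_TYPES or data is None or data == []:
            continue  # not a file port, or no file data produced yet
        if (module_name, port_name) in hidden:
            continue
        result.setdefault(module_name, {})[port_name] = data
    return result
```
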

Properties:

gdim_token

Deprecated: access ``pipeline.gdim_state.token`` directly.

gdim_proj_id

Deprecated: access ``pipeline.gdim_state.proj_id`` directly.

gdim_tpl_id

Deprecated: access ``pipeline.gdim_state.tpl_id`` directly.

gdim_host

Deprecated: access ``pipeline.gdim_state.host`` directly.

gdim_template

Deprecated: access ``pipeline.gdim_state.template`` directly.

gdim_proj_list

Deprecated: access ``pipeline.gdim_state.proj_list`` directly.
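
A common way to implement deprecated aliases like these is a property that forwards to ``gdim_state`` and emits a ``DeprecationWarning``. The sketch below shows that general pattern only; ``PipelineSketch`` and ``_deprecated_alias`` are hypothetical, and whether the real properties warn or accept assignment is an assumption, not something this reference states.

```python
import warnings
from types import SimpleNamespace


def _deprecated_alias(field):
    """Hypothetical: property forwarding to self.gdim_state.<field> with a DeprecationWarning."""
    message = f"use pipeline.gdim_state.{field} instead"

    def getter(self):
        # stacklevel=2 attributes the warning to the code that accessed the property.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return getattr(self.gdim_state, field)

    def setter(self, value):
        # Assumption: the alias also supports assignment.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        setattr(self.gdim_state, field, value)

    return property(getter, setter)


class PipelineSketch:
    """Hypothetical stand-in showing the alias pattern; not the real PipeLine."""

    gdim_token = _deprecated_alias("token")
    gdim_proj_id = _deprecated_alias("proj_id")

    def __init__(self):
        self.gdim_state = SimpleNamespace(token=None, proj_id=None)
```
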

module_names
attributes
steps
start_modules
start_modules_names
end_modules_names
end_modules
gdim_template_required

Check if the gdim template is required for the pipeline.

gdim_proj_list_required

Check if the gdim project list is required for the pipeline.