pipeline.pipeline
Classes
- class pipeline.pipeline.GdimState
Runtime state for GDIM platform interaction.
Bundles all attributes needed for a pipeline to communicate with the GDIM data-management platform so they travel together as a single unit (e.g. when a sub-pipeline is created, the whole state is propagated in one assignment).
Attributes
- token : str | None
Authentication token for the GDIM platform.
- proj_id : str | None
Project ID on the GDIM platform.
- tpl_id : str | None
Data template ID on the GDIM platform.
- host : str | None
GDIM platform host URL. ``None`` means the default host is used.
- template : GdimTemplate | None
Cached GDIM template fetched from the platform. Populated lazily during pipeline execution; do not set this manually.
- proj_list : dict[str, GdimProjectInfo] | None
Cached list of GDIM projects. Populated lazily during pipeline execution; do not set this manually.
- Inherits from:
BaseModel
Attributes:
- model_config
- token: str | None = None
- proj_id: str | None = None
- tpl_id: str | None = None
- host: str | None = None
- template: GdimTemplate | None = None
- proj_list: dict[str, GdimProjectInfo] | None = None
- class pipeline.pipeline.PipeLine
Pipeline system connecting together modules and adding a messaging system.
The Pipeline simplifies the configuration and execution of interconnected modules. It provides several features:
- Module management: register, configure, and execute processing modules.
- Parameter binding: create high-level attributes that control module parameters.
- Multi-binding attributes: create a single attribute that controls multiple module parameters.
Attributes
- app_name : str
Name of the pipeline application. Always set: if not provided at construction time, a random ``pipeline_<hex8>`` identifier is assigned. Can be overwritten by ``restore_pipeline()`` when loading from a ``.pipe`` file that has an ``app_name`` field.
Tip: set a meaningful name so log records and Logfire spans are easy to find:
>>> pipeline = PipeLine(app_name="borehole_report")
- app_title : str | None
Title of the pipeline application.
- app_version : str | None
Version of the pipeline application.
- gdim_state : GdimState
Runtime state for GDIM platform interaction (token, proj_id, tpl_id, host, cached template and project list). Use ``update_gdim_state()`` to set credentials before calling ``run()``.
- llm_key : str | None
API key for the LLM model.
- local_functions_path : str | Path | None
Path to the local functions file for custom operations.
- local_func_require_gdim_proj_list : bool
If True, the pipeline is considered to require gdim_proj_list when it contains PythonCoder modules.
- workspace : str | Path | None
Working directory for saving pipeline output files.
- data_to_save : dict[str, tuple[str, str, str | None]]
Dictionary mapping data keys to their save information. Each value is a tuple (data_type, name, module_name).
- hide_output_files : dict[str, str] | None
Controls which module output files to exclude from results when running by gdi-server.
- steps_manager : StepsManager | None
Ordered list of checkpoint steps for this pipeline.
- current_step : str | None
Name of the step that was most recently executed via ``run(use_steps=True)``.
- last_step : bool
True after the final step has been executed.
- executed_steps : list[str]
Ordered history of step names that have been executed.
- verbose : bool
When ``True``, structured log records are emitted for every module execution (start, success, failure), including input/output data previews. When ``False`` (default), only exceptions are logged. Console output is configured automatically when ``verbose=True``. ``run_trace_file`` capture and the companion HTML report also require ``verbose=True`` for the current run (see below).
To also send traces to the Logfire cloud UI, set the ``LOGFIRE_TOKEN`` environment variable and call ``gdi.pipeline.observability.configure_logging()`` once before running:
>>> from gdi.pipeline.observability import configure_logging
>>> configure_logging()
>>> pipeline.run(verbose=True)
- run_trace_file : str | Path | None
Path to a **local debug trace file** (JSON) written at the end of each ``run()`` call when ``verbose=True``. The file contains the same structured records emitted during that run, serialised as a **JSON array** so any JSON viewer can open it directly.
**Requires ``verbose=True``** for the run (on the instance or via ``run(verbose=True)``). If ``verbose=False``, no trace collector is attached, the JSON file is **not** updated for that run, and the HTML step may fail (no file yet) or reflect a **previous** verbose run's JSON if one is still on disk.
Each successful trace capture **overwrites** the previous JSON: it is a snapshot of the last verbose run, not a persistent log.
If only a filename is given (no directory), the file is placed inside ``workspace`` (or the current working directory when ``workspace`` is not set). Example:
>>> pipeline = PipeLine(
...     app_name="my_pipeline",
...     workspace="./output",
...     verbose=True,
...     run_trace_file="run_trace.json",  # → ./output/run_trace.json
... )
>>> pipeline.run()  # writes run_trace.json + run_trace.html
When ``verbose=True`` and ``run_trace_file`` is set, an HTML viewer is also rendered automatically after each ``run()`` (same stem, ``.html`` extension), including after a failed run. No separate flag is needed.
Warning: this is intended for **local development only**. Do not use it as a production logging solution: it provides no rotation, no concurrency safety, and is overwritten on every run.
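The path-resolution rule described for ``run_trace_file`` can be sketched in isolation as follows. This is a minimal illustration, not the library's implementation: ``resolve_trace_paths`` is a hypothetical helper, and using a path that already contains a directory exactly as given is an assumption.

```python
from pathlib import Path


def resolve_trace_paths(run_trace_file, workspace=None):
    """Return (json_path, html_path) for a trace file setting.

    Hypothetical helper: a bare filename is placed inside ``workspace``
    (or the current working directory when ``workspace`` is None); a path
    that already has a directory part is assumed to be used as given.
    """
    path = Path(run_trace_file)
    if path.parent == Path("."):  # bare filename, no directory component
        base = Path(workspace) if workspace is not None else Path.cwd()
        path = base / path
    # The HTML report shares the stem and swaps the extension.
    return path, path.with_suffix(".html")


json_path, html_path = resolve_trace_paths("run_trace.json", workspace="./output")
```

With the values from the example above, both paths land inside the ``output`` directory and differ only in their extension.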
- Inherits from:
PipelineVisualizationMixin, PipelineLoggerMixin, PipelineRequirementsMixin
Methods:
- __init__(app_name: str | None = None, app_title: str | None = None, description: str | None = None, app_version: str | None = None, workspace: str | Path | None = None, local_functions_path: str | Path | None = None, verbose: bool = False, run_trace_file: str | Path | None = None) None
Initialize the PipeLine object.
Parameters
- app_name : str | None, default: None
The name of the app.
- app_title : str | None, default: None
The title of the app.
- description : str | None, default: None
The description of the pipeline.
- app_version : str | None, default: None
The version of the app.
- workspace : str | Path | None, default: None
Working directory for saving pipeline output files.
- verbose : bool, default: False
When ``True``, structured log records are emitted for every module execution. Also enables ``run_trace_file`` capture for that run.
- run_trace_file : str | Path | None, default: None
Path to the local debug trace JSON file. See the class-level docstring for path resolution and examples. JSON and HTML are written only when ``verbose=True`` for the run (constructor default or ``run(verbose=True)``). Can be overridden per-run via ``run(run_trace_file=...)``.
- update_gdim_state(token: str | None = _UNSET, proj_id: str | None = _UNSET, tpl_id: str | None = _UNSET, host: str | None = _UNSET, template: GdimTemplate | None = _UNSET, proj_list: dict[str, GdimProjectInfo] | None = _UNSET) None
Update GDIM platform connection state.
This is the preferred way to configure GDIM credentials before calling ``run()``. When any credential field (``token``, ``proj_id``, ``tpl_id``, ``host``) changes its value, the cached ``template`` and ``proj_list`` are cleared automatically so they will be re-fetched on the next run.
Examples
>>> pipeline.update_gdim_state(token=log_in(), proj_id="1967572096104902658")
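The ``_UNSET`` defaults in the signature suggest a sentinel pattern that distinguishes "argument not passed" from an explicit ``None``. The sketch below illustrates that pattern together with the cache-clearing rule. It is a stand-in under stated assumptions: ``StateSketch`` and ``update_state`` are hypothetical names, only the four credential fields are handled, and the real method may differ in detail.

```python
from dataclasses import dataclass
from typing import Any

_UNSET: Any = object()  # sentinel: "argument not passed", distinct from None


@dataclass
class StateSketch:
    """Hypothetical stand-in for GdimState, reduced to plain attributes."""
    token: Any = None
    proj_id: Any = None
    tpl_id: Any = None
    host: Any = None
    template: Any = None
    proj_list: Any = None


def update_state(state, token=_UNSET, proj_id=_UNSET, tpl_id=_UNSET, host=_UNSET):
    """Apply only the credentials that were passed; clear caches if any changed."""
    candidates = {"token": token, "proj_id": proj_id, "tpl_id": tpl_id, "host": host}
    passed = {k: v for k, v in candidates.items() if v is not _UNSET}
    changed = any(getattr(state, k) != v for k, v in passed.items())
    for k, v in passed.items():
        setattr(state, k, v)
    if changed:
        # Cached data belongs to the old credentials; force a re-fetch.
        state.template = None
        state.proj_list = None


state = StateSketch(token="old", host="https://example.invalid", template={"cached": True})
update_state(state, token="old")  # same value: the cache is kept
kept = state.template
update_state(state, host=None)    # explicit None is a real change, unlike "not passed"
```

Using a sentinel rather than ``None`` as the default is what lets a caller reset ``host`` to ``None`` (the default host) without touching the other fields.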
- add_links(links: list[Link] | Link, reset_child: bool = True) None
Add one or more links to the pipeline network.
- remove_link(link: Link | str) None
Remove a link from the pipeline. The linked modules are not removed.
Parameters
- link : Link | str
If a str, it should be the name of the link.
- get_below_links(module: PipeModule) list[Link]
Get all the links below the module.
- get_above_links(module: PipeModule) list[Link]
Get all the links above the module.
- run(from_module: PipeModule | str | None = None, return_results: bool | dict[str, str | list[str]] = True, verbose: bool | None = None, run_trace_file: str | Path | None = _UNSET, use_steps: bool = False) PipelineResult
Run the pipeline from start modules, a specified module, or the next checkpoint step.
Parameters
- from_module : PipeModule | str | None, default: None
If None, run from all start modules. If a module name or object, run from that module. Ignored when ``use_steps=True``.
Returns
- PipelineResult
Execution results. When ``use_steps=True``, also contains ``current_step``, ``last_step``, and ``next_step``.
- add_module(module: PipeModule) None
Add a module to the pipeline.
- get_module(mname: str) PipeModule | None
Get a module by its name.
- remove_module(module: PipeModule | str, with_modules_below: bool = False) None
Remove a module, optionally together with the modules below it, and all related links.
Parameters
- module : PipeModule | str
If a str, it should be the name of the module.
- with_modules_below : bool, default: False
If True, the module and all the modules below it are removed.
- save_pipeline(file: str | Path | None = None) None | PipelineData
Save the pipeline data to a .pipe file and return a PipelineData object.
Returns
- PipelineData | None
The pipeline data. If the pipeline is empty, returns None.
- restore_pipeline(file: str | Path | PipelineData, reset_to_auto_run: bool = True, silent: bool = False, local_functions_path: str | Path | None = None) None
Restore the pipeline from a .pipe file (JSON format), a URL, or a PipelineData object.
Parameters
- reset_to_auto_run : bool, default: True
If True, all modules will be set to auto_run=True regardless of their saved state. If False, modules will restore their auto_run state from the .pipe file.
- silent : bool, default: False
If True, suppresses printed messages.
- local_functions_path : str | Path | None, default: None
The path to the local functions file. If None, the path saved in the pipeline data will be used.
- add_step(step: PipelineStep, step_index: int | None = None) None
Add a checkpoint step to the pipeline.
Parameters
- step : PipelineStep
The step definition to add.
- step_index : int | None, default: None
Insert position in the step list. Appended at the end when ``None``.
- remove_step(step_name: str | int) None
Remove a step definition from the pipeline.
Parameters
- step_name : str | int
Name or index of the step to remove.
- get_steps() list[PipelineStep]
Get all defined execution steps.
Returns
- list[PipelineStep]
List of execution steps, in order.
- reset_steps() None
Reset step-execution state so the next ``run(use_steps=True)`` starts from step 0.
- add_attribute(attr_name: str, module_name: str, param_name: str, attr_title: str | None = None, ui_schema_overrides: dict[str, Any] | None = None, ui_schema_function_name: str | Callable | None = None) None
Add an attribute to the pipeline.
Parameters
- attr_name : str
The name of the attribute at pipeline level, used to distinguish it from other attributes in the pipeline.
- module_name : str
The name of the module that owns this attribute.
- param_name : str
The name of the parameter in the module.
- attr_title : str, optional
The title of this attribute, shown to users. If None, ``attr_name`` will be used.
- ui_schema_overrides : dict[str, Any], optional
Dictionary of UIAttributeSchema parameters to override. Keys should be valid UIAttributeSchema field names (e.g., 'default', 'minimum', 'maximum', 'selections', 'units', 'required', 'visible', etc.). This is the preferred way to customize attribute properties. If None, the original UI schema from the module will be used as-is.
Examples
Basic usage with default UI schema:
>>> pipeline.add_attribute(
...     attr_name="pile_length",
...     module_name="PileModule",
...     param_name="length",
...     attr_title="Pile length"
... )
Override default value and range using UI schema:
>>> pipeline.add_attribute(
...     attr_name="pile_diameter",
...     module_name="PileModule",
...     param_name="diameter",
...     ui_schema_overrides={
...         "default": 1.2,
...         "minimum": 0.5,
...         "maximum": 3.0,
...         "units": Units.m
...     }
... )
Customize selections using UI schema:
>>> pipeline.add_attribute(
...     attr_name="soil_type",
...     module_name="SoilModule",
...     param_name="type",
...     ui_schema_overrides={
...         "selections": ["clay", "sand", "gravel"],
...         "selections_name": ["Clay", "Sand", "Gravel"],
...         "default": "clay"
...     }
... )
Set UI properties using UI schema:
>>> pipeline.add_attribute(
...     attr_name="advanced_param",
...     module_name="AdvancedModule",
...     param_name="param",
...     ui_schema_overrides={
...         "visible": False,  # Hide from UI
...         "readonly": True,  # Make read-only
...     }
... )
Using inline ui_schema_function_name for dynamic schema:
>>> def dynamic_soil_schema(pipeline):
...     data_module = pipeline.get_module("DataReader")
...     soil_types = data_module["OutputSoilTypes"] if data_module else []
...     return StringAttributeSchema(
...         title="Soil layer type",
...         selections=soil_types,
...         depends_on=["InputSoilData"],
...     )
...
>>> pipeline.add_attribute(
...     attr_name="soil_type",
...     module_name="SoilModule",
...     param_name="type",
...     ui_schema_function_name=dynamic_soil_schema
... )
Notes
- The module must have ``ui_schema`` or ``values_range`` (legacy) defined and contain the specified parameter.
- The original RangeModel/NestedRangeModel will be converted to UIAttributeSchema automatically.
- Invalid field names for the UI schema type will raise ValueError.
- When ``ui_schema_function_name`` is provided, the function is called to generate the schema, and both ``ui_schema_overrides`` and the module's predefined ui_schema are ignored.
- get_attribute_value(attr_name: str) Any
Get the value of a pipeline attribute.
Parameters
- attr_name : str
The name of the attribute to get.
Returns
- Any
The current value of the attribute.
- set_attributes(**kwargs) None
Set multiple attribute values for the pipeline at once.
Parameters
- **kwargs : dict
Attribute names and their values, passed as keyword arguments. Each key is an attribute name and each value is the value to set.
Examples
>>> pipeline.set_attributes(token="abc123", proj_id=42, bore_number="ZK1")
- remove_attribute(attr_name: str) None
Remove an attribute from the pipeline.
Parameters
- attr_name : str
The name of the attribute to remove.
- get_return_results_info() list[ModuleResultInfo]
Get information about what results will be returned based on return_results_config.
This method provides metadata about the return results without executing the pipeline, useful for UI design tools to know what outputs to expect.
Returns
- list[ModuleResultInfo]
List of ModuleResultInfo objects containing module_name, port_name, and port_type for each result that will be returned when the pipeline runs.
- get_attribute_info(update_attributes: list[str] | Literal["all"] | None = None) AttributesInfoResponse
Get detailed information about all attributes including their UI schemas.
This method dynamically fetches the latest UI schema from each module and applies any stored overrides, ensuring the information is always up-to-date with module changes while preserving pipeline-level customizations.
Examples
Default usage, which auto-updates input-dependent attributes:
>>> info = pipeline.get_attribute_info()  # efficient: only updates attributes depending on input ports
Update specific attributes (e.g., after 'a' changed, update 'b' which depends on it):
>>> pipeline.set_attributes(a=new_value)
>>> info = pipeline.get_attribute_info(update_attributes=["b"])  # updates 'b' plus any input-dependent attributes
Force an update of all attributes (comprehensive):
>>> info = pipeline.get_attribute_info(update_attributes="all")  # updates everything (rarely needed)
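The three ``update_attributes`` modes can be pictured as selecting a set of attribute names to refresh. The sketch below is a self-contained illustration, not the library's code: ``attributes_to_refresh`` is a hypothetical helper, and treating any dependency whose name starts with "Input" as an input port is an assumption based on the ``depends_on=["InputSoilData"]`` example elsewhere in this reference.

```python
def attributes_to_refresh(update_attributes, depends_on):
    """Return the sorted attribute names whose UI schema should be recomputed.

    ``depends_on`` maps attribute name -> list of dependency names.
    Assumption: a dependency starting with "Input" refers to an input port.
    """
    input_dependent = {
        name
        for name, deps in depends_on.items()
        if any(dep.startswith("Input") for dep in deps)
    }
    if update_attributes is None:
        selected = set()                   # default: input-dependent only
    elif update_attributes == "all":
        selected = set(depends_on)         # force everything
    else:
        selected = set(update_attributes)  # explicit list from the caller
    # Input-dependent attributes are refreshed in every mode.
    return sorted(selected | input_dependent)


deps = {"a": [], "b": ["a"], "soil_type": ["InputSoilData"]}
default_refresh = attributes_to_refresh(None, deps)
targeted_refresh = attributes_to_refresh(["b"], deps)
full_refresh = attributes_to_refresh("all", deps)
```

In this toy data, the default mode refreshes only ``soil_type``, the targeted mode adds ``b``, and ``"all"`` covers every attribute.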
Notes
Attributes that depend on "Input" ports are automatically detected and updated in all modes. This optimization ensures input-port-dependent schemas stay fresh without manual tracking.
Use cases:
- None: default; handles input port changes automatically (most common).
- ["attr1", "attr2"]: updates the listed attributes plus the input-dependent ones.
- "all": forces an update of everything (rarely needed).
Typical workflow for dependent parameters:
1. User changes parameter 'a' via set_attributes().
2. Frontend knows 'b' depends on 'a' (from developer documentation).
3. Frontend calls get_attribute_info(update_attributes=["b"]).
4. Module's update_ui_schema() recalculates the schema of 'b' based on the new value of 'a'.
5. Input-dependent attributes are also automatically updated.
6. Frontend gets updated schemas for both 'b' and the input-dependent attributes.
- add_dict_attribute(attr_name: str, module_name: str, param_name: str, key_name: str, attr_title: str | None = None) None
Add an attribute to the pipeline that maps to a specific key in a dictionary parameter.
This method works with dictionary parameters that follow the dict[str, Any] format. The parameter must have an ObjectAttributeSchema defined in the module's ui_schema, or a NestedRangeModel with vtype="dict" in the module's values_range (legacy support).
Parameters
- attr_name : str
The name of the attribute at pipeline level.
- module_name : str
The name of the module.
- param_name : str
The name of the parameter in the module that is a dictionary. This parameter MUST have an ObjectAttributeSchema in ui_schema or a NestedRangeModel with vtype="dict" in values_range (legacy).
- key_name : str
The dictionary key to access within the parameter's dictionary. This key MUST be defined in the ObjectAttributeSchema.properties or NestedRangeModel.fields (legacy).
- attr_title : str, optional
The title of this attribute, shown to users. If None, the title from the field schema will be used.
Examples
The module must define an ObjectAttributeSchema in get_ui_schema():
>>> ui_schema["data"] = ObjectAttributeSchema(
...     title="Data Collection",
...     properties={"x": StringAttributeSchema(title="X Value")}
... )
Or a legacy NestedRangeModel in set_cal_params():
>>> values_range["data"] = NestedRangeModel(
...     vtype="dict",
...     title="Data Collection",
...     fields={"x": RangeModel(vtype="str", title="X Value")}
... )
Then use the method:
>>> pipeline.add_dict_attribute("x_value", "my_module", "data", "x")
Notes
Requirements:
- The parameter must be defined in module.ui_schema with an ObjectAttributeSchema, or in module.values_range with a NestedRangeModel (vtype="dict") [legacy].
- The key_name must be defined in the schema properties/fields.
- The parameter must be a dictionary (dict[str, Any]).
- add_multi_binding_attribute(attr_name: str, bindings: list[tuple[str, str, str | None]], attr_title: str | None = None, ui_schema_overrides: dict[str, Any] | None = None) None
Add an attribute that binds to multiple parameters across different modules.
This method creates a single pipeline attribute that controls multiple module parameters. When the attribute value is set, all bound parameters will be updated with the same value. The parameters can be regular module parameters or keys in dictionary parameters.
Parameters
- attr_name : str
The name of the attribute at pipeline level.
- bindings : list[tuple[str, str, str | None]]
List of binding tuples, each containing:
  - module_name: name of the module
  - param_name: name of the parameter in the module
  - dict_key: key in the dictionary parameter (if the parameter is a dict, otherwise None)
- attr_title : str, optional
The title of the attribute for display purposes. If None, attr_name will be used.
- ui_schema_overrides : dict[str, Any], optional
Dictionary of UIAttributeSchema parameters to override. Keys should be valid UIAttributeSchema field names (e.g., 'default', 'minimum', 'maximum', 'selections', 'units', 'required', 'visible', etc.). This allows customization of the UI schema derived from the first binding.
Examples
Bind a single attribute to parameters in multiple modules:
>>> pipeline.add_multi_binding_attribute(
...     "shared_threshold",
...     [
...         ("detection_module", "threshold", None),  # Regular parameter
...         ("filtering_module", "params", "min_threshold"),  # Dictionary parameter
...     ],
...     attr_title="Detection Threshold",
...     ui_schema_overrides={
...         "minimum": 0.0,
...         "maximum": 1.0,
...         "units": Units.m
...     }
... )
When the attribute is set, all bound parameters are updated:
>>> pipeline.set_attributes(shared_threshold=0.7)  # Updates all parameters to 0.7
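The fan-out behaviour described above can be illustrated without the library. The following is a minimal sketch, assuming modules are represented as plain dictionaries of parameters; ``MultiBindingSketch`` is a hypothetical stand-in and does not reproduce schema handling or validation.

```python
class MultiBindingSketch:
    """One attribute name fans out to several (module, param, dict_key) targets."""

    def __init__(self, modules):
        self.modules = modules  # module_name -> {param_name: value}
        self.bindings = {}      # attr_name -> list of binding tuples

    def add_multi_binding_attribute(self, attr_name, bindings):
        self.bindings[attr_name] = list(bindings)

    def set_attributes(self, **kwargs):
        for attr_name, value in kwargs.items():
            for module_name, param_name, dict_key in self.bindings[attr_name]:
                params = self.modules[module_name]
                if dict_key is None:
                    params[param_name] = value            # regular parameter
                else:
                    params[param_name][dict_key] = value  # key inside a dict parameter


modules = {
    "detection_module": {"threshold": 0.5},
    "filtering_module": {"params": {"min_threshold": 0.1, "window": 3}},
}
sketch = MultiBindingSketch(modules)
sketch.add_multi_binding_attribute(
    "shared_threshold",
    [
        ("detection_module", "threshold", None),
        ("filtering_module", "params", "min_threshold"),
    ],
)
sketch.set_attributes(shared_threshold=0.7)
```

Only the bound key inside the dictionary parameter changes; other keys such as ``window`` (an invented example key) are left untouched.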
- save_data_to_db(name: str, data_type: Literal["port", "pipeline", "module"] = 'port', module_name: str | None = None) None
Add data from various sources (output ports, pipeline attributes, module attributes) to GDIM database when running on GDIM.
Parameters
- name : str
For port: the output port name.
For pipeline: the direct pipeline attribute name (e.g., 'workspace', 'app_name').
For module: the module parameter/attribute name.
- data_type : str, default: "port"
The type of data to save:
"port": output port data (requires module_name).
"pipeline": direct pipeline attribute value (e.g., workspace, app_name).
"module": module attribute/parameter value (requires module_name).
- module_name : str, optional
The name of the module. Required for "port" and "module" data types.
Examples
Add output port data:
>>> pipeline.save_data_to_db("OutputData", "port", "MyModule")
Add a pipeline attribute value:
>>> pipeline.save_data_to_db("workspace", "pipeline")
Add a module attribute/parameter value:
>>> pipeline.save_data_to_db("threshold", "module", "FilterModule")
Key naming strategy:
- Output ports: "module_name@port_name"
- Pipeline attributes: "pipeline@attr_name" (e.g., "pipeline@workspace")
- Module attributes: "module_name#attr_name"
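The key naming strategy can be written down as a small function. This is an illustrative sketch only: ``make_save_key`` is a hypothetical helper, and raising ``ValueError`` for a missing ``module_name`` or an unknown ``data_type`` is an assumption about error handling, not documented behaviour.

```python
def make_save_key(name, data_type="port", module_name=None):
    """Build the storage key for one item, following the naming strategy."""
    if data_type not in ("port", "pipeline", "module"):
        raise ValueError(f"unknown data_type: {data_type!r}")
    if data_type == "pipeline":
        return f"pipeline@{name}"
    if module_name is None:
        # Assumed behaviour: "port" and "module" items need an owning module.
        raise ValueError(f"module_name is required for data_type={data_type!r}")
    separator = "@" if data_type == "port" else "#"
    return f"{module_name}{separator}{name}"


keys = [
    make_save_key("OutputData", "port", "MyModule"),
    make_save_key("workspace", "pipeline"),
    make_save_key("threshold", "module", "FilterModule"),
]
```

The ``@`` and ``#`` separators keep port data and module attributes apart even when a port and a parameter of the same module share a name.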
- get_data_to_save() list[PipelineRunDataItem]
Get structured data items from various sources that were marked for GDIM database storage.
This method retrieves data from output ports, pipeline attributes, and module attributes that were previously marked with ``save_data_to_db()`` and returns them as structured PipelineRunDataItem objects with full metadata for GDIM database storage.
Returns
- list[PipelineRunDataItem]
List of structured data items, each containing:
  - name: the data item name (port_name, attr_name, or param_name)
  - data_type: source type ("port", "pipeline", "module")
  - module_name: module name (None for the "pipeline" data type)
  - value_type: type of the data value (e.g., "TableData", "str", "int")
  - data: the actual serialized data content
  - timestamp: ISO format timestamp of when the data was captured
Notes
For output ports, if a module hasn't been executed, the data field will be None.
- get_output_files() dict[str, dict[str, str | Path | list[str | Path] | GdimMinIOFile | list[GdimMinIOFile] | None]]
Get all file paths and file objects from output ports of modules with file-related port types.
This method checks all modules in the pipeline and collects file data from output ports that have PortType.FilePath, PortType.FilesPath, PortType.GdimFile, or PortType.GdimFiles. Useful for knowing what files were generated after running the pipeline or individual steps.
Returns
- dict[str, dict[str, str | Path | list[str | Path] | GdimMinIOFile | list[GdimMinIOFile] | None]]
Dictionary mapping module names to their file outputs. Format: {module_name: {port_name: file_data}}
  - For PortType.FilePath: file_data is a single str or Path
  - For PortType.FilesPath: file_data is a list[str | Path]
  - For PortType.GdimFile: file_data is a single GdimMinIOFile object
  - For PortType.GdimFiles: file_data is a list of GdimMinIOFile objects
Only includes modules that have file-related output ports with actual file data. Excludes outputs specified in hide_output_files.
Examples
>>> files = pipeline.get_output_files()
>>> print(files)
{'report_module': {'OutputFile': '/path/to/report.pdf'},
 'batch_module': {'OutputFiles': ['/path/to/file1.csv', '/path/to/file2.csv']},
 'upload_module': {'GdimFile': GdimMinIOFile(fileId='123', originalFilename='data.xlsx', ...)},
 'batch_upload_module': {'GdimFiles': [GdimMinIOFile(...), GdimMinIOFile(...)]}}
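Because a port may hold either a single item or a list, callers usually need to normalise the nested mapping before iterating over it. The sketch below shows one way to collect the local paths, assuming the dictionary shape documented above; ``iter_local_files`` is a hypothetical helper, and it deliberately skips ``None`` values and any non-path objects such as GdimMinIOFile.

```python
from pathlib import Path


def iter_local_files(output_files):
    """Yield (module_name, port_name, Path) for every local path in the mapping."""
    for module_name, ports in output_files.items():
        for port_name, file_data in ports.items():
            # Normalise single items and lists to one code path.
            items = file_data if isinstance(file_data, list) else [file_data]
            for item in items:
                if isinstance(item, (str, Path)):  # skip None and non-path objects
                    yield module_name, port_name, Path(item)


sample = {
    "report_module": {"OutputFile": "/path/to/report.pdf"},
    "batch_module": {"OutputFiles": ["/path/to/file1.csv", "/path/to/file2.csv"]},
    "empty_module": {"OutputFile": None},
}
local_files = list(iter_local_files(sample))
```

The sample data mirrors the path-based entries of the example output; ``empty_module`` is an invented entry that shows a ``None`` value being skipped.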
Properties:
- gdim_token
Deprecated: access ``pipeline.gdim_state.token`` directly.
- gdim_proj_id
Deprecated: access ``pipeline.gdim_state.proj_id`` directly.
- gdim_tpl_id
Deprecated: access ``pipeline.gdim_state.tpl_id`` directly.
- gdim_host
Deprecated: access ``pipeline.gdim_state.host`` directly.
- gdim_template
Deprecated: access ``pipeline.gdim_state.template`` directly.
- gdim_proj_list
Deprecated: access ``pipeline.gdim_state.proj_list`` directly.
- module_names
- link_names
- attributes
- steps
- start_modules
- start_modules_names
- end_modules_names
- end_modules
- gdim_template_required
Check if the gdim template is required for the pipeline.
- gdim_proj_list_required
Check if the gdim project list is required for the pipeline.