运行机制 (Runtime)

本章只介绍 gdisdk.pipeline.pipeline.PipeLine 最核心的运行机制,帮助你理解:

  • Pipeline 在内部如何组织模块与连线;

  • add_links() / run() / add_attribute() / update_gdim_state() 分别处于哪一层;

  • 为什么同一个 Pipeline 既能在本地脚本中运行,也能在 GDIM 中作为应用运行。

如果你还不熟悉如何创建第一个 Pipeline,建议先阅读 新手入门 (Getting Started)用户指南 (User Guide)。 本章不重复介绍模块选型、UI 编排、上传流程,也不展开循环等流程控制(见 流程控制 (Flow Control))。

Pipeline 是什么

从机制上看,PipeLine 本质上是一个**有向数据流图**:

  • 节点是模块( PipeModule 实例);

  • 边是模块之间的端口连线( Link );

  • parents / children 记录上下游关系;

  • start_modules 是没有上游连线的模块;

  • end_modules 是没有下游连线的模块。

这意味着,Pipeline 并不是简单地“按列表顺序执行模块”,而是根据**连线关系**决定数据从哪里来、向哪里流,以及哪些模块是起点和终点。

对使用者而言,最重要的结论是:

  • 你只需要关心模块之间是否正确连线;

  • 不需要手动维护“执行顺序列表”;

  • 大多数情况下,起点模块和终点模块都可以由 Pipeline 自动推导。

连线如何生效

在日常开发里,最常见的写法是:

links = (
    read_csv.OutputTable >> clean_table.InputTable
    | clean_table.OutputTable >> plot_line.InputData
)
pipeline.add_links(links)

这里有两层机制:

  1. >> 用于表达“某个输出端口连接到某个输入端口”;

  2. | 用于把多条 Link 组合成一个链接组,方便一次性传给 add_links()

add_links() 的作用不只是“记住这些连线”,它还会同时完成几件关键事情:

  • 自动把连线两端的模块注册进当前 Pipeline;

  • 建立 parents / children 关系,供后续调度使用;

  • 把父模块输出端口对象挂接到子模块对应输入端口上;

  • 调用子模块的 update_ui_schema(reset=...),让子模块根据新的输入上下文刷新参数 UI。

最后这一点很重要。很多模块的参数选项会依赖输入数据,例如:

  • 某个下拉框的可选字段列表来自上游表格列名;

  • 某个参数是否可见、默认值是什么,要根据输入端口内容决定。

因此,连线不仅决定数据流,也会影响模块参数的可配置方式

相比之下,add_module() 只负责把模块加入 Pipeline,本身并不会建立数据流关系。 它更适合只有单个模块的简单场景;一旦涉及上下游,通常优先使用 add_links()

如何执行

run() 是 Pipeline 的执行入口,但它做的事情也不只是“从头到尾跑一遍”。

默认情况下:

  • Pipeline 会先找出所有 start_modules

  • 对每个起点模块,如果 auto_run=True,则先执行该模块;

  • 然后沿着连线向下传播,把父模块输出写入子模块输入;

  • 若子模块 auto_run=True,则继续执行该子模块;

  • 一直传播到所有可自动执行的下游模块。

这意味着,真正驱动执行的是“图结构 + auto_run 状态”,而不是模块被创建的先后顺序。 从当前实现看,起点模块执行完成后,下游链路的传播顺序采用**深度优先**方式遍历;不过在日常使用中,更建议把关注点放在“连线关系是否正确、 auto_run 是否符合预期”上,而不是依赖某一条具体遍历顺序。

如果你只希望从某个模块继续往下跑,可以使用:

result = pipeline.run(from_module="CleanTable")

此时,指定模块会被视为本次执行的起点,其下游分支继续按同样规则传播。

本地脚本里,读取结果通常有两种方式:

result = pipeline.run(return_results={"PlotLine": "OutputPlotData"})
plot_data = result.get_result("PlotLine", "OutputPlotData")

# 或者更直接地读模块输出端口
plot_data = plot_line.OutputPlotData.data

这里需要区分两个层面:

  • run(return_results=...) 控制的是**这一次 Python 调用返回什么**;

  • return_results_config 描述的是**这个 Pipeline 对外应该暴露哪些结果端口**,常用于 GDIM / 前端获取结果元信息。

如果你在本地脚本中直接开发,通常直接读取模块输出端口就足够; 如果你在把 Pipeline 做成应用,return_results_configget_return_results_info() 会更重要。

模块何时算成功执行

Pipeline 在调度时会区分两件事:

  • execute() **有没有被调用且未抛异常**(一次运行尝试);

  • 模块的 executed 标志是否为 ``True``(本次尝试是否**算成功完成**)。

对绝大多数内置模块与用户模块,后者由 @module_decorator() 自动维护。

因此,成功执行的判定标准是 ``execute()`` 的返回值不为 ``None``,而不是“输出端口里有没有业务数据”:

  • 返回空的 TableCollection、空的 TableData、空列表等**非 None 对象**,仍视为成功执行(executed=True);

  • 返回 None 表示模块**尚未完成本职工作**(例如仍在等待上游输入、条件不满足而主动跳过计算),此时 executed=False

  • execute() 抛出异常则整次 run() 失败,由 Pipeline 包装为 PipelineExecutionError,不会计入成功执行。

这与 PipeLine._execute_module 中的计数逻辑一致:只有 executed=True 时才会增加 executed_count

成功执行后,Pipeline 会在多处使用该状态:

  • GDIM 落库:通过 save_data_to_db 登记、并在平台运行时序列化端口数据时,若 executed=False,对应端口会被视为不可用(__get_port_data 返回 None);

  • 运行追踪:开启 verbose / run_trace_file 时,追踪页会用 executedexecuted_count 区分“已执行”“返回空结果(executed_count > 0 但当前 executed=False)”“未运行”等状态(详见 运行调试 (Debug))。

对模块作者而言,最实用的约定是:

  • 计算完成时,在 execute() 末尾 return 一个代表结果的对象(通常与写入输出端口的数据一致);

  • 若当前不应产出结果,则 return None,并确保 executed 保持为 False

  • 若关闭 status_manage``(``@module_decorator(status_manage=False)),需自行维护 executed / executed_time,否则 Pipeline 无法识别该模块是否已成功执行。

Note

executed 描述的是**单次 execute() 调用是否产出有效结果**; 下游模块是否继续 auto_run,仍取决于连线传播与 auto_run 配置,并不一律因上游 executed=False 而停止。 若希望整条分支不运行,应使用 ConditionalLink、关闭 auto_run 或在上游 return None 的同时让下游模块自行判断输入是否可用。

属性绑定

从机制上看,Pipeline 级属性并不是一套独立参数系统,它本质上是对模块参数的一层映射

例如:

pipeline.add_attribute(
    attr_name="tpl_number",
    module_name="QueryTable",
    param_name="tpl_number",
    attr_title="沉降点编号",
)

它表示:

  • Pipeline 对外提供一个名为 tpl_number 的属性;

  • 这个属性的实际落点是 QueryTable.tpl_number

  • 用户修改 Pipeline 属性时,最终改动的还是模块自身参数。

因此,Pipeline 属性的核心价值不是“多存一份参数”,而是:

  • 把分散在不同模块里的参数提升到统一入口;

  • 让 UI / GDIM 不需要理解模块内部结构,也能安全配置参数;

  • 为前端返回一份更稳定的属性描述,而不是直接暴露 Python 对象细节。

与之配套的两个常用方法是:

  • set_attributes(**kwargs):一次性给多个 Pipeline 属性赋值;

  • get_attribute_info():返回属性当前值和 UI schema 信息。

其中 get_attribute_info() 不只是“读配置”,还承担**动态刷新属性描述**的职责。 如果某个属性的 UI schema 依赖输入端口或其他参数变化,Pipeline 会在合适的时候重新调用模块的 update_ui_schema(),再把最新结果整理成统一的属性信息返回给前端。

这也是为什么:

  • 模块负责定义“某个参数应该如何展示”;

  • Pipeline 负责把这些参数汇总成“整个应用可配置的属性面板”。

add_dict_attribute()add_multi_binding_attribute() 这类方法,本质上仍然属于同一套绑定机制,只是面向更复杂的参数映射场景。 本章先不展开它们的高级用法。

运行上下文

除了模块和连线本身,PipeLine 还维护一份**运行上下文**,让多个模块共享同一组环境信息。

最常见的上下文包括:

  • gdim_state:GDIM 访问状态,如 tokenproj_idtpl_idhost

  • workspace:输出文件的默认工作目录;

  • local_functions_path:PythonCoder 或本地函数相关脚本路径;

  • llm_key:某些 AI 模块会读取的模型密钥。

其中最推荐的入口是:

pipeline.update_gdim_state(
    token=log_in(),
    proj_id="你的GDIM项目ID",
)

这样做的意义不只是“把值设进去”,还在于:

  • GDIM 相关状态被统一收敛到 gdim_state

  • 当关键凭据变更时,相关缓存会按需要失效;

  • 后续模块在运行时可以共享同一份上下文,而不需要每个模块单独配置一遍。

对使用者来说,可以把它理解为: 模块负责做事,Pipeline 负责把做事所需的公共环境准备好。

跨 Pipeline 数据:save_data_to_db 与 GDIM 数据库

当多个应用需要前后衔接(例如:应用 A 做专业计算,应用 B 只用 A 的结果生成报告)时,除了在同一个 Pipeline 里用端口连线传递数据外,还可以在 GDIM 平台上把应用 A 的运行结果写入项目数据库,再由应用 B 读回。

在 Pipeline 构建阶段调用 save_data_to_db() 即可登记需要落库的数据;应用成功运行在 GDIM 上之后,这些条目会按统一规则序列化并保存。典型用法如下:

  • 输出端口数据pipeline.save_data_to_db("OutputTable", "port", "MyModule") 要求 MyModule 上存在名为 OutputTable 的输出端口,且端口类型须能被平台序列化(常见表格、数值、ResultModel 等均支持,详见实现中的可序列化类型集合)。

  • 模块参数 / 计算属性pipeline.save_data_to_db("threshold", "module", "FilterModule") 对应落库键 "FilterModule#threshold"

  • Pipeline 本体上的直接属性pipeline.save_data_to_db("workspace", "pipeline") 仅适用于 hasattr(pipeline, name) 为真的字段(例如 workspaceapp_nameapp_title 等)。 通过 add_attribute() 暴露给前端的属性,实际存储在底层模块参数上,应使用上面的 data_type="module" 方式,写出**模块名 + 参数名**。

键名规则小结(与读回时一致):

  • 端口:"{module_name}@{port_name}"

  • Pipeline 直接属性:"pipeline@{attr_name}"

  • 模块参数:"{module_name}#{attr_name}"

下游应用使用 GdimAppDataReader,将 app_title 设为**上游应用在 GDIM 模板中的标题**(须能唯一定位到保存了数据的那一个应用),即可得到包含上述全部键的 ResultModel。若只需要其中一项,可再接 GdimAppDataSelector 按端口名/参数名拆分(模块级说明见 modules.readers 模块帮助modules.filters 模块帮助)。

Note

该机制面向 GDIM 平台运行时 的持久化与跨应用复用;本地脚本若未走平台落库流程,可能看不到与线上相同的数据。

保存与复用

当一个 Pipeline 设计完成后,可以通过 save_pipeline() 保存为 .pipe 文件。

这个文件主要承担两类作用:

  • 保存 Pipeline 的结构与配置,便于复用或上传到 GDIM;

  • 作为应用描述文件,让平台知道有哪些模块、连线、属性和结果配置。

对绝大多数使用者而言,只需要知道:

  • .pipe 是 Pipeline 的序列化结果;

  • 一般关心“能否正确保存、上传、恢复”,而不需要研究其内部结构;

  • restore_pipeline() 主要用于加载已有 Pipeline,属于进阶能力,本章不展开细节。

推荐理解顺序

如果你已经会写一个基本 Pipeline,建议按下面顺序理解其机制:

  1. 先理解“Pipeline 是图结构,而不是步骤列表”;

  2. 再理解 add_links() 为什么同时影响数据流和参数 UI;

  3. 然后理解 run() 如何从起点模块向下传播执行;

  4. 再理解 execute() 返回值如何决定 executed``executed_count``(什么算“成功执行”);

  5. 最后再理解 add_attribute() / get_attribute_info() 为什么能支撑应用级参数面板;

  6. 若涉及多个 GDIM 应用衔接,再阅读 save_data_to_dbGdimAppDataReader / GdimAppDataSelector 的配合方式。

理解了这几层之后,再去看各个具体模块的 update_ui_schema()、端口设计和应用配置,整体会更清晰。

相关主题

本章只解释 Pipeline 的核心机制;如果你还关心更偏“应用化”或“开发辅助”的能力,可以继续看: