流程控制 (Flow Control)

本章介绍如何在**单个 Pipeline 内**对一组数据重复执行同一子流程。

如果你还不熟悉 Pipeline 的基本连线与执行机制,建议先阅读 新手入门 (Getting Started)用户指南 (User Guide)运行机制 (Runtime)

什么时候需要流程控制

典型场景包括:

  • 对多个 GDIM 项目依次执行相同的读表、转换、写回操作;

  • 对一批配置项(如多份文件、多组参数)重复跑同一处理链;

  • 上游模块输出 list[dict] 形式的任务列表,下游需要逐项处理。

这类需求可以通过 ForEachController 实现:控制器接收迭代列表,在每次迭代中把列表项绑定到内层模块参数,再按连线顺序执行内层模块。

ForEachController 是什么

ForEachController 是一种**流程控制器**,在 Pipeline 图中表现为一个特殊模块节点。它:

  • 通过 InputIterData 端口接收迭代数据(list[dict[str, Any]]);

  • 在内部维护一组**内层模块**及其连线;

  • 对列表中的每一项,按 iteration_bindings 把字典键值写入内层模块属性;

  • 执行完所有迭代后,按配置聚合指定输出端口的结果。

与普通模块的区别在于:控制器本身不承载业务算法,真正的计算逻辑写在内层模块中;PipeLine.run() 识别到控制器后,会代为驱动迭代与子流程执行。

与其它 PipeModule 一样,创建控制器时使用 mname 指定唯一名称(即实例的 module.mname),例如 ForEachController(mname="UpdateAllProjects", ...)

导入方式:

from gdisdk.pipeline import ForEachController
from gdisdk.pipeline import Link, PipeLine

核心概念

1. InputIterData — 迭代输入

InputIterData 的类型为 IterationData,即 list[dict[str, Any]]。 列表中的每个字典代表一次迭代上下文,例如:

[
    {"proj_id": "1991460394451709953", "project_tables": table_a},
    {"proj_id": "1991460394451709954", "project_tables": table_b},
]

2. iteration_bindings — 每次迭代注入的参数

iteration_bindings"module_name.attribute_name" 映射到迭代字典中的键名。 每次迭代开始时,控制器会执行:

inner_module.attribute = item[item_key]

示例:

iteration_bindings={
    "GdimTableReader.proj_id": "proj_id",
    "GdimTableWriter.proj_id": "proj_id",
    "ReplaceFieldsValue.InputTables": "project_tables",
}

其中 GdimTableReaderGdimTableWriter 是内层模块的 mnameproj_idproject_tablesInputIterData 每条记录中的键。

3. links — 内层数据流

通过 links 参数传入内层模块之间的 Link。 可传入**单个** Linklist[Link]。 内层模块会从连线中自动提取,不需要再单独维护一份模块列表。

inner_links = reader.OutputTables >> writer.InputData

4. output_ports — 需要聚合的输出

output_ports 指定哪些内层输出端口要在控制器上**聚合成列表**。 格式为 "module_name.port_name",例如 ["MergeGdimTables.OutputTables"]

  • ``output_ports=None``(默认):控制器不创建聚合输出端口;

  • 指定后,控制器输出端口的值为 list,元素顺序与迭代顺序一致。

端口类型说明见 端口类型 (Port Type) 中的 IterationData / IterationResult

三种结果访问方式

根据业务需要,可以选择不同的输出读取策略:

方式 A:需要所有迭代结果的列表

output_ports 中声明内层端口,并连接到控制器的聚合输出:

foreach = ForEachController(
    mname="write_all_projects",
    links=inner_links,
    iteration_bindings={"writer.proj_id": "proj_id"},
    output_ports=["writer.OutputResult"],
)
# foreach.OutputResult.data → [result_iter1, result_iter2, ...]

方式 B:只需要最后一次迭代的结果

不要把该端口放进 output_ports,而是**直接连到内层模块的输出端口**:

inner_writer.OutputDebugInfo >> downstream.InputData
# downstream 拿到的是最后一轮迭代结束时 inner_writer 的输出

方式 C:不需要控制器层面的输出

保持 output_ports=None,所有下游只通过内层模块端口或控制器外的其他连线取数。

聚合端口的命名规则

当多个内层模块存在同名输出端口时,控制器上的聚合端口会自动加模块后缀以避免冲突,例如:

  • writer.OutputResult → 控制器端口名 OutputResult

  • 同时有 writer.OutputResultreader.OutputResultOutputResult_writerOutputResult_reader

接入 Pipeline

步骤 1:创建控制器与内层模块

reader = GdimTableReader(mname="GdimTableReader")
writer = GdimTableWriter(mname="GdimTableWriter")

foreach = ForEachController(
    mname="UpdateAllProjects",
    links=reader.OutputTables >> writer.InputData,
    iteration_bindings={
        "GdimTableReader.proj_id": "proj_id",
        "GdimTableWriter.proj_id": "proj_id",
    },
    output_ports=["GdimTableWriter.OutputResult"],
)

步骤 2:连接迭代数据与外层模块

迭代数据通常由上游 PythonCoder 或数据处理模块生成,再连到 InputIterData

iteration_builder.OutputIterData >> foreach.InputIterData

步骤 3:向内层模块提供静态输入

迭代之间**不变**的数据,可以直接连到内层模块,而不经过控制器:

token_reader.OutputToken >> reader.InputToken
static_template.OutputTemplate >> reader.InputTemplate

这种写法是推荐模式:iteration_bindings 只管每轮变化的参数,静态连线负责共享输入。

步骤 4:注册到 Pipeline 并运行

ForEachController 作为普通模块节点,通过 add_links() 一并注册即可:

pipeline = PipeLine(app_name="MultiProjectUpload", app_title="多项目写入")
pipeline.add_links(
    iteration_builder.OutputIterationData >> foreach.InputIterData
    | token_reader.OutputToken >> reader.InputToken
    | foreach.OutputResult >> result_collector.InputResults
)
pipeline.run()

完整示例

下面给出一个可运行的简化示例,演示「列表驱动 + 内层链路 + 结果聚合」的完整流程:

from gdisdk.pipeline import (
    ForEachController,
    Link,
    PipeLine,
    PipeModule,
    PortReference,
    module_decorator,
    PortTypeHint,
)


@module_decorator()
class IterationSource(PipeModule):
    """提供迭代列表。"""

    OutputData: PortReference[PortTypeHint.GeneralArray]

    def execute(self):
        self._ports_out["OutputData"].data = [
            {"project_id": "proj_1", "mult": 2},
            {"project_id": "proj_2", "mult": 3},
        ]


@module_decorator()
class ValueProvider(PipeModule):
    OutputValue: PortReference[PortTypeHint.General]

    def __init__(self, mname=None, auto_run=True, value=10):
        super().__init__(mname=mname, auto_run=auto_run)
        self.value = value

    def execute(self):
        self._ports_out["OutputValue"].data = self.value


@module_decorator()
class DataProcessor(PipeModule):
    InputValue: PortReference[PortTypeHint.General]
    OutputResult: PortReference[PortTypeHint.General]

    def __init__(self, mname=None, auto_run=True, proj_id="", multiplier=1):
        super().__init__(mname=mname, auto_run=auto_run)
        self.proj_id = proj_id
        self.multiplier = multiplier

    def execute(self):
        value = self._ports_in["InputValue"].data or 0
        self._ports_out["OutputResult"].data = f"{self.proj_id}: {value * self.multiplier}"


provider = ValueProvider(mname="provider", auto_run=True)
processor = DataProcessor(mname="processor", auto_run=True)

foreach = ForEachController(
    mname="process_each",
    links=provider.OutputValue >> processor.InputValue,
    iteration_bindings={
        "processor.proj_id": "project_id",
        "processor.multiplier": "mult",
    },
    output_ports=["processor.OutputResult"],
)

source = IterationSource(mname="source", auto_run=True)

pipeline = PipeLine(app_name="ForEachDemo", app_title="ForEach 示例")
pipeline.add_links(
    source.OutputData >> foreach.InputIterData
)
pipeline.run()

print(foreach.OutputResult.data)
# ['proj_1: 20', 'proj_2: 30']

常见注意事项

内层模块的 auto_run

内层模块通常应设置 auto_run=True,以便在每次迭代子流程中自动执行。若关闭 auto_run,需要自行确保执行时机,一般不建议在控制器场景下这样做。

iteration_bindings 的键名

键名必须是 "mname.attribute" 形式,其中 mname 与模块实例化时传入的名称一致。绑定到端口数据时,属性名使用端口名(如 "ReplaceFieldsValue.InputTables")。

空迭代列表

InputIterData 为空列表时,控制器不会执行内层模块,output_ports 对应的聚合结果为空列表 []

调试与排错

ForEachController 的 Pipeline 建议使用 verboserun_trace_file 与运行轨迹报告排查问题。 具体方法(含如何区分控制器与内层模块、如何核对迭代轮数等)见 运行调试 (Debug) 全文,其中「判断循环次数是否正确」一节专门针对 ForEachController

相关 API

本章侧重使用说明。完整的类签名与参数列表见 API 参考:

相关主题