流程控制 (Flow Control)
本章介绍如何在**单个 Pipeline 内**对一组数据重复执行同一子流程。
如果你还不熟悉 Pipeline 的基本连线与执行机制,建议先阅读 新手入门 (Getting Started)、用户指南 (User Guide) 和 运行机制 (Runtime)。
什么时候需要流程控制
典型场景包括:
对多个 GDIM 项目依次执行相同的读表、转换、写回操作;
对一批配置项(如多份文件、多组参数)重复跑同一处理链;
上游模块输出
list[dict]形式的任务列表,下游需要逐项处理。
这类需求可以通过 ForEachController 实现:控制器接收迭代列表,在每次迭代中把列表项绑定到内层模块参数,再按连线顺序执行内层模块。
ForEachController 是什么
ForEachController 是一种**流程控制器**,在 Pipeline 图中表现为一个特殊模块节点。它:
通过
InputIterData端口接收迭代数据(list[dict[str, Any]]);在内部维护一组**内层模块**及其连线;
对列表中的每一项,按
iteration_bindings把字典键值写入内层模块属性;执行完所有迭代后,按配置聚合指定输出端口的结果。
与普通模块的区别在于:控制器本身不承载业务算法,真正的计算逻辑写在内层模块中;PipeLine.run() 识别到控制器后,会代为驱动迭代与子流程执行。
与其它 PipeModule 一样,创建控制器时使用 mname 指定唯一名称(即实例的 module.mname),例如 ForEachController(mname="UpdateAllProjects", ...)。
导入方式:
from gdisdk.pipeline import ForEachController
from gdisdk.pipeline import Link, PipeLine
核心概念
1. InputIterData — 迭代输入
InputIterData 的类型为 IterationData,即 list[dict[str, Any]]。
列表中的每个字典代表一次迭代上下文,例如:
[
{"proj_id": "1991460394451709953", "project_tables": table_a},
{"proj_id": "1991460394451709954", "project_tables": table_b},
]
2. iteration_bindings — 每次迭代注入的参数
iteration_bindings 把 "module_name.attribute_name" 映射到迭代字典中的键名。
每次迭代开始时,控制器会执行:
inner_module.attribute = item[item_key]
示例:
iteration_bindings={
"GdimTableReader.proj_id": "proj_id",
"GdimTableWriter.proj_id": "proj_id",
"ReplaceFieldsValue.InputTables": "project_tables",
}
其中 GdimTableReader、GdimTableWriter 是内层模块的 mname;proj_id、project_tables 是 InputIterData 每条记录中的键。
3. links — 内层数据流
通过 links 参数传入内层模块之间的 Link。
可传入**单个** Link 或 list[Link]。
内层模块会从连线中自动提取,不需要再单独维护一份模块列表。
inner_links = reader.OutputTables >> writer.InputData
4. output_ports — 需要聚合的输出
output_ports 指定哪些内层输出端口要在控制器上**聚合成列表**。
格式为 "module_name.port_name",例如 ["MergeGdimTables.OutputTables"]。
``output_ports=None``(默认):控制器不创建聚合输出端口;
指定后,控制器输出端口的值为
list,元素顺序与迭代顺序一致。
端口类型说明见 端口类型 (Port Type) 中的 IterationData / IterationResult。
三种结果访问方式
根据业务需要,可以选择不同的输出读取策略:
方式 A:需要所有迭代结果的列表
在 output_ports 中声明内层端口,并连接到控制器的聚合输出:
foreach = ForEachController(
mname="write_all_projects",
links=inner_links,
iteration_bindings={"writer.proj_id": "proj_id"},
output_ports=["writer.OutputResult"],
)
# foreach.OutputResult.data → [result_iter1, result_iter2, ...]
方式 B:只需要最后一次迭代的结果
不要把该端口放进 output_ports,而是**直接连到内层模块的输出端口**:
inner_writer.OutputDebugInfo >> downstream.InputData
# downstream 拿到的是最后一轮迭代结束时 inner_writer 的输出
方式 C:不需要控制器层面的输出
保持 output_ports=None,所有下游只通过内层模块端口或控制器外的其他连线取数。
聚合端口的命名规则
当多个内层模块存在同名输出端口时,控制器上的聚合端口会自动加模块后缀以避免冲突,例如:
仅
writer.OutputResult→ 控制器端口名OutputResult同时有
writer.OutputResult与reader.OutputResult→OutputResult_writer、OutputResult_reader
接入 Pipeline
步骤 1:创建控制器与内层模块
reader = GdimTableReader(mname="GdimTableReader")
writer = GdimTableWriter(mname="GdimTableWriter")
foreach = ForEachController(
mname="UpdateAllProjects",
links=reader.OutputTables >> writer.InputData,
iteration_bindings={
"GdimTableReader.proj_id": "proj_id",
"GdimTableWriter.proj_id": "proj_id",
},
output_ports=["GdimTableWriter.OutputResult"],
)
步骤 2:连接迭代数据与外层模块
迭代数据通常由上游 PythonCoder 或数据处理模块生成,再连到 InputIterData:
iteration_builder.OutputIterData >> foreach.InputIterData
步骤 3:向内层模块提供静态输入
迭代之间**不变**的数据,可以直接连到内层模块,而不经过控制器:
token_reader.OutputToken >> reader.InputToken
static_template.OutputTemplate >> reader.InputTemplate
这种写法是推荐模式:iteration_bindings 只管每轮变化的参数,静态连线负责共享输入。
步骤 4:注册到 Pipeline 并运行
ForEachController 作为普通模块节点,通过 add_links() 一并注册即可:
pipeline = PipeLine(app_name="MultiProjectUpload", app_title="多项目写入")
pipeline.add_links(
iteration_builder.OutputIterationData >> foreach.InputIterData
| token_reader.OutputToken >> reader.InputToken
| foreach.OutputResult >> result_collector.InputResults
)
pipeline.run()
完整示例
下面给出一个可运行的简化示例,演示「列表驱动 + 内层链路 + 结果聚合」的完整流程:
from gdisdk.pipeline import (
ForEachController,
Link,
PipeLine,
PipeModule,
PortReference,
module_decorator,
PortTypeHint,
)
@module_decorator()
class IterationSource(PipeModule):
"""提供迭代列表。"""
OutputData: PortReference[PortTypeHint.GeneralArray]
def execute(self):
self._ports_out["OutputData"].data = [
{"project_id": "proj_1", "mult": 2},
{"project_id": "proj_2", "mult": 3},
]
@module_decorator()
class ValueProvider(PipeModule):
OutputValue: PortReference[PortTypeHint.General]
def __init__(self, mname=None, auto_run=True, value=10):
super().__init__(mname=mname, auto_run=auto_run)
self.value = value
def execute(self):
self._ports_out["OutputValue"].data = self.value
@module_decorator()
class DataProcessor(PipeModule):
InputValue: PortReference[PortTypeHint.General]
OutputResult: PortReference[PortTypeHint.General]
def __init__(self, mname=None, auto_run=True, proj_id="", multiplier=1):
super().__init__(mname=mname, auto_run=auto_run)
self.proj_id = proj_id
self.multiplier = multiplier
def execute(self):
value = self._ports_in["InputValue"].data or 0
self._ports_out["OutputResult"].data = f"{self.proj_id}: {value * self.multiplier}"
provider = ValueProvider(mname="provider", auto_run=True)
processor = DataProcessor(mname="processor", auto_run=True)
foreach = ForEachController(
mname="process_each",
links=provider.OutputValue >> processor.InputValue,
iteration_bindings={
"processor.proj_id": "project_id",
"processor.multiplier": "mult",
},
output_ports=["processor.OutputResult"],
)
source = IterationSource(mname="source", auto_run=True)
pipeline = PipeLine(app_name="ForEachDemo", app_title="ForEach 示例")
pipeline.add_links(
source.OutputData >> foreach.InputIterData
)
pipeline.run()
print(foreach.OutputResult.data)
# ['proj_1: 20', 'proj_2: 30']
常见注意事项
内层模块的 auto_run
内层模块通常应设置 auto_run=True,以便在每次迭代子流程中自动执行。若关闭 auto_run,需要自行确保执行时机,一般不建议在控制器场景下这样做。
iteration_bindings 的键名
键名必须是 "mname.attribute" 形式,其中 mname 与模块实例化时传入的名称一致。绑定到端口数据时,属性名使用端口名(如 "ReplaceFieldsValue.InputTables")。
空迭代列表
当 InputIterData 为空列表时,控制器不会执行内层模块,output_ports 对应的聚合结果为空列表 []。
调试与排错
带 ForEachController 的 Pipeline 建议使用 verbose、run_trace_file 与运行轨迹报告排查问题。
具体方法(含如何区分控制器与内层模块、如何核对迭代轮数等)见 运行调试 (Debug) 全文,其中「判断循环次数是否正确」一节专门针对 ForEachController。
相关 API
本章侧重使用说明。完整的类签名与参数列表见 API 参考:
pipeline.flowControl —
ForEachController、FlowController端口类型 (Port Type) —
IterationData、IterationResult端口类型
相关主题
运行机制 (Runtime) —
add_links()、run()与图结构传播机制用户指南 (User Guide) — 如何把 Pipeline 做成可配置、可上传的应用
运行调试 (Debug) —
verbose、run_trace_file与运行轨迹解读