modules.mergers 模块帮助
本章节包含 modules.mergers 包中常用「数据合并/融合」模块的使用说明和示例,例如:
多表按索引字段做类似 SQL Join 的合并(
MergeTables)将输入表与 GDIM 数据库表对齐并生成可写入结构(
MergeGdimTables)多个
ResultModel/SingleResult混合合并为一个ResultModel``(``MergeResultModels)
MergeSingleResult
模块简介与适用场景
MergeSingleResult用于将多个SingleResult合并为一个SingleResult``(端口:``OutputSingleResult)。输入端口为「动态输入端口」(Dynamic Ports):可接入任意数量的
SingleResult输入。MergeSingleResult已不再推荐用于新流程。建议优先使用MergeResultModels,因为SingleResult后续会逐步淘汰。典型适用场景:
多个模块分别产出不同 key 的结果(或同 key 的不同来源结果),需要汇总成一个结果对象;
报告生成前,把多个计算模块的
SingleResult统一合并,便于后续写入模板/汇总输出;对冲突 key 做可控处理:取第一个/最后一个、拼成列表、拼成字符串、或对不同 key 使用不同策略。
端口说明
输入端口 - 动态输入端口:类型为
SingleResult。需要先通过add_dynamic_ports_in("InputXXX")显式添加端口后,才能在 pipeline 里连接到该端口。输出端口 -
OutputSingleResult:合并后的SingleResult;当未连接任何输入端口、或(在all_ports_required=True时)存在未就绪端口时为None。
如何添加动态输入端口(必读)
MergeSingleResult 的输入端口是 “动态端口”,也就是端口数量不固定。使用时你需要:
先在模块上用
add_dynamic_ports_in("InputXXX")创建若干输入端口;再把上游模块的
OutputSingleResult分别连接到这些端口。
Note
动态输入端口名必须以
Input开头(例如InputSR1/InputA),否则会报错。默认
all_ports_required=True:只要你创建过的任一动态端口数据为None,该模块就会输出None;若某些分支是可选的,请把all_ports_required=False。
合并规则与冲突策略(核心概念)
对于不同 key(不冲突):像合并字典一样直接合并。
对于同 key(冲突):按策略合并冲突值,策略可通过
default_strategy全局设置,也可通过key_strategies对特定 key 单独指定。
内置策略一览:
overwrite_first:取第一个输入的值(第一个优先)overwrite_last:取最后一个输入的值(最后一个优先,默认)concat_list:把所有值拼成 list;若某个值本身就是 list,会自动展开合并(避免嵌套)concat_string:把所有值按分隔符拼成字符串(string_separator可配置;若值为 list,会先把元素转为字符串后再拼接)
快速上手示例:默认策略合并(最后一个覆盖)
from gdisdk.modules.mergers import MergeSingleResult
m = MergeSingleResult(mname="MergeSR")
# 先声明动态输入端口(端口名需以 Input 开头)
m.add_dynamic_ports_in("InputSR1")
m.add_dynamic_ports_in("InputSR2")
# 在 pipeline 中把多个模块的 OutputSingleResult 连到这些端口即可
# m.execute()
# merged = m.OutputSingleResult.data
快速上手示例:为不同 key 指定不同策略
from gdisdk.modules.mergers import MergeSingleResult
m = MergeSingleResult(
mname="MergeSR",
default_strategy="overwrite_last",
key_strategies={
"project_name": "concat_string", # 也可以用 key 的 title 来匹配
"bore_ids": "concat_list",
},
string_separator=" | ",
)
参数说明
参数名 |
类型 |
默认值 |
说明 |
|---|---|---|---|
|
|
|
为 |
|
|
|
冲突 key 的默认合并策略。 |
|
|
|
|
|
|
|
为特定 key 指定策略的字典;key 可以写结果的 |
Note
合并冲突 key 时,
UnitResult的元信息(如title/unit/description)默认沿用“第一个出现的结果”的元信息。当你使用
concat_list时,若输入值中已有 list,会自动展开合并(避免[[...], [...]]这种嵌套)。
在 pipeline 中的使用方式
from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeSingleResult
pipe = PipeLine(app_name="MergeSingleResultDemo", app_title="合并 SingleResult 示例")
m = MergeSingleResult("MergeSR", all_ports_required=False)
m.add_dynamic_ports_in("InputSR1")
m.add_dynamic_ports_in("InputSR2")
m.add_dynamic_ports_in("InputSR3")
# links = (
# mod1.OutputSingleResult >> m.InputSR1
# | mod2.OutputSingleResult >> m.InputSR2
# | mod3.OutputSingleResult >> m.InputSR3
# )
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()
更多信息
MergeResultModels
模块简介与适用场景
MergeResultModels用于将多个ResultModel和/或SingleResult合并为一个ResultModel``(端口:``OutputResultModel)。输入端口为「动态输入端口」(Dynamic Ports):可接入任意数量的
ResultModel或SingleResult输入。推荐在新流程中优先使用本模块,尤其适合从
SingleResult逐步迁移到ResultModel的场景。典型适用场景:
多个模块输出的是
ResultModel,需要汇总成一个统一结果对象供后续模块继续消费;老流程里仍有部分模块输出
SingleResult,但下游希望统一接收ResultModel;需要对同名字段做冲突处理:取第一个/最后一个、拼成列表、拼成字符串、或对不同字段使用不同策略。
端口说明
输入端口 - 动态输入端口:类型支持
ResultModel和SingleResult。需要先通过add_dynamic_ports_in("InputXXX")显式添加端口后,才能在 pipeline 里连接到该端口。输出端口 -
OutputResultModel:合并后的ResultModel;当未连接任何输入端口、或(在all_ports_required=True时)存在未就绪端口时为None。
如何添加动态输入端口(必读)
MergeResultModels 的输入端口是“动态端口”,也就是端口数量不固定。使用时你需要:
先在模块上用
add_dynamic_ports_in("InputXXX")创建若干输入端口;再把上游模块的
OutputResultModel或OutputSingleResult分别连接到这些端口。
Note
动态输入端口名必须以
Input开头(例如InputRM1/InputSR1/InputA),否则会报错。默认
all_ports_required=True:只要你创建过的任一动态端口数据为None,该模块就会输出None;若某些分支是可选的,请把all_ports_required=False。
字段展开与合并规则(核心概念)
对于
SingleResult输入:会按UnitResult.name展开为字段名,UnitResult.value作为字段值;若要在key_strategies中按标题匹配,可使用UnitResult.title。对于
ResultModel输入:会按model_dump()结果展开字段;若字段定义带有Field(title=...),则标题也可用于key_strategies匹配。对于不同字段名(不冲突):像合并字典一样直接合并。
对于同字段名(冲突):按策略合并冲突值,策略可通过
default_strategy全局设置,也可通过key_strategies对特定字段单独指定。
内置策略一览:
overwrite_first:取第一个输入的值(第一个优先)overwrite_last:取最后一个输入的值(最后一个优先,默认)concat_list:把所有值拼成 list;若某个值本身就是 list,会自动展开合并(避免嵌套)concat_string:把所有值按分隔符拼成字符串(string_separator可配置;若值为 list,会先把元素转为字符串后再拼接)
快速上手示例:合并多个 ResultModel
from pydantic import BaseModel, Field
from gdisdk.modules.mergers import MergeResultModels
class ProjectInfo(BaseModel):
project_name: str = Field(title="项目名称")
section_name: str = Field(title="标段名称")
class ProjectStats(BaseModel):
bore_count: int = Field(title="钻孔数量")
m = MergeResultModels(mname="MergeRM")
m.add_dynamic_ports_in("InputRM1")
m.add_dynamic_ports_in("InputRM2")
# 在 pipeline 中把两个模块的 OutputResultModel 连到这些端口即可
快速上手示例:混合合并 ResultModel 和 SingleResult
from gdisdk.modules.mergers import MergeResultModels
m = MergeResultModels(
mname="MergeMixedResults",
all_ports_required=False,
default_strategy="overwrite_last",
key_strategies={
"备注": "concat_string", # 可按 title 匹配
"bore_ids": "concat_list", # 也可按字段名匹配
},
string_separator=" | ",
)
m.add_dynamic_ports_in("InputRM1")
m.add_dynamic_ports_in("InputSR1")
参数说明
参数名 |
类型 |
默认值 |
说明 |
|---|---|---|---|
|
|
|
为 |
|
|
|
冲突字段的默认合并策略。 |
|
|
|
|
|
|
|
为特定字段指定策略的字典;key 可以写字段名,也可以写字段标题(对 |
Note
输出是运行时动态创建的 Pydantic
BaseModel实例,其字段集合是所有输入字段名的并集。若只有一个有效输入,模块仍会输出一个
ResultModel;不会回传原始SingleResult类型。当多个输入存在同名字段时,冲突判断基于“字段名”本身;
title只用于key_strategies的匹配。
在 pipeline 中的使用方式
from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeResultModels
pipe = PipeLine(app_name="MergeResultModelsDemo", app_title="合并 ResultModel 示例")
m = MergeResultModels("MergeResults", all_ports_required=False)
m.add_dynamic_ports_in("InputRM1")
m.add_dynamic_ports_in("InputRM2")
m.add_dynamic_ports_in("InputSR1")
# links = (
# mod_result_1.OutputResultModel >> m.InputRM1
# | mod_result_2.OutputResultModel >> m.InputRM2
# | mod_single.OutputSingleResult >> m.InputSR1
# )
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()
# merged = m.OutputResultModel.data
更多信息
MergeTables
模块简介与适用场景
MergeTables用于合并多张表:输入为
TableCollection:按join_key依次把多张TableData合并为一张 ``TableData``(类似 SQL Join)。输入为单个
TableData:直接原样输出(不会做合并)。
典型适用场景:
多表(同索引/主键)横向拼接字段(例如:基本信息表 + 计算结果表 + 备注表);
只挑其中部分表参与合并(
tables_to_merge);join 键字段可使用「列名」或「字段标题」,在 UI 中也可从候选 join 键中选择。
端口说明
输入端口 -
InputTables:输入表集合(TableCollection)或单表(TableData)。输出端口 -
OutputTable:合并后的TableData;当输入为空、或无法找到可用 join 键/可合并表为空时为None。
快速上手示例:合并集合内全部表(自动选择 join_key)
from gdisdk.modules.mergers import MergeTables
m = MergeTables(mname="MergeTables")
# m.InputTables = tables # TableCollection
out_table = m.execute()
快速上手示例:指定 join_key(可写列名或字段标题)
from gdisdk.modules.mergers import MergeTables
m = MergeTables(mname="MergeByKey")
m.join_key = "bore_number" # 列名
# m.join_key = "钻孔编号" # 字段标题(若能映射到列名也可)
# m.InputTables = tables
out_table = m.execute()
参数说明
参数名 |
类型 |
默认值 |
说明 |
|---|---|---|---|
|
|
|
合并索引字段(列名或字段标题)。为 |
|
|
|
指定要参与合并的表(表名或表标题)。为 |
|
|
|
合并方式(pandas merge 语义)。常用: |
|
|
|
重名列后缀(按 pandas merge 语义)。 |
|
|
|
是否对结果按 join key 排序。 |
|
|
|
是否尽量避免复制(按 pandas merge 语义)。 |
|
|
|
是否添加来源指示列(按 pandas merge 语义)。 |
|
|
|
关系校验(按 pandas merge 语义);打开可用于发现重复/主键不唯一等数据质量问题,但会增加开销。 |
|
|
|
是否忽略索引并重新编号(按 pandas merge 语义)。 |
Note
输入为
TableData时,该表会被直接返回(等价于“未做合并”)。输入为
TableCollection时会按顺序逐张合并;若某张表不包含 join 键会被跳过并给出警告。
在 pipeline 中的使用方式
from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeTables
pipe = PipeLine(app_name="MergeTablesDemo", app_title="多表合并示例")
m = MergeTables("MergeTables", how="left")
m.join_key = "bore_number"
# links = upstream.OutputTables >> m.InputTables
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()
# out = m.OutputTable.data
更多信息
MergeGdimTables
模块简介与适用场景
MergeGdimTables用于把输入表与 GDIM 表进行“对齐合并”,主要用于把输入数据预处理成可写入 GDIM 的结构:通过主键(primary key)匹配已有 GDIM 行,并生成/填充
gdim_id;输出表结构以 GDIM 表结构为准,确保字段/元信息符合 GDIM 数据库 schema。
典型适用场景:
写入 GDIM 前,把本地/外部表格与 GDIM 现有数据融合,决定“新增行”还是“更新行”;
输入表字段名/标题与 GDIM 不完全一致时,启用自动列映射(
auto_map_columns=True)。
端口说明
输入端口 -
InputTables:待对齐合并的输入表(TableCollection或TableData)。 -InputGdimTables:作为基准的 GDIM 表集合(TableCollection),每张表必须包含gdim_id列。输出端口 -
OutputTables:合并后的表(TableCollection或TableData,与输入类型保持一致)。
输出行为(如何理解 how)
-
输入中与 GDIM 已存在主键匹配的行会被**剔除**(只保留真正“新增”的行);
输出中
gdim_id会被置为None,表示后续写入为新增插入。
how="overwrite":
快速上手示例:只输出新增行(ignore,默认)
from gdisdk.modules.mergers import MergeGdimTables
m = MergeGdimTables(mname="MergeWithGdim", how="ignore")
# m.InputTables = input_tables # TableCollection 或 TableData
# m.InputGdimTables = gdim_tables # TableCollection(含 gdim_id)
out = m.execute()
快速上手示例:允许更新(overwrite)并启用自动列映射
from gdisdk.modules.mergers import MergeGdimTables
m = MergeGdimTables(
mname="MergeWithGdimOverwrite",
how="overwrite",
auto_map_columns=True,
)
参数说明
参数名 |
类型 |
默认值 |
说明 |
|---|---|---|---|
|
|
|
主键匹配行的处理方式: |
|
|
|
是否自动做列映射。为 |
Note
InputGdimTables中的每张表都必须包含gdim_id列,否则会抛出ValueError。若某张输入表在
InputGdimTables中找不到同名/同标题的 GDIM 表,则该表会原样返回(不会添加gdim_id,也不会做列映射)。若
InputGdimTables为空集合,则直接返回 ``InputTables``(不做处理)。
在 pipeline 中的使用方式
from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeGdimTables
pipe = PipeLine(app_name="MergeGdimTablesDemo", app_title="与 GDIM 表对齐示例")
m = MergeGdimTables("MergeWithGdim", how="ignore")
# links = input_reader.OutputTables >> m.InputTables | gdim_reader.OutputTables >> m.InputGdimTables
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()
# out = m.OutputTables.data
更多信息