modules.mergers 模块帮助

本章节包含 modules.mergers 包中常用「数据合并/融合」模块的使用说明和示例,例如:

  • 多表按索引字段做类似 SQL Join 的合并(MergeTables

  • 将输入表与 GDIM 数据库表对齐并生成可写入结构(MergeGdimTables

  • 多个 ResultModel / SingleResult 混合合并为一个 ResultModel``(``MergeResultModels

MergeSingleResult

模块简介与适用场景

  • MergeSingleResult 用于将多个 SingleResult 合并为一个 SingleResult``(端口:``OutputSingleResult)。

  • 输入端口为「动态输入端口」(Dynamic Ports):可接入任意数量的 SingleResult 输入。

  • MergeSingleResult 已不再推荐用于新流程。建议优先使用 MergeResultModels,因为 SingleResult 后续会逐步淘汰。

  • 典型适用场景:

    • 多个模块分别产出不同 key 的结果(或同 key 的不同来源结果),需要汇总成一个结果对象;

    • 报告生成前,把多个计算模块的 SingleResult 统一合并,便于后续写入模板/汇总输出;

    • 对冲突 key 做可控处理:取第一个/最后一个、拼成列表、拼成字符串、或对不同 key 使用不同策略。

端口说明

  • 输入端口 - 动态输入端口:类型为 SingleResult。需要先通过 add_dynamic_ports_in("InputXXX") 显式添加端口后,才能在 pipeline 里连接到该端口。

  • 输出端口 - OutputSingleResult:合并后的 SingleResult;当未连接任何输入端口、或(在 all_ports_required=True 时)存在未就绪端口时为 None

如何添加动态输入端口(必读)

MergeSingleResult 的输入端口是 “动态端口”,也就是端口数量不固定。使用时你需要:

  • 先在模块上用 add_dynamic_ports_in("InputXXX") 创建若干输入端口;

  • 再把上游模块的 OutputSingleResult 分别连接到这些端口。

Note

  • 动态输入端口名必须以 Input 开头(例如 InputSR1 / InputA),否则会报错。

  • 默认 all_ports_required=True:只要你创建过的任一动态端口数据为 None,该模块就会输出 None;若某些分支是可选的,请把 all_ports_required=False

合并规则与冲突策略(核心概念)

  • 对于不同 key(不冲突):像合并字典一样直接合并。

  • 对于同 key(冲突):按策略合并冲突值,策略可通过 default_strategy 全局设置,也可通过 key_strategies 对特定 key 单独指定。

内置策略一览:

  • overwrite_first:取第一个输入的值(第一个优先)

  • overwrite_last:取最后一个输入的值(最后一个优先,默认)

  • concat_list:把所有值拼成 list;若某个值本身就是 list,会自动展开合并(避免嵌套)

  • concat_string:把所有值按分隔符拼成字符串(string_separator 可配置;若值为 list,会先把元素转为字符串后再拼接)

快速上手示例:默认策略合并(最后一个覆盖)

from gdisdk.modules.mergers import MergeSingleResult

m = MergeSingleResult(mname="MergeSR")
# 先声明动态输入端口(端口名需以 Input 开头)
m.add_dynamic_ports_in("InputSR1")
m.add_dynamic_ports_in("InputSR2")
# 在 pipeline 中把多个模块的 OutputSingleResult 连到这些端口即可
# m.execute()
# merged = m.OutputSingleResult.data

快速上手示例:为不同 key 指定不同策略

from gdisdk.modules.mergers import MergeSingleResult

m = MergeSingleResult(
    mname="MergeSR",
    default_strategy="overwrite_last",
    key_strategies={
        "project_name": "concat_string",   # 也可以用 key 的 title 来匹配
        "bore_ids": "concat_list",
    },
    string_separator=" | ",
)

参数说明

MergeSingleResult 参数一览

参数名

类型

默认值

说明

all_ports_required

bool

True

True 时,动态输入端口只要有任一端口数据为 None 就不会执行合并(输出 None);为 False 时会尽量合并已有的输入。

default_strategy

Literal["overwrite_first","overwrite_last","concat_list","concat_string"]

"overwrite_last"

冲突 key 的默认合并策略。

string_separator

str

", "

concat_string 策略的拼接分隔符。

key_strategies

dict[str, Literal["overwrite_first","overwrite_last","concat_list","concat_string"]] | None

None

为特定 key 指定策略的字典;key 可以写结果的 nametitle;未指定的 key 使用 default_strategy

Note

  • 合并冲突 key 时,UnitResult 的元信息(如 title/unit/description)默认沿用“第一个出现的结果”的元信息。

  • 当你使用 concat_list 时,若输入值中已有 list,会自动展开合并(避免 [[...], [...]] 这种嵌套)。

在 pipeline 中的使用方式

from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeSingleResult

pipe = PipeLine(app_name="MergeSingleResultDemo", app_title="合并 SingleResult 示例")

m = MergeSingleResult("MergeSR", all_ports_required=False)
m.add_dynamic_ports_in("InputSR1")
m.add_dynamic_ports_in("InputSR2")
m.add_dynamic_ports_in("InputSR3")

# links = (
#     mod1.OutputSingleResult >> m.InputSR1
#     | mod2.OutputSingleResult >> m.InputSR2
#     | mod3.OutputSingleResult >> m.InputSR3
# )
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()

更多信息

MergeResultModels

模块简介与适用场景

  • MergeResultModels 用于将多个 ResultModel 和/或 SingleResult 合并为一个 ResultModel``(端口:``OutputResultModel)。

  • 输入端口为「动态输入端口」(Dynamic Ports):可接入任意数量的 ResultModelSingleResult 输入。

  • 推荐在新流程中优先使用本模块,尤其适合从 SingleResult 逐步迁移到 ResultModel 的场景。

  • 典型适用场景:

    • 多个模块输出的是 ResultModel,需要汇总成一个统一结果对象供后续模块继续消费;

    • 老流程里仍有部分模块输出 SingleResult,但下游希望统一接收 ResultModel

    • 需要对同名字段做冲突处理:取第一个/最后一个、拼成列表、拼成字符串、或对不同字段使用不同策略。

端口说明

  • 输入端口 - 动态输入端口:类型支持 ResultModelSingleResult。需要先通过 add_dynamic_ports_in("InputXXX") 显式添加端口后,才能在 pipeline 里连接到该端口。

  • 输出端口 - OutputResultModel:合并后的 ResultModel;当未连接任何输入端口、或(在 all_ports_required=True 时)存在未就绪端口时为 None

如何添加动态输入端口(必读)

MergeResultModels 的输入端口是“动态端口”,也就是端口数量不固定。使用时你需要:

  • 先在模块上用 add_dynamic_ports_in("InputXXX") 创建若干输入端口;

  • 再把上游模块的 OutputResultModelOutputSingleResult 分别连接到这些端口。

Note

  • 动态输入端口名必须以 Input 开头(例如 InputRM1 / InputSR1 / InputA),否则会报错。

  • 默认 all_ports_required=True:只要你创建过的任一动态端口数据为 None,该模块就会输出 None;若某些分支是可选的,请把 all_ports_required=False

字段展开与合并规则(核心概念)

  • 对于 SingleResult 输入:会按 UnitResult.name 展开为字段名,UnitResult.value 作为字段值;若要在 key_strategies 中按标题匹配,可使用 UnitResult.title

  • 对于 ResultModel 输入:会按 model_dump() 结果展开字段;若字段定义带有 Field(title=...),则标题也可用于 key_strategies 匹配。

  • 对于不同字段名(不冲突):像合并字典一样直接合并。

  • 对于同字段名(冲突):按策略合并冲突值,策略可通过 default_strategy 全局设置,也可通过 key_strategies 对特定字段单独指定。

内置策略一览:

  • overwrite_first:取第一个输入的值(第一个优先)

  • overwrite_last:取最后一个输入的值(最后一个优先,默认)

  • concat_list:把所有值拼成 list;若某个值本身就是 list,会自动展开合并(避免嵌套)

  • concat_string:把所有值按分隔符拼成字符串(string_separator 可配置;若值为 list,会先把元素转为字符串后再拼接)

快速上手示例:合并多个 ResultModel

from pydantic import BaseModel, Field
from gdisdk.modules.mergers import MergeResultModels

class ProjectInfo(BaseModel):
    project_name: str = Field(title="项目名称")
    section_name: str = Field(title="标段名称")

class ProjectStats(BaseModel):
    bore_count: int = Field(title="钻孔数量")

m = MergeResultModels(mname="MergeRM")
m.add_dynamic_ports_in("InputRM1")
m.add_dynamic_ports_in("InputRM2")
# 在 pipeline 中把两个模块的 OutputResultModel 连到这些端口即可

快速上手示例:混合合并 ResultModel 和 SingleResult

from gdisdk.modules.mergers import MergeResultModels

m = MergeResultModels(
    mname="MergeMixedResults",
    all_ports_required=False,
    default_strategy="overwrite_last",
    key_strategies={
        "备注": "concat_string",      # 可按 title 匹配
        "bore_ids": "concat_list",   # 也可按字段名匹配
    },
    string_separator=" | ",
)
m.add_dynamic_ports_in("InputRM1")
m.add_dynamic_ports_in("InputSR1")

参数说明

MergeResultModels 参数一览

参数名

类型

默认值

说明

all_ports_required

bool

True

True 时,动态输入端口只要有任一端口数据为 None 就不会执行合并(输出 None);为 False 时会尽量合并已有的输入。

default_strategy

Literal["overwrite_first","overwrite_last","concat_list","concat_string"]

"overwrite_last"

冲突字段的默认合并策略。

string_separator

str

", "

concat_string 策略的拼接分隔符。

key_strategies

dict[str, Literal["overwrite_first","overwrite_last","concat_list","concat_string"]] | None

None

为特定字段指定策略的字典;key 可以写字段名,也可以写字段标题(对 SingleResultUnitResult.title,对 ResultModelField(title=...));未指定的字段使用 default_strategy

Note

  • 输出是运行时动态创建的 Pydantic BaseModel 实例,其字段集合是所有输入字段名的并集。

  • 若只有一个有效输入,模块仍会输出一个 ResultModel;不会回传原始 SingleResult 类型。

  • 当多个输入存在同名字段时,冲突判断基于“字段名”本身;title 只用于 key_strategies 的匹配。

在 pipeline 中的使用方式

from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeResultModels

pipe = PipeLine(app_name="MergeResultModelsDemo", app_title="合并 ResultModel 示例")

m = MergeResultModels("MergeResults", all_ports_required=False)
m.add_dynamic_ports_in("InputRM1")
m.add_dynamic_ports_in("InputRM2")
m.add_dynamic_ports_in("InputSR1")

# links = (
#     mod_result_1.OutputResultModel >> m.InputRM1
#     | mod_result_2.OutputResultModel >> m.InputRM2
#     | mod_single.OutputSingleResult >> m.InputSR1
# )
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()
# merged = m.OutputResultModel.data

更多信息

MergeTables

模块简介与适用场景

  • MergeTables 用于合并多张表:

    • 输入为 TableCollection:按 join_key 依次把多张 TableData 合并为一张 ``TableData``(类似 SQL Join)。

    • 输入为单个 TableData:直接原样输出(不会做合并)。

  • 典型适用场景:

    • 多表(同索引/主键)横向拼接字段(例如:基本信息表 + 计算结果表 + 备注表);

    • 只挑其中部分表参与合并(tables_to_merge);

    • join 键字段可使用「列名」或「字段标题」,在 UI 中也可从候选 join 键中选择。

端口说明

  • 输入端口 - InputTables:输入表集合(TableCollection)或单表(TableData)。

  • 输出端口 - OutputTable:合并后的 TableData;当输入为空、或无法找到可用 join 键/可合并表为空时为 None

快速上手示例:合并集合内全部表(自动选择 join_key)

from gdisdk.modules.mergers import MergeTables

m = MergeTables(mname="MergeTables")
# m.InputTables = tables  # TableCollection
out_table = m.execute()

快速上手示例:指定 join_key(可写列名或字段标题)

from gdisdk.modules.mergers import MergeTables

m = MergeTables(mname="MergeByKey")
m.join_key = "bore_number"      # 列名
# m.join_key = "钻孔编号"        # 字段标题(若能映射到列名也可)
# m.InputTables = tables
out_table = m.execute()

参数说明

MergeTables 参数一览

参数名

类型

默认值

说明

join_key

str | None

None

合并索引字段(列名或字段标题)。为 None 时,会自动选择“所有待合并表都包含的第一个公共列”作为 join 键;不包含 join 键的表会被跳过。

tables_to_merge

list[str] | str | None

None

指定要参与合并的表(表名或表标题)。为 None 时考虑集合内所有表;若为 str 会自动转为单元素列表。

how

Literal["left","right","outer","inner"]

"inner"

合并方式(pandas merge 语义)。常用:inner 仅保留匹配行;left 保留左表全部行。

suffixes

tuple[str, str]

("_x","_y")

重名列后缀(按 pandas merge 语义)。

sort

bool

False

是否对结果按 join key 排序。

copy

bool

True

是否尽量避免复制(按 pandas merge 语义)。

indicator

bool

False

是否添加来源指示列(按 pandas merge 语义)。

validate

Literal["one_to_one","one_to_many","many_to_one","many_to_many"] | None

None

关系校验(按 pandas merge 语义);打开可用于发现重复/主键不唯一等数据质量问题,但会增加开销。

ignore_index

bool

False

是否忽略索引并重新编号(按 pandas merge 语义)。

Note

  • 输入为 TableData 时,该表会被直接返回(等价于“未做合并”)。

  • 输入为 TableCollection 时会按顺序逐张合并;若某张表不包含 join 键会被跳过并给出警告。

在 pipeline 中的使用方式

from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeTables

pipe = PipeLine(app_name="MergeTablesDemo", app_title="多表合并示例")

m = MergeTables("MergeTables", how="left")
m.join_key = "bore_number"
# links = upstream.OutputTables >> m.InputTables
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()
# out = m.OutputTable.data

更多信息

MergeGdimTables

模块简介与适用场景

  • MergeGdimTables 用于把输入表与 GDIM 表进行“对齐合并”,主要用于把输入数据预处理成可写入 GDIM 的结构:

    • 通过主键(primary key)匹配已有 GDIM 行,并生成/填充 gdim_id

    • 输出表结构以 GDIM 表结构为准,确保字段/元信息符合 GDIM 数据库 schema。

  • 典型适用场景:

    • 写入 GDIM 前,把本地/外部表格与 GDIM 现有数据融合,决定“新增行”还是“更新行”;

    • 输入表字段名/标题与 GDIM 不完全一致时,启用自动列映射(auto_map_columns=True)。

端口说明

  • 输入端口 - InputTables:待对齐合并的输入表(TableCollectionTableData)。 - InputGdimTables:作为基准的 GDIM 表集合(TableCollection),每张表必须包含 gdim_id 列。

  • 输出端口 - OutputTables:合并后的表(TableCollectionTableData,与输入类型保持一致)。

输出行为(如何理解 how)

  • ``how=”ignore”``(默认):

    • 输入中与 GDIM 已存在主键匹配的行会被**剔除**(只保留真正“新增”的行);

    • 输出中 gdim_id 会被置为 None,表示后续写入为新增插入。

  • how="overwrite"

    • 所有输入行都会保留;

    • 主键匹配的行会带上对应 ``gdim_id``(表示更新/覆盖 GDIM 既有行);

    • 主键不匹配的行 ``gdim_id=None``(表示新增插入)。

快速上手示例:只输出新增行(ignore,默认)

from gdisdk.modules.mergers import MergeGdimTables

m = MergeGdimTables(mname="MergeWithGdim", how="ignore")
# m.InputTables = input_tables      # TableCollection 或 TableData
# m.InputGdimTables = gdim_tables   # TableCollection(含 gdim_id)
out = m.execute()

快速上手示例:允许更新(overwrite)并启用自动列映射

from gdisdk.modules.mergers import MergeGdimTables

m = MergeGdimTables(
    mname="MergeWithGdimOverwrite",
    how="overwrite",
    auto_map_columns=True,
)

参数说明

MergeGdimTables 参数一览

参数名

类型

默认值

说明

how

Literal["ignore","overwrite"]

"ignore"

主键匹配行的处理方式:ignore 仅保留新增行;overwrite 保留全部行并为匹配行赋 gdim_id

auto_map_columns

bool

False

是否自动做列映射。为 True 时会做更灵活的 name/title 交叉匹配(更慢);为 False 时只做快速匹配(列名完全一致或列标题完全一致)。

Note

  • InputGdimTables 中的每张表都必须包含 gdim_id 列,否则会抛出 ValueError

  • 若某张输入表在 InputGdimTables 中找不到同名/同标题的 GDIM 表,则该表会原样返回(不会添加 gdim_id,也不会做列映射)。

  • InputGdimTables 为空集合,则直接返回 ``InputTables``(不做处理)。

在 pipeline 中的使用方式

from gdisdk.pipeline.pipeline import PipeLine
from gdisdk.modules.mergers import MergeGdimTables

pipe = PipeLine(app_name="MergeGdimTablesDemo", app_title="与 GDIM 表对齐示例")

m = MergeGdimTables("MergeWithGdim", how="ignore")
# links = input_reader.OutputTables >> m.InputTables | gdim_reader.OutputTables >> m.InputGdimTables
# pipe.add_links(links)
# pipe.add_module(m)
# pipe.run()
# out = m.OutputTables.data

更多信息