Apache SeaTunnel 将 CDC 数据流转换为 Append-Only 模式的详细过程

 更新时间:2025年11月20日 09:07:47   作者:SeaTunnel  
RowKindExtractor转换插件用于将CDC数据流转换为Append-Only模式,同时将原始的 RowKind 信息提取为一个新的字段,本文将介绍RowKindExtractor的核心功能,其在 CDC 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例,感兴趣的朋友一起看看吧

RowKindExtractor 是 Apache SeaTunnel 的一个转换插件,它能将 CDC 数据流转为 Append-Only 模式,并提取原始 RowKind 信息为新字段。本文将介绍 RowKindExtractor 的核心功能,其在 CDC 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例。

RowKindExtractor

RowKindExtractor 转换插件用于将 CDC(Change Data Capture)数据流转换为 Append-Only(仅追加)模式,同时将原始的 RowKind 信息提取为一个新的字段。

核心功能:

  • 将所有数据行的 RowKind 统一改为 +I(INSERT),实现 Append-Only 模式
  • 将原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
  • 支持短格式和完整格式两种输出方式

为什么需要这个插件?

在 CDC 数据同步场景中,数据行带有 RowKind 标记(+I、-U、+U、-D),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此时需要:

  1. 将所有数据转换为 INSERT 类型(Append-Only)
  2. 将原始的变更类型保存为普通字段,供后续分析使用

转换示例:

输入(CDC 数据):
  RowKind: -D (DELETE)
  数据: id=1, name="test1", age=20
输出(Append-Only 数据):
  RowKind: +I (INSERT)
  数据: id=1, name="test1", age=20, row_kind="DELETE"

典型应用场景

  • 将 CDC 数据写入只支持 Append 的数据湖
  • 需要在数据仓库中保留完整的变更历史记录
  • 需要对不同类型的变更进行统计分析

配置选项

custom_field_name [string]

指定新增字段的名称,该字段用于存储原始的 RowKind 信息。

默认值:row_kind

注意事项:

  • 字段名不能与原有字段重名,否则会报错
  • 建议使用有意义的名称,如 operation_type、change_type、cdc_op 等

示例:

custom_field_name = "operation_type"  # 使用自定义字段名

transform_type [enum]

指定 RowKind 字段值的输出格式。

可选值:

默认值:SHORT

各值含义:

选择建议:

  • SHORT 格式:节省存储空间,适合对存储敏感的场景
  • FULL 格式:可读性更好,适合需要人工查看或分析的场景

示例:

transform_type = FULL  # 使用完整格式

完整示例

  • 示例 1:使用默认配置(SHORT 格式)

使用默认配置,将 CDC 数据转换为 Append-Only 模式,RowKind 以短格式保存。

env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}
transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # 使用默认配置:
    # custom_field_name = "row_kind"
    # transform_type = SHORT
  }
}
sink {
  Console {
    plugin_input = "append_only_data"
  }
}

数据转换过程:

输入数据(CDC 格式):
  1. RowKind=+I, id=1, name="张三", age=25
  2. RowKind=-U, id=1, name="张三", age=25
  3. RowKind=+U, id=1, name="张三", age=26
  4. RowKind=-D, id=1, name="张三", age=26
输出数据(Append-Only 格式):
  1. RowKind=+I, id=1, name="张三", age=25, row_kind="+I"
  2. RowKind=+I, id=1, name="张三", age=25, row_kind="-U"
  3. RowKind=+I, id=1, name="张三", age=26, row_kind="+U"
  4. RowKind=+I, id=1, name="张三", age=26, row_kind="-D"
  • 示例 2:使用 FULL 格式和自定义字段名

使用完整格式输出 RowKind,并自定义字段名称。

env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}
transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # 自定义字段名
    transform_type = FULL                 # 使用完整格式
  }
}
sink {
  Iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    database = "mydb"
    table = "orders_history"
    # Iceberg 表会包含 operation_type 字段,记录每条数据的变更类型
  }
}
数据转换过程:
输入数据(CDC 格式):
  1. RowKind=+I, order_id=1001, amount=100.00
  2. RowKind=-U, order_id=1001, amount=100.00
  3. RowKind=+U, order_id=1001, amount=150.00
  4. RowKind=-D, order_id=1001, amount=150.00
输出数据(Append-Only 格式,FULL 格式):
  1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
  2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
  3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
  4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
  • 示例 3:完整的测试示例(使用 FakeSource)

使用 FakeSource 生成测试数据,演示各种 RowKind 的转换效果。

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_updated", 95]
      },
      {
        kind = UPDATE_BEFORE
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [2, "B_updated", 98]
      },
      {
        kind = DELETE
        fields = [1, "A_updated", 95]
      }
    ]
  }
}
transform {
  RowKindExtractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = FULL
  }
}
sink {
  Console {
    plugin_input = "transformed_data"
  }
}

预期输出:

+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"

到此这篇关于Apache SeaTunnel 将 CDC 数据流转换为 Append-Only 模式的详细过程的文章就介绍到这了,更多相关Apache SeaTunnel Append-Only 模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 学习Apache的mod rewrite、access写法

    学习Apache的mod rewrite、access写法

    Apache的mod_rewrite是提供了强大URL操作的杀手级的模块,可以实现几乎所有你梦想的URL操作类型,其代价是你必须接受其复杂性,因为mod_rewrite的主要障碍就是初学者不容易理解和运用,即使是Apache专家有时也会发掘出mod_rewrite的新用途。
    2008-09-09
  • Linux深入解析IS_ERR函数的使用方式

    Linux深入解析IS_ERR函数的使用方式

    文章解释了Linux内核中IS_ERR函数的作用及原理,说明其通过检测特定地址范围的指针来识别错误码,用于内存分配和资源获取等场景,并给出调试技巧与底层宏实现解析
    2025-07-07
  • linux下make命令实现输出高亮的方法

    linux下make命令实现输出高亮的方法

    Linux 下 make 命令是系统管理员和程序员用的最频繁的命令之一。管理员用它通过命令行来编译和安装很多开源的工具,程序员用它来管理他们大型复杂的项目编译问题。这篇文章主要给大家介绍了关于linux下make命令实现输出高亮的方法,需要的朋友可以参考下。
    2017-07-07
  • 浅谈Linux配置定时,使用crontab -e与直接编辑/etc/crontab的区别

    浅谈Linux配置定时,使用crontab -e与直接编辑/etc/crontab的区别

    下面小编就为大家带来一篇浅谈Linux配置定时,使用crontab -e与直接编辑/etc/crontab的区别。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-11-11
  • ssh连接超时解决方法

    ssh连接超时解决方法

    本文为大家介绍了ssh连接超时的解决方法, 另外提供一种不修改配置文件就可以解决连接超时的方法,大家参考使用吧
    2014-01-01
  • 一文详解如何在CentOS 7系统中挂载数据盘并修改默认挂载目录

    一文详解如何在CentOS 7系统中挂载数据盘并修改默认挂载目录

    本文介绍了在CentOS7系统中挂载数据盘的方法,包括查看未挂载磁盘、分区、格式化、创建挂载目录、临时挂载及配置fstab实现永久挂载,详细步骤确保数据盘成功挂载并保持在系统重启后仍然挂载,需要的朋友可以参考下
    2026-04-04
  • 在linux中用同一个版本的R 同时安装 Seurat2 和 Seurat3的教程

    在linux中用同一个版本的R 同时安装 Seurat2 和 Seurat3的教程

    这篇文章主要介绍了在linux中用同一个版本的R 同时安装 Seurat2 和 Seurat3的教程,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-08-08
  • Linux多台服务器之间免密登录的实现步骤

    Linux多台服务器之间免密登录的实现步骤

    在日常的后端开发和运维工作中,我们经常需要频繁登录不同的服务器进行部署、排查问题或上传文件,传统的基于用户名和密码的登录方式既不安全又不高效,为此,我们可以利用 SSH 公钥认证的方式,实现无密码(免密)登录远程服务器,下面小编给大家详细说说
    2025-04-04
  • Linux删除文件名包含无效编码字符文件的方法

    Linux删除文件名包含无效编码字符文件的方法

    本文介绍了在Linux中处理文件名包含无效编码字符或特殊不可见字符的方法,包括确认文件名问题、删除无效编码文件的几种方法,如通过inode编号、通配符匹配等,需要的朋友可以参考下
    2026-01-01
  • linux操作之重定向问题

    linux操作之重定向问题

    这篇文章主要介绍了linux操作之重定向问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-04-04

最新评论