Apache SeaTunnel 将 CDC 数据流转换为 Append-Only 模式的详细过程

 更新时间:2025年11月20日 09:07:47   作者:SeaTunnel  
RowKindExtractor转换插件用于将CDC数据流转换为Append-Only模式,同时将原始的 RowKind 信息提取为一个新的字段,本文将介绍RowKindExtractor的核心功能,其在 CDC 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例,感兴趣的朋友一起看看吧

RowKindExtractor 是 Apache SeaTunnel 的一个转换插件,它能将 CDC 数据流转为 Append-Only 模式,并提取原始 RowKind 信息为新字段。本文将介绍 RowKindExtractor 的核心功能,其在 CDC 数据同步场景下的使用方法,以及配置选项、注意事项及多种应用示例。

RowKindExtractor

RowKindExtractor 转换插件用于将 CDC(Change Data Capture)数据流转换为 Append-Only(仅追加)模式,同时将原始的 RowKind 信息提取为一个新的字段。

核心功能:

  • 将所有数据行的 RowKind 统一改为 +I(INSERT),实现 Append-Only 模式
  • 将原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
  • 支持短格式和完整格式两种输出方式

为什么需要这个插件?

在 CDC 数据同步场景中,数据行带有 RowKind 标记(+I、-U、+U、-D),表示不同的变更类型。但某些下游系统(如数据湖、分析系统)只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此时需要:

  1. 将所有数据转换为 INSERT 类型(Append-Only)
  2. 将原始的变更类型保存为普通字段,供后续分析使用

转换示例:

输入(CDC 数据):
  RowKind: -D (DELETE)
  数据: id=1, name="test1", age=20
输出(Append-Only 数据):
  RowKind: +I (INSERT)
  数据: id=1, name="test1", age=20, row_kind="DELETE"

典型应用场景

  • 将 CDC 数据写入只支持 Append 的数据湖
  • 需要在数据仓库中保留完整的变更历史记录
  • 需要对不同类型的变更进行统计分析

配置选项

custom_field_name [string]

指定新增字段的名称,该字段用于存储原始的 RowKind 信息。

默认值:row_kind

注意事项:

  • 字段名不能与原有字段重名,否则会报错
  • 建议使用有意义的名称,如 operation_type、change_type、cdc_op 等

示例:

custom_field_name = "operation_type"  # 使用自定义字段名

transform_type [enum]

指定 RowKind 字段值的输出格式。

可选值:

默认值:SHORT

各值含义:

选择建议:

  • SHORT 格式:节省存储空间,适合对存储敏感的场景
  • FULL 格式:可读性更好,适合需要人工查看或分析的场景

示例:

transform_type = FULL  # 使用完整格式

完整示例

  • 示例 1:使用默认配置(SHORT 格式)

使用默认配置,将 CDC 数据转换为 Append-Only 模式,RowKind 以短格式保存。

env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}
transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # 使用默认配置:
    # custom_field_name = "row_kind"
    # transform_type = SHORT
  }
}
sink {
  Console {
    plugin_input = "append_only_data"
  }
}

数据转换过程:

输入数据(CDC 格式):
  1. RowKind=+I, id=1, name="张三", age=25
  2. RowKind=-U, id=1, name="张三", age=25
  3. RowKind=+U, id=1, name="张三", age=26
  4. RowKind=-D, id=1, name="张三", age=26
输出数据(Append-Only 格式):
  1. RowKind=+I, id=1, name="张三", age=25, row_kind="+I"
  2. RowKind=+I, id=1, name="张三", age=25, row_kind="-U"
  3. RowKind=+I, id=1, name="张三", age=26, row_kind="+U"
  4. RowKind=+I, id=1, name="张三", age=26, row_kind="-D"
  • 示例 2:使用 FULL 格式和自定义字段名

使用完整格式输出 RowKind,并自定义字段名称。

env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}
transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # 自定义字段名
    transform_type = FULL                 # 使用完整格式
  }
}
sink {
  Iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    database = "mydb"
    table = "orders_history"
    # Iceberg 表会包含 operation_type 字段,记录每条数据的变更类型
  }
}
数据转换过程:
输入数据(CDC 格式):
  1. RowKind=+I, order_id=1001, amount=100.00
  2. RowKind=-U, order_id=1001, amount=100.00
  3. RowKind=+U, order_id=1001, amount=150.00
  4. RowKind=-D, order_id=1001, amount=150.00
输出数据(Append-Only 格式,FULL 格式):
  1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
  2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
  3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
  4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
  • 示例 3:完整的测试示例(使用 FakeSource)

使用 FakeSource 生成测试数据,演示各种 RowKind 的转换效果。

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_updated", 95]
      },
      {
        kind = UPDATE_BEFORE
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [2, "B_updated", 98]
      },
      {
        kind = DELETE
        fields = [1, "A_updated", 95]
      }
    ]
  }
}
transform {
  RowKindExtractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = FULL
  }
}
sink {
  Console {
    plugin_input = "transformed_data"
  }
}

预期输出:

+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"

到此这篇关于Apache SeaTunnel 将 CDC 数据流转换为 Append-Only 模式的详细过程的文章就介绍到这了,更多相关Apache SeaTunnel Append-Only 模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Centos/Ubuntu下安装nodejs教程

    Centos/Ubuntu下安装nodejs教程

    本篇文章主要介绍了Centos/Ubuntu下安装nodejs教程,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-04-04
  • Ubuntu如何升级Python版本

    Ubuntu如何升级Python版本

    Ubuntu 22.04 Docker中,安装Python 3.11后,使用update-alternatives设置为默认版本,最后用python3 -V验证
    2025-08-08
  • 用apache和tomcat搭建集群(负载均衡)

    用apache和tomcat搭建集群(负载均衡)

    这篇文章主要介绍了用apache和tomcat搭建集群,实现负载均衡,需要的朋友可以参考下
    2014-12-12
  • Ubuntu17.04配置更换国内源的方法

    Ubuntu17.04配置更换国内源的方法

    本篇文章主要介绍了Ubuntu17.04配置国内源的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-03-03
  • 重启Linux服务器后数据消失问题的解决方法(重新挂载)

    重启Linux服务器后数据消失问题的解决方法(重新挂载)

    在使用 reboot 命令重启服务器后,服务器内挂载的文件全部丢失,那应该如何重新挂载呢?所以本文小编给大家介绍了重启Linux服务器后数据消失问题的解决方法,并通过图文讲解的非常详细,需要的朋友可以参考下
    2024-09-09
  • 让Apache 2支持.htaccess并实现目录加密的方法

    让Apache 2支持.htaccess并实现目录加密的方法

    这篇文章主要介绍了让Apache 2支持.htaccess并实现目录加密的方法,文中给出了详细的方法步骤,并给出了示例代码,对大家具有一定的参考价值,需要的朋友们下面来一起看看吧。
    2017-02-02
  • Linux 中的文件复制cp命令和scp命令详解

    Linux 中的文件复制cp命令和scp命令详解

    这篇文章主要介绍了Linux 中的文件复制cp命令和scp命令详解的相关资料,需要的朋友可以参考下
    2017-03-03
  • linux中$符号的基础用法总结

    linux中$符号的基础用法总结

    这篇文章主要给大家介绍了关于linux中$符号的基础用法,文中通过示例代码介绍的非常详细,对大家学习或者使用linux系统具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-11-11
  • 检查Linux中磁盘使用情况的四种方法

    检查Linux中磁盘使用情况的四种方法

    有时你需要下载一些重要文件或将一些照片传输到你的Linux系统,但面临磁盘空间不足的问题,你前往你的文件管理器删除你不再需要的大文件,但你不知道其中哪些占用了你大部分的磁盘空间,在本文中,我们将展示一些简单的方法来检查Linux中磁盘使用情况
    2025-06-06
  • vim的一些常用简单操作小结

    vim的一些常用简单操作小结

    大家都知道vim在Linux下使用很多,但是习惯了在Windows下的文本操作,在vim中进行文本操作会觉得很不方便,但是vim是一个很强大的工具,只是还不熟练去使用它,下面是一些常用的vim文本操作方法。
    2016-09-09

最新评论