SpringBoot 如何通过集成 Flink CDC 来实时追踪 MySql 数据变动

 更新时间:2025年06月20日 11:07:22   作者:cici15874  
本文介绍如何在SpringBoot项目中集成FlinkCDC,实现对MySQL数据库实时变动的捕获与处理,包含环境准备、依赖引入、作业配置及测试验证等步骤,帮助开发者构建高效的数据同步与分析应用,感兴趣的朋友一起看看吧

一、概述

Flink CDC 是一个基于 Apache Flink 的数据捕获工具,能够实时捕获和处理数据库的变动事件。通过集成 Flink CDC,可以实时追踪 MySQL 数据库中的数据变动,构建高效的数据处理和分析应用。本文将介绍如何在 SpringBoot 项目中集成 Flink CDC,并实现对 MySQL 数据变动的实时追踪。

二、准备工作

1. 环境准备

  • JDK 1.8+
  • Maven 3.6+
  • MySQL 数据库
  • Apache Flink 1.12+
  • SpringBoot 2.5+

2. 创建 MySQL 数据库和表

CREATE DATABASE test_db;
USE test_db;
CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
​

三、集成步骤

1. 引入依赖

在 SpringBoot 项目的 pom.xml 中添加必要的依赖:

<dependencies>
    <!-- Spring Boot Dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- Flink Dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!-- Flink CDC Dependencies -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
​

2. 配置 Flink CDC

在 SpringBoot 项目中创建 Flink CDC 配置类:

import com.ververica.cdc.connectors.mysql.MySQLSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FlinkCdcConfig {
    @Bean
    public DataStreamSource<String> mysqlSource(StreamExecutionEnvironment env) {
        MySQLSource<String> source = MySQLSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test_db")
            .tableList("test_db.users")
            .username("root")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");
    }
}
​

3. 创建 Flink 作业

在 SpringBoot 项目中创建 Flink 作业:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class FlinkJobRunner implements CommandLineRunner {
    private final StreamExecutionEnvironment env;
    private final DataStreamSource<String> mysqlSource;
    public FlinkJobRunner(StreamExecutionEnvironment env, DataStreamSource<String> mysqlSource) {
        this.env = env;
        this.mysqlSource = mysqlSource;
    }
    @Override
    public void run(String... args) throws Exception {
        mysqlSource.print();
        env.execute("Flink CDC Job");
    }
}
​

4. 启动 SpringBoot 应用

运行 SpringBoot 应用,启动后会自动执行 Flink 作业,并打印 MySQL 数据库中 users 表的变动。

四、验证和测试

1. 插入测试数据

向 MySQL 数据库中插入数据:

INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');
​

2. 验证输出

查看 SpringBoot 应用的控制台输出,确认是否正确捕获并打印了 MySQL 数据库中的变动。

到此这篇关于SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动的文章就介绍到这了,更多相关SpringBoot Flink CDC 追踪MySql 数据变动内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot+MyBatis-Plus实现分页示例

    SpringBoot+MyBatis-Plus实现分页示例

    本文介绍了SpringBoot+MyBatis-Plus实现分页示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-12-12
  • 详解SpringBoot的jar为什么可以直接运行

    详解SpringBoot的jar为什么可以直接运行

    SpringBoot提供了一个插件spring-boot-maven-plugin用于把程序打包成一个可执行的jar包,本文给大家介绍了为什么SpringBoot的jar可以直接运行,文中有相关的代码示例供大家参考,感兴趣的朋友可以参考下
    2024-02-02
  • IDEA的部署设置改为war exploded运行项目出错问题

    IDEA的部署设置改为war exploded运行项目出错问题

    在使用IDEA配置warexploded部署时,可能会遇到路径问题或404错误,解决方法是进入Deployment设置,删除Application content中的/marry_war_exploded,使其为空,然后重新运行项目即可,这是一种有效的解决策略,希望能帮助到遇到同样问题的开发者
    2024-10-10
  • springboot如何通过controller层实现页面切换

    springboot如何通过controller层实现页面切换

    在Spring Boot中,通过Controller层实现页面切换背景,Spring Boot的默认注解是@RestController,它包含了@Controller和@ResponseBody,@ResponseBody会将返回值转换为字符串返回,因此无法实现页面切换,将@RestController换成@Controller
    2024-12-12
  • Java实现AES加密算法的简单示例分享

    Java实现AES加密算法的简单示例分享

    这篇文章主要介绍了Java实现AES加密算法的简单示例分享,AES算法是基于对密码值的置换和替代,需要的朋友可以参考下
    2016-04-04
  • 详解Java中对象序列化与反序列化

    详解Java中对象序列化与反序列化

    这篇文章主要为大家详细介绍了Java中对象序列化与反序列化,感兴趣的小伙伴们可以参考一下
    2016-02-02
  • JAVA版排序算法之快速排序示例

    JAVA版排序算法之快速排序示例

    这篇文章主要介绍了JAVA版排序算法之快速排序,结合实例形式分析了基于java版的遍历、递归实现快速排序功能的具体步骤与操作技巧,需要的朋友可以参考下
    2017-01-01
  • 深入理解Java中的弱引用

    深入理解Java中的弱引用

    这篇文章主要介绍了深入理解Java中的弱引用,本文讲解了强引用、弱引用、引用队列、四种引用、软引用、虚引用等内容,需要的朋友可以参考下
    2015-01-01
  • java去除数组重复元素的四种方法

    java去除数组重复元素的四种方法

    本文给大家分享四种java去除数组重复元素的方法,每种方法通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-11-11
  • 使用Java和Ehcache实现缓存策略的设置及示例代码

    使用Java和Ehcache实现缓存策略的设置及示例代码

    本文介绍了如何使用Java和Ehcache实现缓存策略,包括基本配置、缓存策略的设置以及示例代码,感兴趣的朋友跟随小编一起看看吧
    2025-12-12

最新评论