如何利用Java实现MySQL的数据变化监听

 更新时间:2025年02月11日 08:35:39   作者:Huooya  
在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要,下面我们就来看看如何通过Java实现MySQL的数据变化监听吧

在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要。MySQL 的 binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听。本文将介绍如何利用 binlog 监听 MySQL 数据增量,并提供基于 Java 的 Canal 实现示例。

1.binlog 简介

1.1 什么是 binlog

binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如 CREATEALTER)和 DML(数据操作语言,如 INSERTUPDATEDELETE)的日志文件,它用于:

  • 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
  • 数据恢复:通过 mysqlbinlog 工具解析 binlog 恢复数据。
  • 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。

1.2 binlog 的三种格式

binlog 格式说明
STATEMENT记录 SQL 语句本身
ROW记录行数据变更(推荐)
MIXED结合前两者,MySQL 自动判断

由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。

2. 开启 binlog 并配置 MySQL

2.1 检查 binlog 是否开启

SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 值为 OFF,说明 binlog 未开启。

2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)

[mysqld] 部分添加以下内容:

server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
expire_logs_days=7

重启 MySQL:

systemctl restart mysql  # Linux
net stop mysql && net start mysql  # Windows

2.3 验证 binlog 配置

执行:

SHOW BINARY LOGS;

如果有 binlog 文件,如 mysql-bin.000001,说明已开启。

3. 使用 Java 监听 binlog

3.1 选择工具:Canal

阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。

3.2 Java 代码监听 binlog

引入 Maven 依赖

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.6</version>
    </dependency>
</dependencies>

编写 Java 代码

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class BinlogListener {
    public static void main(String[] args) {
        // 连接 Canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), 
                "example", "canal", "canal");
        

        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // 监听所有库表
            connector.rollback();
    
            while (true) {
                Message message = connector.getWithoutAck(100); // 获取数据
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
    
                if (batchId != -1 && !entries.isEmpty()) {
                    for (CanalEntry.Entry entry : entries) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            processEntry(entry);
                        }
                    }
                }
                connector.ack(batchId); // 确认消息
            }
        } finally {
            connector.disconnect();
        }
    }
    
    private static void processEntry(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
    
            System.out.println("变更表:" + entry.getHeader().getTableName());
            System.out.println("变更类型:" + eventType);
    
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    System.out.println("删除数据:" + rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    System.out.println("新增数据:" + rowData.getAfterColumnsList());
                } else {
                    System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
                    System.out.println("更新后数据:" + rowData.getAfterColumnsList());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

4. 代码解析

1.创建 Canal 连接

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111), 
    "example", "canal", "canal");
  • 127.0.0.1:Canal 服务器地址
  • 11111:Canal 端口
  • example:Canal 实例
  • canal/canal:默认账号密码

2.获取 binlog 变更数据

Message message = connector.getWithoutAck(100);

getWithoutAck(100):拉取 100 条 binlog 事件。

3.解析 binlog

for (CanalEntry.Entry entry : entries) {
    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
        processEntry(entry);
    }
}

仅处理 ROWDATA 类型的变更,忽略事务等其他信息。

4.分类处理 INSERT、UPDATE、DELETE

if (eventType == CanalEntry.EventType.DELETE) {
    System.out.println("删除数据:" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
    System.out.println("新增数据:" + rowData.getAfterColumnsList());
} else {
    System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
    System.out.println("更新后数据:" + rowData.getAfterColumnsList());
}

总结

  • MySQL binlog 记录数据库变更,可用于监听增量数据。
  • Canal 作为 MySQL 从库解析 binlog,实现数据同步。
  • Java 代码示例 展示如何用 Canal 监听 INSERTUPDATEDELETE 操作,并解析变更数据。

这种方案适用于 分布式数据同步缓存一致性数据变更通知,是实时数据处理的重要手段。

到此这篇关于如何利用Java实现MySQL的数据变化监听的文章就介绍到这了,更多相关Java监听MySQL数据变化内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 实例讲解Java编程中数组反射的使用方法

    实例讲解Java编程中数组反射的使用方法

    这篇文章主要介绍了Java编程中数组反射的使用方法,通过编写数组反射工具类可以重用许多基础代码,减少对类型的判断过程,需要的朋友可以参考下
    2016-04-04
  • 浅谈java中HashMap键的比较方式

    浅谈java中HashMap键的比较方式

    今天带大家了解一下java中HashMap键的比较方式,文中有非常详细的解释说明及代码示例,对正在学习java的小伙伴们很有帮助,需要的朋友可以参考下
    2021-05-05
  • SpringBoot之自定义Schema扩展方式

    SpringBoot之自定义Schema扩展方式

    这篇文章主要介绍了SpringBoot之自定义Schema扩展方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java毕业设计实战之生活旅行分享平台的实现

    Java毕业设计实战之生活旅行分享平台的实现

    这是一个使用了java+Springboot+JPA+Jsp+Html+js+Ajax+maven+mysql开发的生活旅行分享平台,是一个毕业设计的实战练习,具有分享发布平台该有的所有功能,感兴趣的朋友快来看看吧
    2022-02-02
  • 详解Java中native方法的使用

    详解Java中native方法的使用

    native是与C++联合开发的时候用的!使用native关键字说明这个方法是原生函数,也就是这个方法是用C/C++语言实现的,并且被编译成了DLL,由java去调用。本文给大家介绍java 中native方法使用,感兴趣的朋友一起看看吧
    2020-09-09
  • Java class文件格式之数据类型_动力节点Java学院整理

    Java class文件格式之数据类型_动力节点Java学院整理

    这篇文章主要介绍了Java class文件格式之数据类型的相关资料,需要的朋友可以参考下
    2017-06-06
  • Java zookeeper图形化工具ZooInspector用法详解

    Java zookeeper图形化工具ZooInspector用法详解

    这篇文章主要介绍了Java zookeeper图形化工具ZooInspector用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • 详解java如何正确使用volatile

    详解java如何正确使用volatile

    这篇文章给大家分享了java如何正确使用volatile的相关知识点内容,有兴趣的朋友可以参考学习下。
    2018-07-07
  • 史上最难的一道Java面试题

    史上最难的一道Java面试题

    本文给大家分享一道史上最难的一道Java面试题,非常不错,具有参考借鉴价值,需要的朋友参考下吧
    2018-03-03
  • Springboot 整合maven插口调用maven release plugin实现一键打包功能

    Springboot 整合maven插口调用maven release plugin实现一键打包功能

    这篇文章主要介绍了Springboot 整合maven插口调用maven release plugin实现一键打包功能,整合maven-invoker使程序去执行mvn命令,结合示例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-03-03

最新评论