MySQL数据变化监听的实现方案

 更新时间:2025年02月10日 10:33:31   作者:Huooya  
在高并发和大数据环境下,实时获取MySQL数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要,MySQL的binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听,本文将介绍如何利用binlog监听MySQL数据增量,并提供基 Java的Canal实现示例

1. binlog 简介

1.1 什么是 binlog?

binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如 CREATEALTER)和 DML(数据操作语言,如 INSERTUPDATEDELETE)的日志文件,它用于:

  • 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
  • 数据恢复:通过 mysqlbinlog 工具解析 binlog 恢复数据。
  • 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。

1.2 binlog 的三种格式

binlog 格式说明
STATEMENT记录 SQL 语句本身
ROW记录行数据变更(推荐)
MIXED结合前两者,MySQL 自动判断

由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。

2. 开启 binlog 并配置 MySQL

2.1 检查 binlog 是否开启

SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 值为 OFF,说明 binlog 未开启。

2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)

在 [mysqld] 部分添加以下内容:

server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
expire_logs_days=7

重启 MySQL:

systemctl restart mysql  # Linux
net stop mysql && net start mysql  # Windows

2.3 验证 binlog 配置

执行:

SHOW BINARY LOGS;

如果有 binlog 文件,如 mysql-bin.000001,说明已开启。

3. 使用 Java 监听 binlog

3.1 选择工具:Canal

阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。

3.2 Java 代码监听 binlog

引入 Maven 依赖

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.6</version>
    </dependency>
</dependencies>

编写 Java 代码

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class BinlogListener {
    public static void main(String[] args) {
        // 连接 Canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), 
                "example", "canal", "canal");
        

        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // 监听所有库表
            connector.rollback();
    
            while (true) {
                Message message = connector.getWithoutAck(100); // 获取数据
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
    
                if (batchId != -1 && !entries.isEmpty()) {
                    for (CanalEntry.Entry entry : entries) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            processEntry(entry);
                        }
                    }
                }
                connector.ack(batchId); // 确认消息
            }
        } finally {
            connector.disconnect();
        }
    }
    
    private static void processEntry(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
    
            System.out.println("变更表:" + entry.getHeader().getTableName());
            System.out.println("变更类型:" + eventType);
    
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    System.out.println("删除数据:" + rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    System.out.println("新增数据:" + rowData.getAfterColumnsList());
                } else {
                    System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
                    System.out.println("更新后数据:" + rowData.getAfterColumnsList());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

4. 代码解析

  • 创建 Canal 连接

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111), 
    "example", "canal", "canal");
    • 127.0.0.1:Canal 服务器地址
    • 11111:Canal 端口
    • example:Canal 实例
    • canal/canal:默认账号密码
  • 获取 binlog 变更数据

Message message = connector.getWithoutAck(100);
    • getWithoutAck(100):拉取 100 条 binlog 事件。
  • 解析 binlog

for (CanalEntry.Entry entry : entries) {
    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
        processEntry(entry);
    }
}
  • 仅处理 ROWDATA 类型的变更,忽略事务等其他信息。

  • 分类处理 INSERTUPDATEDELETE

if (eventType == CanalEntry.EventType.DELETE) {
    System.out.println("删除数据:" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
    System.out.println("新增数据:" + rowData.getAfterColumnsList());
} else {
    System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
    System.out.println("更新后数据:" + rowData.getAfterColumnsList());
}

总结

  • MySQL binlog 记录数据库变更,可用于监听增量数据。
  • Canal 作为 MySQL 从库解析 binlog,实现数据同步。
  • Java 代码示例 展示如何用 Canal 监听 INSERTUPDATEDELETE 操作,并解析变更数据。

这种方案适用于 分布式数据同步缓存一致性数据变更通知,是实时数据处理的重要手段。

以上就是MySQL数据变化监听的实现方案的详细内容,更多关于MySQL数据变化监听的资料请关注脚本之家其它相关文章!

相关文章

  • 如何将Excel文件导入MySQL数据库

    如何将Excel文件导入MySQL数据库

    这篇文章主要为大家详细介绍了Excel文件导入MySQL数据库的具体方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-07-07
  • 宝塔服务器的mysql数据库自动备份到gitee项目

    宝塔服务器的mysql数据库自动备份到gitee项目

    文章介绍了一个开源脚本,用于自动备份宝塔服务器上的所有数据库,并将备份文件上传到Gitee项目中,脚本支持多服务器数据信息备份,并且可以定时执行备份任务
    2025-12-12
  • MySQL慢查询诊断与SQL注入防御详解

    MySQL慢查询诊断与SQL注入防御详解

    本文介绍MySQL慢查询诊断与SQL注入防御的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2026-04-04
  • MySQL数据类型和表的操作方法

    MySQL数据类型和表的操作方法

    文章系统介绍了SQL常用数据类型(数值、字符串、日期)及表操作(创建、修改、删除),涵盖BIT、TINYINT、VARCHAR、ENUM等类型定义,以及SHOW TABLES、CREATE TABLE、ALTER TABLE等语句用法,强调数据类型选择与表结构管理的关键性,感兴趣的朋友跟随小编一起看看吧
    2025-10-10
  • Mysql索引类型与基本用法实例分析

    Mysql索引类型与基本用法实例分析

    这篇文章主要介绍了Mysql索引类型与基本用法,结合实例形式分析了Mysql索引类型中普通索引、唯一索引、主键索引、组合索引、全文索引基本概念、原理与使用方法,需要的朋友可以参考下
    2020-06-06
  • Windows平台下MySQL安装与配置方法与注意事项

    Windows平台下MySQL安装与配置方法与注意事项

    这篇文章主要介绍了Windows平台下MySQL安装与配置方法与注意事项,需要的朋友可以参考下
    2017-04-04
  • mysql中的int类型对应于java中的Long类型详解

    mysql中的int类型对应于java中的Long类型详解

    这篇文章主要介绍了mysql中的int类型对应于java中的Long类型,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • JDBC探索之SQLException解析

    JDBC探索之SQLException解析

    这篇文章主要介绍了JDBC探索之SQLException解析,具有一定参考价值,需要的朋友可以了解下。
    2017-10-10
  • mysql索引简介及explain使用详解

    mysql索引简介及explain使用详解

    这篇文章详细介绍了MySQL的三层逻辑架构,介绍了SQL优化的基本概念、执行过程以及如何使用索引优化查询,最后,通过分析执行计划(EXPLAIN)来理解SQL的执行情况,感兴趣的朋友跟随小编一起看看吧
    2025-12-12
  • MySQL入门教程(五)之表的创建、修改和删除

    MySQL入门教程(五)之表的创建、修改和删除

    MySQL 为关系型数据库(Relational Database Management System), 本文给大家介绍MySQL入门教程(五)之表的创建、修改和删除,需要的朋友一起学习吧
    2016-04-04

最新评论