Java使用Canal同步MySQL数据到Redis

 更新时间:2024年11月08日 09:59:10   作者:binbinxyz  
在现代微服务架构中,数据同步是一个常见的需求,特别是将 MySQL 数据实时同步到 Redis,下面我们就来看看Java如何使用Canal同步MySQL数据到Redis吧

一、引言

在现代微服务架构中,数据同步是一个常见的需求。特别是将 MySQL 数据实时同步到 Redis,可以显著提升应用的性能和响应速度。本文将详细介绍如何使用 Canal 实现这一目标。Canal 是阿里巴巴开源的一个数据库 Binlog 同步工具,可以实时捕获 MySQL 的 Binlog 日志并将其同步到其他存储系统。

项目地址:alibaba/canal

二、工作原理

1. MySQL主备复制原理

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2. canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

三、环境准备

1. 安装和配置 MySQL

Canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row。编辑 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下内容:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

重启 MySQL 服务以使配置生效:

sudo service mysql restart

2. 安装和配置 Canal

下载并解压 Canal 服务端:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal

编辑 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服务器的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

启动 Canal 服务:

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

如果启动失败,注意检查配置文件conf/example/instance.properties的内容,还要注意JDK版本及配置。建议使用1.6.25。我用openjdk 21启动报错,改回JDK8u421启动成功。

3. 安装和配置 Redis

确保 Redis 服务已经安装并启动。可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

四、开发 Java 应用

1. 添加依赖

在你的 pom.xml 文件中添加 Canal 客户端和 Redis 客户端的依赖。以下是一个示例:

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.1.5</version>
    </dependency>
</dependencies>

2. 编写 Canal 客户端代码

创建一个 Java 类来连接 Canal 服务并处理 Binlog 事件,将数据同步到 Redis:

package org.hbin.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalToRedisSync {

    public static void main(String[] args) {
        // 创建 Canal 连接
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", "");

        // 连接到 Canal 服务
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();

        // 创建 Redis 客户端
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        while (true) {
            Message message = connector.getWithoutAck(100); // 获取最多 100 条记录
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                handleEntry(message.getEntries(), jedis);
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }
    }

    private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else {
                    System.out.println("-------> before");
                    syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                    System.out.println("-------> after");
                    syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                }
            }
        }
    }

    private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Insert: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Update: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            }
        }
        System.out.println("Delete: " + key.toString());
        jedis.hdel(schema + ":" + table, key.toString());
    }
}

3. 运行和测试

3.1 启动 Canal 服务:

sh /opt/canal/bin/startup.sh

3.2 启动 Redis 服务:

确保 Redis 服务已经启动,可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

3.3 启动 Java 应用:

编译并运行上述 Java 应用,确保 Canal 服务和 MySQL 服务器正常运行。

3.4 测试数据同步:

在 MySQL 中插入、更新或删除数据,观察 Java 应用是否能够实时捕获这些变化并将数据同步到 Redis。

相关SQL如下:

drop database if exists canal;
create database canal;
use canal;

drop table if exists user;
create table user(
  `id` bigint AUTO_INCREMENT primary key,
  `name` varchar(20) NOT NULL,
  `age` tinyint DEFAULT 0,
  `detail` varchar(100) DEFAULT '',
  `create_time` date,
  `update_time` date
);

insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07');
update user set age=26 where id=2;
delete from user where id=3;

输出信息:

================> binlog[binlog.000008:6390] , name[canal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERT
Insert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERT
Insert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERT
Insert: 3 -> name:Tom3,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE
-------> before
Update: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
-------> after
Update: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETE
Delete: 3

五、注意事项

性能优化:根据实际需求调整 Canal 和 Redis 的配置,以优化性能。

错误处理:在生产环境中,需要增加错误处理和重试机制,确保数据同步的可靠性。

安全性:确保 Canal 和 Redis 的连接是安全的,使用适当的认证和授权机制。

六、结论

通过使用 Canal,我们可以轻松地将 MySQL 数据实时同步到 Redis、Kafka 或其他系统。这不仅提高了数据的一致性和实时性,还为应用提供了更高的性能和响应速度。

以上就是Java使用Canal同步MySQL数据到Redis的详细内容,更多关于Java Canal同步MySQL数据的资料请关注脚本之家其它相关文章!

相关文章

  • Java的SPI机制实例详解

    Java的SPI机制实例详解

    这篇文章主要介绍了Java的SPI机制实例详解的相关资料,需要的朋友可以参考下
    2017-06-06
  • 通过Java代码来创建view的方法

    通过Java代码来创建view的方法

    本文给大家分享通过java代码创建view的方法,以TextView为例创建控件的方法,需要的的朋友参考下吧
    2017-08-08
  • Spring事务失效的一种原因关于this调用的问题

    Spring事务失效的一种原因关于this调用的问题

    这篇文章主要介绍了Spring事务失效的一种原因关于this调用的问题,本文给大家分享问题原因及解决办法,通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2021-10-10
  • 基于Java实现Socket编程入门

    基于Java实现Socket编程入门

    Java最初是作为网络编程语言出现的,使得客户端和服务器的沟通变成了现实,而在网络编程中,使用最多的就是Socket,本文就来介绍一下基于Java实现Socket编程入门,感兴趣的可以来了解一下
    2022-03-03
  • Java延迟队列原理与用法实例详解

    Java延迟队列原理与用法实例详解

    这篇文章主要介绍了Java延迟队列原理与用法,结合实例形式详细分析了延迟队列的概念、原理、功能及具体使用方法,需要的朋友可以参考下
    2018-09-09
  • JDK动态代理原理:只能代理接口,不能代理类问题

    JDK动态代理原理:只能代理接口,不能代理类问题

    这篇文章主要介绍了JDK动态代理原理:只能代理接口,不能代理类问题。具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • 什么是Java自旋锁

    什么是Java自旋锁

    这篇文章主要介绍了什么是Java自旋锁,在有些场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失,下面来了解具体内容介绍吧
    2022-01-01
  • Kafka 安装与配置详细过程

    Kafka 安装与配置详细过程

    本节详细介绍 Kafka 运行环境的搭建,为了节省篇幅,本节的内容以 Linux CentOS 作为安装演示的操作系统,其他 Linux 系列的操作系统也可以参考本节的内容,对Kafka 安装与配置相关知识感兴趣的朋友一起看看吧
    2021-11-11
  • SpringBoot中如何使用Assert进行断言校验

    SpringBoot中如何使用Assert进行断言校验

    Java 提供了内置的 assert 机制,而 Spring 框架也提供了更强大的 Assert 工具类来帮助开发者进行参数校验和状态检查,下面我们就来看看SpringBoot中如何使用Assert进行断言校验吧
    2025-07-07
  • Spring BeanFactory工厂使用教程

    Spring BeanFactory工厂使用教程

    Spring的本质是一个bean工厂(beanFactory)或者说bean容器,它按照我们的要求,生产我们需要的各种各样的bean,提供给我们使用。只是在生产bean的过程中,需要解决bean之间的依赖问题,才引入了依赖注入(DI)这种技术
    2023-02-02

最新评论