Canal入门使用小结

 更新时间:2025年02月07日 10:37:15   作者:何中应  
Canal是一款MySQL数据库增量日志解析工具,用于实现数据库之间的数据同步,本文主要介绍了Canal入门使用小结,感兴趣的可以了解一下

说明:canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费(官方介绍)。一言以蔽之,Canal是一款实现数据同步的组件。可以实现数据库之间、数据库与Redis、ES之间的数据同步。本文介绍Canal的入门使用。

Canal介绍

Canal实现原理是伪装成MySQL主节点的从节点,接收主节点的binlog日志,解析、提取数据库操作,将对数据库的操作通过代码更新到其他组件中,如其他数据库、ES、Redis等,官方解释如下:

在这里插入图片描述

官方提供的结构图如下:

在这里插入图片描述

Canal安装

首先,从官网上下载Canal服务器,地址:https://github.com/alibaba/canal/releases

在这里插入图片描述

下载下来,解压,如下:

在这里插入图片描述

canal配置文件暂时不用管,先修改一下example文件中监测的目前节点配置,修改成自己需要监测的MySQL配置,如下:

在这里插入图片描述

修改完,启动canal服务,双击startup.bat文件,如下:

在这里插入图片描述

Canal使用

只要你的MySQL服务器的IP、账号密码没输错,且测试过能用Navicat或其他数据库连接工具成功连接数据库,那么就可以进行下面的编码工作了。

首先,创建一个Maven项目,pom.xml如下,导个canal依赖就行了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hezy</groupId>
    <artifactId>canal_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!--canal客户端-->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
    </dependencies>

</project>

测试代码如下,用来连接canal服务器,打印canal监测到的数据内容;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

/**
 * Canal处理器
 * 作用:打印canal服务器监测到的数据
 */
public class CanalHandler {
    public static void main(String[] args) throws InvalidProtocolBufferException {

        // 1.创建连接
        CanalConnector canalConnector = CanalConnectors
                .newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");

        // 2.抓取数据
        while (true) {

            // 3.开始连接
            canalConnector.connect();

            // 4.订阅数据,所有的库和表
            canalConnector.subscribe(".*\\..*");

            // 5.抓取数据,每次抓取100条
            Message message = canalConnector.get(100);

            // 6.获取entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            // 7.判断是否有数据
            if (entries.size() == 0) {
                System.out.println(">>>暂无数据<<<");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // 8.解析数据
                for (CanalEntry.Entry entry : entries) {
                    // 获取表名
                    String tableName = entry.getHeader().getTableName();
                    // 获取操作类型
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    // 判断entryType是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 序列化数据
                        ByteString storeValue = entry.getStoreValue();
                        // 反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                        // 获取事件类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 获取具体的数据
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        // 遍历打印
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            // 获取拉取前后的数据
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                            // 用Map存储每条数据
                            HashMap<String, Object> beforeMap = new HashMap<>();
                            HashMap<String, Object> afterMap = new HashMap<>();

                            // 获取不同操作的数据
                            if (CanalEntry.EventType.INSERT.equals(eventType)) {
                                System.out.println("【" + tableName + "】表插入数据");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("新增数据:" + afterMap);
                            } else if (CanalEntry.EventType.UPDATE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表更新数据");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新前:" + beforeMap);
                                System.out.println("----");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新后:" + afterMap);
                            } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表删除数据");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("被删除的数据:" + beforeMap);
                            }
                        }
                    }
                }
            }
        }
    }
}

启动程序,查看控制台,检测中……

在这里插入图片描述

使用Navicat连接数据库,查看数据库test库,i_user表内容;

在这里插入图片描述

此时,我们新增一条数据,看控制台,canal成功接收到了这次修改;

在这里插入图片描述

更新数据;

在这里插入图片描述

删除数据;

在这里插入图片描述

头能过身体就能过,接下来不就好办了。将Canal接收到的数据转为对象,根据不同的操作类型分发给自己想要同步的组件,同步给从MySQL,就调用对应的Mapper;同步给Redis,就调用Redis对应的方法,ES同样。

总结

本文介绍了Canal入门使用,参考B站视频:Canal极简入门:一小时让你快速上手Canal数据同步神技~

到此这篇关于Canal入门使用小结的文章就介绍到这了,更多相关Canal入门内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支

相关文章

  • mysql不重启的情况下修改参数变量

    mysql不重启的情况下修改参数变量

    这篇文章主要介绍了mysql不重启的情况下修改参数变量,需要的朋友可以参考下
    2014-06-06
  • Mysql连接无效(invalid connection)问题及解决

    Mysql连接无效(invalid connection)问题及解决

    这篇文章主要介绍了Mysql连接无效(invalid connection)问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • MySQL Count函数使用教程

    MySQL Count函数使用教程

    这篇文章主要介绍了MySQL Count函数,COUNT()是一个聚合函数,返回指定匹配条件的行数。开发中常用来统计表中数据,全部数据,不为NULL数据,或者去重数据
    2022-12-12
  • MySQL和Oracle的元数据抽取实例分析

    MySQL和Oracle的元数据抽取实例分析

    MySQL和Oracle虽然在架构上有很大的不同,但是如果从某些方面比较起来,它们有些方面也是相通的,下面这篇文章主要给大家介绍了关于MySQL和Oracle元数据抽取的相关资料,需要的朋友可以参考下
    2021-12-12
  • MySQL5.7中的sql_mode默认值带来的坑及解决方法

    MySQL5.7中的sql_mode默认值带来的坑及解决方法

    这篇文章主要介绍了MySQL5.7中的sql_mode默认值带来的坑及解决方法,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-11-11
  • mysql use命令选择数据库详解

    mysql use命令选择数据库详解

    这篇文章主要介绍了mysql 使用use命令选择数据库的相关资料,需要的朋友可以参考下
    2016-09-09
  • 大批量数据用mysql批量更新数据的4种方法总结

    大批量数据用mysql批量更新数据的4种方法总结

    这篇文章主要给大家介绍了关于大批量数据用mysql批量更新数据的4种方法,要在MySQL中新增大批量数据,可以通过以下几种方法来实现,文中给出了详细的代码示例,需要的朋友可以参考下
    2024-05-05
  • 在Linux系统安装Mysql教程

    在Linux系统安装Mysql教程

    本文给大家分享的是如何在linux下安装mysql 图解教程,步奏非常详细,也很实用,这里推荐给大家
    2016-04-04
  • 批量 kill mysql 中运行时间长的sql

    批量 kill mysql 中运行时间长的sql

    这篇文章主要介绍了批量 kill mysql 中运行时间长的sql,需要的朋友可以参考下
    2016-01-01
  • MySQL按照汉字的拼音排序简单实例

    MySQL按照汉字的拼音排序简单实例

    下面小编就为大家带来一篇MySQL按照汉字的拼音排序简单实例。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-01-01

最新评论