Java实现MySQL数据实时同步至Elasticsearch的方法详解

 更新时间:2025年03月10日 08:43:37   作者:小诸葛IT课堂  
MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析,将MySQL数据实时同步到ES,可以充分发挥两者的优势,下面我们就来看看如何使用Java实现这一功能吧

引言:为什么需要实时同步?

MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如:

  • 构建高性能搜索服务
  • 实时数据分析与大屏展示
  • 提升复杂查询效率

传统方案(如定时全量同步)存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步,并提供完整Java代码及深度源码解析。

一、技术选型与核心原理

1.1 核心组件

MySQL Binlog:MySQL的二进制日志,记录所有数据变更事件(增删改)。

Canal/OpenReplicator:解析Binlog的工具(本文使用轻量级mysql-binlog-connector-java)。

Elasticsearch High Level REST Client:ES官方Java客户端,用于数据写入。

1.2 架构流程图

MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch

二、环境准备与配置

2.1 MySQL开启Binlog

# 修改my.cnf(Linux)或my.ini(Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW  # 必须为ROW模式

2.2 创建ES索引

PUT /user
{
  "mappings": {
    "properties": {
      "id": {"type": "integer"},
      "name": {"type": "text"},
      "email": {"type": "keyword"},
      "create_time": {"type": "date"}
    }
  }
}

三、Java代码实现

3.1 Maven依赖

<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.25.4</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.3</version>
</dependency>

3.2 核心代码(Binlog监听与同步)

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
 
public class MySQL2ESSyncer {
 
    private static final String ES_INDEX = "user";
 
    public static void main(String[] args) throws Exception {
        // 初始化ES客户端
        RestHighLevelClient esClient = ESClientFactory.createClient();
 
        // 配置Binlog监听
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");
        client.setServerId(1001); // 唯一ID,避免冲突
 
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof WriteRowsEventData) {
                // 处理插入事件
                handleWriteEvent((WriteRowsEventData) data, esClient);
            } else if (data instanceof UpdateRowsEventData) {
                // 处理更新事件
                handleUpdateEvent((UpdateRowsEventData) data, esClient);
            } else if (data instanceof DeleteRowsEventData) {
                // 处理删除事件
                handleDeleteEvent((DeleteRowsEventData) data, esClient);
            }
        });
 
        client.connect(); // 启动监听
    }
 
    private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {
        eventData.getRows().forEach(row -> {
            // 假设表结构为:id, name, email, create_time
            String json = String.format(
                "{\"id\":%d,\"name\":\"%s\",\"email\":\"%s\",\"create_time\":\"%s\"}",
                row[0], row[1], row[2], row[3]
            );
            IndexRequest request = new IndexRequest(ES_INDEX)
                .id(row[0].toString())
                .source(json, XContentType.JSON);
            esClient.index(request, RequestOptions.DEFAULT);
        });
    }
 
    // 更新和删除处理类似,代码略(完整源码见文末链接)
}

四、源码深度解析

4.1 Binlog监听流程

BinaryLogClient:核心类,负责连接MySQL并监听Binlog。

事件类型判断:根据WriteRowsEventData、UpdateRowsEventData、DeleteRowsEventData区分增、改、删操作。

4.2 数据转换关键点

Row数据解析:从事件中提取变更的行的具体值,需与表结构顺序对应。

ES文档ID:建议使用MySQL主键,确保更新/删除操作能精准定位文档。

4.3 异常处理与优化

重试机制:ES写入失败时,可加入重试队列。

批量提交:攒批写入ES提升性能(需权衡实时性)。

事务一致性:确保Binlog位置持久化,避免数据丢失。

五、方案优缺点对比

方案实时性复杂度资源消耗
定时全量同步低(分钟级)
基于触发器高(需改表)
Binlog监听

六、总结与扩展

本文实现了基于Binlog的MySQL到ES的实时同步,具备以下优势:

  • 实时性:毫秒级延迟,满足大部分业务场景。
  • 无侵入:无需修改MySQL表结构。
  • 可扩展:可轻松适配其他数据源(如PostgreSQL)。

扩展方向:

  • 使用Kafka作为中间层,解耦生产与消费。
  • 增加监控报警,保障数据一致性。
  • 支持DDL变更自动同步(如表结构修改)。

到此这篇关于Java实现MySQL数据实时同步至Elasticsearch的方法详解的文章就介绍到这了,更多相关Java MySQL数据同步至Elasticsearch内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Springboot2.1.6集成activiti7出现登录验证的实现

    Springboot2.1.6集成activiti7出现登录验证的实现

    这篇文章主要介绍了Springboot2.1.6集成activiti7出现登录验证的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • SpringBoot新特性之全局懒加载机制

    SpringBoot新特性之全局懒加载机制

    这篇文章主要介绍了SpringBoot新特性之全局懒加载机制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • Java的RTTI和反射机制代码分析

    Java的RTTI和反射机制代码分析

    这篇文章主要涉及了Java的RTTI和反射机制代码分析的相关内容,在介绍运行时类型识别的同时,又向大家展示了其实例以及什么时候会用到反射机制,内容丰富,需要的朋友可以参考下。
    2017-09-09
  • 基于StringBuilder类中的重要方法(介绍)

    基于StringBuilder类中的重要方法(介绍)

    下面小编就为大家带来一篇基于StringBuilder类中的重要方法(介绍)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • Spring OAuth2.0 单元测试解决方案

    Spring OAuth2.0 单元测试解决方案

    这篇文章主要介绍了Spring OAuth2.0 单元测试解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • java获取中文拼音首字母的实例

    java获取中文拼音首字母的实例

    下面小编就为大家带来一篇java获取中文拼音首字母的实例。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-09-09
  • IDEA maven引入SSL证书校验问题及处理

    IDEA maven引入SSL证书校验问题及处理

    这篇文章主要讨论了在Maven项目中遇到依赖导入问题,特别是关于PKIX路径构建失败的错误,文章提供了三种解决方法:手动下载依赖、忽略SSL证书校验以及生成并导入SSL证书,每种方法都有详细的步骤和示例代码,帮助开发者解决这个问题
    2025-02-02
  • Java 锁的知识总结及实例代码

    Java 锁的知识总结及实例代码

    这篇文章主要介绍了Java 锁的知识总结及实例代码,需要的朋友可以参考下
    2016-09-09
  • java LRU算法介绍与用法示例

    java LRU算法介绍与用法示例

    这篇文章主要介绍了java LRU算法,简单介绍了LRU算法的概念并结合实例形式分析了LRU算法的具体使用方法,需要的朋友可以参考下
    2017-09-09
  • Java基于fork/koin类实现并发排序

    Java基于fork/koin类实现并发排序

    这篇文章主要介绍了Java基于fork/koin类实现并发排序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02

最新评论