MySQL同步Elasticsearch的6种方案小结

 更新时间:2025年05月06日 09:43:28   作者:苏三说技术  
在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合,本文整理了MySQL同步ES的6种主流方案,大家可以根据自己的需要进行选择

引言

在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合。

然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。

这篇文章跟大家一起聊聊MySQL同步ES的6种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。

方案一:同步双写

场景:适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。

在业务代码中同时写入MySQL与ES。

代码如下:

@Transactional  
public void createOrder(Order order) {  
    // 写入MySQL  
    orderMapper.insert(order);  
    // 同步写入ES  
    IndexRequest request = new IndexRequest("orders")  
        .id(order.getId())  
        .source(JSON.toJSONString(order), XContentType.JSON);  
    client.index(request, RequestOptions.DEFAULT);  
}

痛点

  • 硬编码侵入:所有涉及写操作的地方均需添加ES写入逻辑。
  • 性能瓶颈:双写操作导致事务时间延长,TPS下降30%以上。
  • 数据一致性风险:若ES写入失败,需引入补偿机制(如本地事务表+定时重试)。

方案二:异步双写

场景:电商订单状态更新后需同步至ES供客服系统检索。

我们可以使用MQ进行解耦。

架构图如下

代码示例如下

// 生产者端  
public void updateProduct(Product product) {  
    productMapper.update(product);  
    kafkaTemplate.send("product-update", product.getId());  
}  

// 消费者端  
@KafkaListener(topics = "product-update")  
public void syncToEs(String productId) {  
    Product product = productMapper.selectById(productId);  
    esClient.index(product);  
}

优势

  • 吞吐量提升:通过MQ削峰填谷,可承载万级QPS。
  • 故障隔离:ES宕机不影响主业务链路。

缺陷

  • 消息堆积:突发流量可能导致消费延迟(需监控Lag值)。
  • 顺序性问题:需通过分区键保证同一数据的顺序消费。

方案三:Logstash定时拉取

场景:用户行为日志的T+1分析场景。

该方案低侵入但高延迟。

配置示例如下

input {  
  jdbc {  
    jdbc_driver => "com.mysql.jdbc.Driver"  
    jdbc_url => "jdbc:mysql://localhost:3306/log_db"  
    schedule => "*/5 * * * *"  # 每5分钟执行  
    statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value"  
  }  
}  
output {  
  elasticsearch {  
    hosts => ["es-host:9200"]  
    index => "user_logs"  
  }  
}

适用性分析

  • 优点:零代码改造,适合历史数据迁移。
  • 致命伤
    • 分钟级延迟(无法满足实时搜索)
    • 全表扫描压力大(需优化增量字段索引)

方案四:Canal监听Binlog

场景:社交平台动态实时搜索(如微博热搜更新)。

技术栈:Canal + RocketMQ + ES

该方案高实时,并且低侵入。

架构流程如下

关键配置

# canal.properties  
canal.instance.master.address=127.0.0.1:3306  
canal.mq.topic=canal.es.sync

避坑指南

  • 数据漂移:需处理DDL变更(通过Schema Registry管理映射)。
  • 幂等消费:通过_id唯一键避免重复写入。

方案五:DataX批量同步

场景:将历史订单数据从分库分表MySQL迁移至ES。

该方案是大数据迁移的首选。

配置文件如下

{  
  "job": {  
    "content": [{  
      "reader": {  
        "name": "mysqlreader",  
        "parameter": { "splitPk": "id", "querySql": "SELECT * FROM orders" }  
      },  
      "writer": {  
        "name": "elasticsearchwriter",  
        "parameter": { "endpoint": "http://es-host:9200", "index": "orders" }  
      }  
    }]  
  }  
}

性能调优

  • 调整channel数提升并发(建议与分片数对齐)
  • 启用limit分批查询避免OOM

方案六:Flink流处理

场景:商品价格变更时,需关联用户画像计算实时推荐评分。

该方案适合于复杂的ETL场景。

代码片段如下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
env.addSource(new CanalSource())  
   .map(record -> parseToPriceEvent(record))  
   .keyBy(event -> event.getProductId())  
   .connect(userProfileBroadcastStream)  
   .process(new PriceRecommendationProcess())  
   .addSink(new ElasticsearchSink());

优势

  • 状态管理:精准处理乱序事件(Watermark机制)
  • 维表关联:通过Broadcast State实现实时画像关联

总结

对于文章上面给出的这6种技术方案,我们在实际工作中,该如何做选型呢?

下面用一张表格做对比:

方案实时性侵入性复杂度适用阶段
同步双写秒级小型单体项目
MQ异步秒级中型分布式系统
Logstash分钟级离线分析
Canal毫秒级高并发生产环境
DataX小时级历史数据迁移
Flink毫秒级极高实时数仓

苏三的建议

  • 若团队无运维中间件能力 → 选择Logstash或同步双写
  • 需秒级延迟且允许改造 → MQ异步 + 本地事务表
  • 追求极致实时且资源充足 → Canal + Flink双保险

到此这篇关于MySQL同步Elasticsearch的6种方案小结的文章就介绍到这了,更多相关MySQL同步Elasticsearch内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • MySQL细数发生索引失效的情况

    MySQL细数发生索引失效的情况

    本文主要介绍了MySQL导致索引失效的几种情况,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • 数据从MySQL迁移到Oracle 需要注意什么

    数据从MySQL迁移到Oracle 需要注意什么

    将数据从MySQL迁移到Oracle,大家需要注意什么?Oracle移植到mysql,又需要注意什么?如何有效解决移植过程的问题,为了数据库的兼容性我们又该注意些什么?感兴趣的小伙伴们可以参考一下
    2016-11-11
  • Mysql服务器的启动与停止(一)

    Mysql服务器的启动与停止(一)

    Mysql服务器的启动与停止(一)...
    2006-11-11
  • 基于Mysql+JavaSwing的超市商品管理系统设计与实现

    基于Mysql+JavaSwing的超市商品管理系统设计与实现

    本项目是使用Java swing开发,可实现超市管理系统商品列表信息查询、添加商品信息和修改商品管理以及删除商品信息和安装商品信息查询等功能。界面设计和功能比较简单基础、适合作为Java课设设计以及学习技术使用,需要的朋友可以参考一下
    2021-09-09
  • 一键安装mysql5.7及密码策略修改方法

    一键安装mysql5.7及密码策略修改方法

    这篇文章主要介绍了一键安装mysql5.7及密码策略修改方法,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-10-10
  • MySQL常用存储引擎功能与用法详解

    MySQL常用存储引擎功能与用法详解

    这篇文章主要介绍了MySQL常用存储引擎功能与用法,较为详细的分析了mysql存储引擎的分类、功能、使用方法及相关操作注意事项,需要的朋友可以参考下
    2018-04-04
  • MySQL常用命令与内部组件及SQL优化详情

    MySQL常用命令与内部组件及SQL优化详情

    这篇文章主要介绍了MySQL常用命令与内部组件及SQL优化详情,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下
    2022-07-07
  • MySQL去除重叠时间求时间差和的实现

    MySQL去除重叠时间求时间差和的实现

    在生产中常常出现计算两个时间差的业务,比如总宕机时间、总开通会员时间等,本文就详细的来介绍一下如何计算,感兴趣的可以了解一下
    2021-08-08
  • mysql获得60天前unix时间思路及代码

    mysql获得60天前unix时间思路及代码

    首先根据now()获得当前时间,使用adddate()方法获得60天前时间,使用unix_timestamp()方法转换时间类型
    2014-08-08
  • 推荐几款MySQL相关工具

    推荐几款MySQL相关工具

    这篇文章主要介绍了几款MySQL相关工具的相关资料,帮助大家更好的使用和维护MySQL 数据库,感兴趣的朋友可以了解下
    2020-11-11

最新评论