Spring Boot 事务实战之如何解决 DB 与 MQ 的"双写不一致"问题

 更新时间:2025年12月10日 10:10:29   作者:qq_25624705  
本文结合代码案例探讨如何利用Spring的 TransactionSynchronizationManager 实现事务提交后触发(Trigger After Commit)机制,优雅解决数据库与消息队列的"双写一致性"问题,感兴趣的朋友跟随小编一起看看吧

摘要:在分布式系统中,“先存数据库还是先发消息”是一个经典的架构难题。特别是在 IM 系统的多媒体消息处理场景中,如果处理顺序不当,不仅会导致对象存储(OSS)中产生无法回收的“孤儿文件”,还会引发并发重复处理的问题。本文结合代码案例,探讨如何利用 Spring 的 TransactionSynchronizationManager 实现 事务提交后触发 (Trigger After Commit) 机制,优雅解决数据库与消息队列的“双写一致性”问题。

1. 引言:一个看似简单的顺序问题

在“信令与媒体分离”的架构中,核心流程通常如下:API 服务收到消息 -> 落库(标记为 Pending) -> 异步通知 Worker 搬运文件

这一流程涉及两个异构系统的写操作:

  • DB Write:将消息元数据写入 MySQL。
  • MQ Write:将搬运任务发布到 NATS。

在实际开发中,直觉性的代码编写往往会陷入以下误区:

误区一:先发消息,后入库

// ❌ 错误示范
natsPublisher.publish(task); // 1. 消息发出,Worker 开始下载转存
messageRepository.save(message); // 2. 数据库报错(如字段超长、唯一键冲突)
// 后果:DB 回滚,业务无记录,但 OSS 中产生了一个永远无法被引用的“孤儿文件”。

误区二:在事务内发消息

// ❌ 错误示范
@Transactional
public void handle() {
    messageRepository.save(message);
    natsPublisher.publish(task); 
    // 3. 代码执行完毕,但在事务提交(Commit)的一瞬间数据库连接断开
}
// 后果:Worker 收到任务并完成处理,但在回调更新状态时发现 DB 中不存在该记录。

2. 核心方案:事务提交后的“惊险一跃”

为了保证 “只有数据库确确实实持久化成功了,才去触发异步任务”,最佳实践是利用 Spring 框架提供的事务同步机制。

以下是优化后的代码实现:

2.1 主业务逻辑

// 1. 准备阶段:预生成任务(纯内存操作,无副作用)
// 此时并没有真正发送 NATS 消息,只是构建了对象
List<MediaTransferTask> mediaTasks = prepareMediaTransferTasks(msg, ids.sessionId());
// 2. 构建消息实体
WxMessage message = buildMessage(msg, accountId, ids.sessionId(), ids.senderId());
try {
    // 【核心步骤 A】数据库落库 (Source of Truth)
    // 这是唯一的“事实来源”。如果这里失败,后续一切都不应发生。
    messageRepository.save(message);
    log.info("Message saved: id={}, wxid={}", message.getId(), message.getWxid());
    // 3. 发布会话更新事件 (内存事件或 MQ)
    SessionUpdateEvent event = SessionUpdateEvent.builder()
        .accountId(accountId)
        // ... build params
        .build();
    sessionEventPublisher.publishSessionUpdate(event);
    // 【核心步骤 B】注册事务回调
    // 关键点:这里不是立即发送,而是“预约”发送
    publishMediaTransferTasksAfterCommit(mediaTasks);
    return ProcessResult.success();
} catch (DataIntegrityViolationException e) {
    // 【并发场景的保护】
    // 如果两个线程同时处理同一条消息(如网络重放或客户端重试),
    // 数据库的唯一索引会抛出此异常。
    // 由于消息发送逻辑在事务提交后执行,失败的线程事务回滚,
    // 因此“afterCommit”钩子不会被触发,完美避免了 Worker 重复搬运文件。
    log.debug("Duplicate message (concurrent): {}", msg.getMessageId());
    return ProcessResult.duplicate();
}

2.2 事务同步器的实现

publishMediaTransferTasksAfterCommit 方法利用了 Spring 的 TransactionSynchronizationManager 来挂载回调。
private void publishMediaTransferTasksAfterCommit(List<MediaTransferTask> tasks) {
    if (CollectionUtils.isEmpty(tasks)) {
        return;
    }
    // 判断当前是否在事务中
    if (TransactionSynchronizationManager.isActualTransactionActive()) {
        // 注册同步器
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                // 【真正发送的时机】
                // 只有当 DB 事务成功 Commit 后,这一行才会执行
                // 此时 DB 里一定有数据,Worker 回调一定能成功
                tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
                log.debug("Async media tasks published after commit: size={}", tasks.size());
            }
        });
    } else {
        // 如果不在事务中(比如非事务方法调用),则立即发送(降级策略)
        tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
    }
}

3. 深度解析:方案优势

3.1 杜绝“孤儿资源”

通过 afterCommit 钩子,严格保证了因果关系:因(DB落库成功) -> 果(触发搬运)。 如果 messageRepository.save(message) 因为任何原因(业务校验失败、数据库异常)导致事务回滚,afterCommit 回调将永远不会被执行,NATS 消息也就不会发出,从而从源头上避免了 OSS 资源的浪费。

3.2 天然的幂等性防护

代码中对 DataIntegrityViolationException 的捕获处理是该方案的另一大亮点。 在分布式场景下,消息重复投递是常见现象。

  • 无保护模式:若未加控制,两个线程可能都会发出 NATS 消息,导致 Worker 下载上传两次同样的图片,浪费带宽和计算资源。
  • 事务同步模式:数据库的唯一约束(Unique Key)充当了“守门员”。第二个线程在 save 时会因冲突被拒绝,随之事务回滚。由于事务未成功提交, 其注册的 afterCommit 钩子自动失效。最终,只有抢锁成功的线程才会发出唯一的一条异步任务。

4. 兜底策略:应对“反向不一致”

虽然该方案解决了“有文件没记录”的问题,但理论上仍存在极低概率的“反向不一致”:DB 提交成功了,但在执行 afterCommit 发送 NATS 消息的一瞬间,服务宕机或断电。

此时,数据库中存在一条状态为 PENDING 的记录,但永远不会有 Worker 来处理它。

为了达到金融级的一致性,系统应补充一个兜底补偿机制

  • **定时任务 (Compensation Job)**:每隔一定周期(如 5 分钟)扫描一次消息表。
  • 筛选条件create_time < 5分钟前 AND media_status = 'PENDING'
  • 补偿动作:重新构建 MediaTransferTask 并补发到 NATS。

5. 总结

在处理“数据库事务”与“外部系统调用(MQ/RPC)”混合的业务场景时,“事务同步器(Transaction Synchronization)” 是 Spring 体系中解决双写一致性问题的利器。

通过这一模式的重构,系统实现了:

  • 资源一致性:杜绝了 OSS 孤儿文件。
  • 并发安全性:利用数据库锁自动解决并发任务重复发布问题。
  • 逻辑严密性:确保状态流转严格遵循业务时序。

核心原则:先落库,再提交,回调之中发消息。

到此这篇关于Spring Boot 事务实战之如何优雅解决 DB 与 MQ 的"双写不一致"问题的文章就介绍到这了,更多相关springboot db与mq双写不一致内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Maven的安装+配置本地仓库路径方式

    Maven的安装+配置本地仓库路径方式

    这篇文章主要介绍了Maven的安装+配置本地仓库路径方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-09-09
  • 最新版Eclipse安装、配置图文教程详解

    最新版Eclipse安装、配置图文教程详解

    这篇文章主要介绍了新版Eclipse安装、配置,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • 告诉你springboot各个文件夹的作用

    告诉你springboot各个文件夹的作用

    这篇文章主要介绍了springboot各个文件夹是干嘛的,本文通过截图实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-08-08
  • 一文带你了解Java中的Object类及类中方法

    一文带你了解Java中的Object类及类中方法

    Object是Java默认提供的一个类。Java里面除了Object类,所有的类都是存在继承关系的。默认会继承Object父 类。即所有类的对象都可以使用Object的引用进行接收。本文就来为大家详细讲讲Object类及类中方法,感兴趣的可以了解一下
    2022-08-08
  • Java中4种校验注解详解(值校验、范围校验、长度校验、格式校验)

    Java中4种校验注解详解(值校验、范围校验、长度校验、格式校验)

    这篇文章主要给大家介绍了关于Java中4种校验注解详解的相关资料,分别包括值校验、范围校验、长度校验、格式校验等,Java注解(Annotation)是一种元数据,它可以被添加到Java代码中,并可以提供额外的信息和指令,需要的朋友可以参考下
    2023-08-08
  • Java设计模式之java状态模式详解

    Java设计模式之java状态模式详解

    这篇文章主要介绍了Java设计模式之状态模式定义与用法,结合具体实例形式详细分析了Java状态模式的概念、原理、定义及相关操作技巧,需要的朋友可以参考下
    2021-09-09
  • Springboot 多租户SaaS搭建方案

    Springboot 多租户SaaS搭建方案

    这篇文章主要介绍了Springboot 多租户SaaS方案,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-06-06
  • 学习JVM之java内存区域与异常

    学习JVM之java内存区域与异常

    关于JVM内存区域的知识对于初学者来说其实是很重要的,了解Java内存分配的原理,这对于以后JAVA的学习会有更深刻的理解。下面来看看详细介绍。
    2016-07-07
  • 详解Java的Struts框架以及相关的MVC设计理念

    详解Java的Struts框架以及相关的MVC设计理念

    这篇文章主要介绍了详解Java的Struts框架以及相关的MVC设计理念,Struts是Java的SSH三大web开发框架之一,需要的朋友可以参考下
    2015-12-12
  • 使用SpringSecurity处理CSRF攻击的方法步骤

    使用SpringSecurity处理CSRF攻击的方法步骤

    这篇文章主要介绍了使用SpringSecurity处理CSRF攻击的方法步骤,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-03-03

最新评论