RocketMQ事务消息机制详解

 更新时间:2024年01月11日 09:28:50   作者:智由静生  
这篇文章主要介绍了RocketMQ事务消息机制详解,RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成"暂不能投递"状态,需要的朋友可以参考下

RocketMQ事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。

一、事务消息的实现步骤

事务消息发送步骤:

1. 发送方将半事务消息发送至RocketMQ服务端。

2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。

3. 发送方开始执行本地事务逻辑。

4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

二、程序实现

事务消息处理类需要继承RocketMQLocalTransactionListener类。该类的executeLocalTransaction方法负责在接到RocketMQ服务端的Ack确认消息后执行本地方法,也就是事务消息发送步骤中的步骤3。该类的checkLocalTransaction方法负责,在断网或者是应用重启的特殊情况下,执行RocketMQ服务端的消息回查,也就是事务消息回查步骤中的步骤2。

此外,要使该类生效,还需要加@RocketMQTransactionListener注解。这里有个要特别注意的地方。在2.1.0版本前,这个注解有一个属性txProducerGroup,可以用多个@RocketMQTransactionListener来监听不同的txProducerGroup来发送不同类型的事务消息到topic。但是现在在一个项目中,如果你在一个project中写了多个@RocketMQTransactionListener,项目将不能启动,启动会报错。产生这个问题的原因据说是,当使用RocketMQTemplate并发的执行事务时,非常容易出现"illegal state"的异常,原因是一个TransactionProducer在执行事务时不能被共享。所以,必须使用同一个TransactionMQProducer来发送所有类型的事务消息。当然同理也就必须使用一个侦听器处理所有的消息了。

既然必须使用同一个TransactionMQProducer,对于比较大的应用,业务场景很多,就会造成混乱。这里我给出一个方案抛砖引玉。TransactionMQProducer在发送消息时,是可以传递参数对象和指定消息头的。可以把要执行的本地方法的bean名和方法名放进去。

//发送半事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
		topicAndTag,
		MessageBuilder.withPayload(msg)
			.setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId())
			.setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId())
			.setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId())
			.build(),
		def
);

其中def就是参数对象,可以自定义对象,这里是我自定义的TransactionMsgDefinationDto类,可以把想传递的信息放进去,最重要的是要执行的本地方法的bean名和方法名和方法执行参数:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法执行参数)。该对象可以传给RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通过反射执行。

@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		try {
			//保存消息记录
			String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
			JSONObject jsonBody = JSONObject.parseObject(body);
			BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload();
			TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg;
			ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new);
			String[] tags = def.getMsgTags();
			if(tags !=null && tags.length > 0) {
				StringBuilder tag = new StringBuilder();
				for(int i = 0; i<tags.length; i++) {
					tag.append(tags[0]);
					if(i != tags.length-1) {
						tag.append("||");
					}
				}
				producerLog.setMsgTag(tag.toString());
			}
			producerLog.setBizId(dto.getBizId());
			producerLog.setTxId(dto.getTxId());
			producerLog.setBizType(dto.getBizType());
			producerLog.setGroupName(dto.getProducerGroup());
			producerLog.setMsgBody(body);
			producerLogService.save(producerLog);
			//执行事务方法
			SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams());
			return RocketMQLocalTransactionState.COMMIT;
		} catch (Exception e) {
			logger.error("发生错误:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

放在消息头header中的数据可以传递给RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同样通过反射执行。

@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		try {
			String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); 
			String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME);
			Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME));
			//执行检查方法
			Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId});
			if(ret.booleanValue())
				return RocketMQLocalTransactionState.COMMIT;
			else
				return RocketMQLocalTransactionState.ROLLBACK;
		} catch (Exception e) {
			logger.error("发生错误:", e);
			return RocketMQLocalTransactionState.UNKNOWN;
		}
	}

到此这篇关于RocketMQ事务消息机制详解的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • springboot 项目使用jasypt加密数据源的方法

    springboot 项目使用jasypt加密数据源的方法

    Jasypt 是一个 Java 库,它允许开发者以最小的努力为他/她的项目添加基本的加密功能,而且不需要对密码学的工作原理有深刻的了解。接下来通过本文给大家介绍springboot 项目使用jasypt加密数据源的问题,一起看看吧
    2021-11-11
  • Spring Boot使用过滤器和拦截器分别实现REST接口简易安全认证示例代码详解

    Spring Boot使用过滤器和拦截器分别实现REST接口简易安全认证示例代码详解

    这篇文章主要介绍了Spring Boot使用过滤器和拦截器分别实现REST接口简易安全认证示例代码,通过开发实践,理解过滤器和拦截器的工作原理,需要的朋友可以参考下
    2018-06-06
  • SpringBoot3整合SpringSecurity6快速入门示例教程

    SpringBoot3整合SpringSecurity6快速入门示例教程

    SpringSecurity 是Spring大家族中一名重要成员,是专门负责安全的框架,本文给大家介绍SpringBoot3整合SpringSecurity6快速入门示例教程,感兴趣的朋友一起看看吧
    2025-04-04
  • Java调用打印机的2种方式举例(无驱/有驱)

    Java调用打印机的2种方式举例(无驱/有驱)

    我们平时使用某些软件或者在超市购物的时候都会发现可以使用打印机进行打印,这篇文章主要给大家介绍了关于Java调用打印机的2种方式,分别是无驱/有驱的相关资料,需要的朋友可以参考下
    2023-11-11
  • idea创建xml文件全过程

    idea创建xml文件全过程

    总结:通过File->Settings->Editor->FileAndCodeTemplates,创建一个自定义的XML文件模板,并命名为XMLFile.xml,后缀名为xml,模板内容可自定义,并启用实时模板功能,然后在文件夹中右键New,即可找到并创建XML文件
    2025-11-11
  • 使用Java把文本内容转换成网页的实现方法分享

    使用Java把文本内容转换成网页的实现方法分享

    这篇文章主要介绍了使用Java把文本内容转换成网页的实现方法分享,利用到了Java中的文件io包,需要的朋友可以参考下
    2015-11-11
  • mybatis-plus雪花算法生成Id使用详解

    mybatis-plus雪花算法生成Id使用详解

    本文主要介绍了mybatis-plus雪花算法生成Id使用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • 详解Spring AOP 实现主从读写分离

    详解Spring AOP 实现主从读写分离

    本篇文章主要介绍了Spring AOP 实现主从读写分离,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-03-03
  • java导出dbf文件生僻汉字处理方式

    java导出dbf文件生僻汉字处理方式

    这篇文章主要介绍了java导出dbf文件生僻汉字处理方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • 详解springboot WebTestClient的使用

    详解springboot WebTestClient的使用

    WebClient是一个响应式客户端,它提供了RestTemplate的替代方法。这篇文章主要介绍了详解springboot WebTestClient的使用, 具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-11-11

最新评论