RocketMQ保证消息的有序性的案例分享

 更新时间:2024年04月25日 08:50:22   作者:一只爱撸猫的程序猿  
Apache RocketMQ 是一个常用的开源消息中间件,它提供了强大的有序消息处理能力,这里我们会探讨 RocketMQ 是如何保证消息的有序性的,包括其设计原理和相关的源码实现,需要的朋友可以参考下

在分布式系统中,消息队列(MQ)的有序性是一个重要的特性,尤其是在需要保证事件顺序执行的业务场景下。Apache RocketMQ 是一个常用的开源消息中间件,它提供了强大的有序消息处理能力。这里我们会探讨 RocketMQ 是如何保证消息的有序性的,包括其设计原理和相关的源码实现。

简单原理

RocketMQ 有序消息的基本概念

RocketMQ 保证有序性的主要方法是通过顺序消息来实现的。在 RocketMQ 中,顺序消息分为全局顺序和分区顺序两种:

  • 全局顺序:指的是消息全局范围内的有序,也就是在所有的消息中,都是按照发送的顺序来消费。
  • 分区顺序:指的是在同一个队列(Queue)中的消息是有序的,而不同队列间的消息并不保证有序。

RocketMQ 默认使用分区顺序,通过将同一个 topic 下的消息分到同一个队列(queue)中,来保证队列内的消息有序。

RocketMQ 有序消息的实现机制

消息发送

在发送端,RocketMQ 通过确保生产者向同一个队列(Queue)发送消息来保证消息的有序性。生产者在发送消息时可以指定消息的 keys 或者其他属性,RocketMQ 通过这些属性计算消息应该发送到哪个队列。

源码示例(伪代码):

public class Producer {
    public void sendMessages(List<Message> messages) {
        for (Message msg : messages) {
            int queueId = this.calculateQueueId(msg);
            msg.setQueueId(queueId);
            this.sendMessageToQueue(msg, queueId);
        }
    }

    private int calculateQueueId(Message msg) {
        // 使用 hash 算法基于 message key 计算队列 ID
        return Math.abs(msg.getKey().hashCode()) % this.queueSize;
    }
}

消息消费

在消费端,RocketMQ 使用单线程消费模式来保证同一个队列的消息顺序性。消费者会固定分配到某个队列,而且是单线程从该队列拉取并处理消息,从而保证消息的有序处理。

源码示例(伪代码):

public class Consumer {
    public void consume() {
        while (true) {
            Message msg = this.pullMessage();
            this.processMessage(msg);
        }
    }
}

简单案例

在Spring Boot中使用RocketMQ来保证消息的队列顺序性,我们需要配置RocketMQ的客户端和服务器端以支持顺序消息。以下是一个基于RocketMQ和Spring Boot实现的消息顺序发送和消费的例子。这个场景假设我们需要在一个电商系统中处理订单状态更新,订单状态更新必须按照顺序来处理,以避免状态不一致。

步骤1: 添加依赖

首先,确保你的pom.xml中加入了RocketMQ的Spring Boot Starter依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

步骤2: 配置RocketMQ

application.ymlapplication.properties中配置RocketMQ的基础属性:

rocketmq:
  name-server: 127.0.0.1:9876  # 修改为你的NameServer地址
  producer:
    group: order_producer_group
    send-message-timeout: 3000
  consumer:
    group: order_consumer_group
    consume-thread-min: 1
    consume-thread-max: 1

步骤3: 生产者配置

创建一个生产者服务,这个服务将订单状态更新作为顺序消息发送。

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderStatusProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderStatusUpdate(String orderStatus, String orderId) {
        // 使用订单ID作为key保证同一个订单的更新在同一个队列
        SendResult result = rocketMQTemplate.syncSendOrderly("order-topic", orderStatus, orderId);
        System.out.println("Message sent, result: " + result.getSendStatus());
    }
}

步骤4: 消费者配置

创建一个消费者服务,这个服务将按顺序消费订单状态更新。

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order_consumer_group", consumeMode = ConsumeMode.ORDERLY)
public class OrderStatusConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received order update: " + message);
        // 处理订单更新逻辑
        processOrderUpdate(message);
    }

    private void processOrderUpdate(String status) {
        // 实现订单更新处理逻辑
        System.out.println("Processing order status update: " + status);
    }
}

步骤5: 测试消息顺序

你可以通过编写一个简单的测试来发送多个消息,并观察消费者是否按顺序接收它们。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class OrderStatusTestRunner implements CommandLineRunner {

    @Autowired
    private OrderStatusProducer producer;

    @Override
    public void run(String... args) throws Exception {
        producer.sendOrderStatusUpdate("Order Created", "OrderId123");
        producer.sendOrderStatusUpdate("Payment Received", "OrderId123");
        producer.sendOrderStatusUpdate("Shipped", "OrderId123");
        producer.sendOrderStatusUpdate("Delivered", "OrderId123");
    }
}

通过这个设置,RocketMQ 和 Spring Boot 能够保证同一个订单的不同状态更新是按照发送顺序被处理的。这对于需要顺序一致性的业务逻辑是非常重要的。

以上就是RocketMQ保证消息的有序性的案例分享的详细内容,更多关于RocketMQ消息有序性的资料请关注脚本之家其它相关文章!

相关文章

  • Maven使用Nexus创建私服的实现

    Maven使用Nexus创建私服的实现

    本文主要介绍了Maven使用Nexus创建私服的实现,通过建立自己的私服,就可以降低中央仓库负荷、节省外网带宽、加速Maven构建、自己部署构件等,从而高效地使用Maven,感兴趣的可以了解一下
    2024-04-04
  • java中request对象各种方法的使用实例分析

    java中request对象各种方法的使用实例分析

    这篇文章主要介绍了java中request对象各种方法的使用,结合完整实例形式较为详细的分析了request对象的功能及其常用方法的使用技巧,需要的朋友可以参考下
    2015-12-12
  • 详解如何判断Java线程池任务已执行完

    详解如何判断Java线程池任务已执行完

    线程池的使用并不复杂,麻烦的是如何判断线程池中的任务已经全部执行完了,所以接下来,我们就来看看如何判断线程中的任务是否已经全部执行完吧
    2023-08-08
  • Java分页工具类及其使用(示例分享)

    Java分页工具类及其使用(示例分享)

    本篇文章主要分享了Java分页工具类及其使用的示例代码,具有一定的参考价值,下面跟着小编一起来看下吧
    2017-01-01
  • java实现解析json复杂数据的方法详解

    java实现解析json复杂数据的方法详解

    这篇文章主要为大家详细介绍了java如何实现解析json复杂数据,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以学习一下
    2024-01-01
  • SpringMVC前端和后端数据交互总结

    SpringMVC前端和后端数据交互总结

    本篇文章主要介绍了SpringMVC前端和后端数据交互总结,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2017-03-03
  • Mybatis 复杂对象resultMap的使用

    Mybatis 复杂对象resultMap的使用

    这篇文章主要介绍了Mybatis 复杂对象resultMap的使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Vue中对象赋值问题:对象引用被保留仅部分属性被覆盖的解决方案

    Vue中对象赋值问题:对象引用被保留仅部分属性被覆盖的解决方案

    在 Vue 中,当你直接给一个响应式对象(如 reactive 或 ref 包装的对象)赋新值时,如果直接使用 = 赋值,可能会遇到 对象引用被保留,仅部分属性被覆盖 的问题,本文给大家介绍了Vue中对象赋值问题:对象引用被保留仅部分属性被覆盖的解决方案,需要的朋友可以参考下
    2025-07-07
  • Spring Boot Async异步执行任务过程详解

    Spring Boot Async异步执行任务过程详解

    这篇文章主要介绍了Spring Boot Async异步执行任务过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • Mybatis Mapper中多参数方法不使用@param注解报错的解决

    Mybatis Mapper中多参数方法不使用@param注解报错的解决

    这篇文章主要介绍了Mybatis Mapper中多参数方法不使用@param注解报错的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教。
    2022-01-01

最新评论