RabbitMQ消费端单线程与多线程案例讲解

 更新时间:2025年07月26日 09:30:25   作者:你我约定有三  
文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefetch影响消息预取量,强调线程池使用会导致顺序混乱,适用于对顺序无要求的批量处理场景,感兴趣的朋友一起看看吧

🌟 一、基础概念

模型消费者数量每个消费者内部线程数顺序性场景说明
单消费者单线程11✅ 保序处理逻辑简单,保证顺序的常见场景
单消费者多线程1>1❌ 不保序提升处理能力,放弃顺序要求
多消费者单线程>11❌ 不保序多个队列/分区消费,提升并发
多消费者多线程>1>1❌ 不保序高并发场景下批量处理,放弃顺序
concurrency# 初始消费者线程数
max-concurrency# 最大消费者线程数
prefetch# 每个消费者预取的消息数
  • concurrency: 2
    • 表示初始创建的消费者线程数量
    • 系统启动时会立即创建 2 个消费者线程
    • 这些线程会持续监听消息队列
  • max-concurrency: 2
    • 表示允许的最大消费者线程数量
    • 这里设置为 2(与 concurrency 相同),表示线程数不会动态扩展
    • 如果设置 max-concurrency > concurrency,系统会在负载高时动态增加消费者

详细解释:

       concurrency和max-concurrency不会影响每个消费者是否是多线程执行,只会导致有多个消费者线程,只有用线程池才会导致每个消费者多线程消费

        而没有用线程池,也设置prefetch是因为消息被大量预取,单线程处理不过来时堆积等待,单线程并不会影响消息的顺序性,只有使用了线程池才会影响

        使用了线程池一定会导致消息顺序性问题这与设不设置prefetch无关,因为使用线程池后,任务交个线程池就返回了属于异步

举个例子:

                1. RabbitMQ 给消费者推送消息1,消费者收到,提交给线程池任务A(耗时长)。

                2. 消费者马上ACK消息1(因为业务交给线程池了,自己处理完毕的感觉

                3.  RabbitMQ 再给消费者推送消息2,消费者收到,提交给线程池任务B(耗时短)。

                4. R线程池调度先跑完任务B,后跑任务A。

✅ 单消费者 + 单线程消费

  • 保证顺序:消费者内部串行执行。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 1
        max-concurrency: 1
        prefetch: 1

消费者代码

@Component
public class MultiConsumerSingleThread {
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println("🧾 [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

❌ 单消费者 + 多线程消费

  • 不保顺序:一个消费者使用线程池异步处理消息。
  • 配置关键:默认配置 + 手动异步处理
  • 消费者代码
@Component
public class MultiThreadConsumer {
    private final ExecutorService executor = Executors.newFixedThreadPool(5);
    @RabbitListener(queues = "order_queue")
    public void receive(String message) {
        executor.submit(() -> {
            System.out.println("🧾 [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500); // 模拟耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

说明:消息提交到线程池,先到的不一定先处理完成,顺序可能乱。

❌ 多消费者 + 单线程消费

  • 不保顺序:多个消费者实例轮询分配消息,各自顺序保留,但整体顺序错乱。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 2
        max-concurrency: 2
        prefetch: 1

消费者代码(共享类,也可拆成多个类模拟多实例)

@Component
public class MultiConsumerSingleThread {
    //concurrency = "2":它和配置文件中的 concurrency: 2 作用一致,但优先级更高。
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println("🧾 [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

❌ 多消费者 + 多线程消费

  • 不保顺序:每个消费者又使用线程池异步处理消息,最大吞吐量模式。
  • 适合场景:数据导入、日志收集、发送通知等对顺序无要求的批量处理。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 3
        max-concurrency: 3
        prefetch: 10

消费者代码

@Component
public class MultiConsumerMultiThread {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    @RabbitListener(queues = "order_queue", concurrency = "3")
    public void receive(String message) {
        executor.submit(() -> {
            System.out.println("🧾 [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

🧠 补充说明

  • concurrency: 控制并发消费者数量,等于消费者数。
  • prefetch: 控制每个消费者本地最多拉取多少条消息(如 1 表示严格串行处理)。
  • 每个 @RabbitListener 本质上是一个容器,可以通过 concurrency 配置“实例个数”。

到此这篇关于RabbitMQ消费端单线程与多线程的文章就介绍到这了,更多相关RabbitMQ单线程与多线程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot中使用@Async注解失效场景及说明

    SpringBoot中使用@Async注解失效场景及说明

    在Spring Boot中,@Async注解就像一把刀,能帮你轻松处理那些耗时的任务,让主线程可以继续忙别的事儿,不过,跟所有强大的工具一样,用不好它也可能出岔子,为了避免这些坑,咱们得深入了解下@Async注解,接下来,咱们就来聊聊7种常见的@Async失效情况,需要的朋友可以参考下
    2024-07-07
  • Java对int[]数组做新增删除去重操作代码

    Java对int[]数组做新增删除去重操作代码

    这篇文章主要介绍了Java里面对int[]数组做新增删除去重实现,这里记录下使用int[]数组对数组进行新增删除去重等操作,用来更加了解java里面的集合类思想,需要的朋友可以参考下
    2023-10-10
  • 详解Java List的扩容机制原理及应用

    详解Java List的扩容机制原理及应用

    在Java中,List是一种非常常用的数据结构,用于存储有序的元素集合,本文将分析Java List的扩容机制原理,并通过示例代码和测试代码来加强阐述内容,具有一定的参考价值,感兴趣的可以了解一下
    2023-08-08
  • 使用Java实现简单搭建内网穿透

    使用Java实现简单搭建内网穿透

    内网穿透是一种网络技术,适用于需要远程访问本地部署服务的场景,本文主要为大家介绍了如何使用Java实现简单搭建内网穿透,感兴趣的可以了解下
    2024-02-02
  • Java遍历字符串和统计字符个数的操作方法

    Java遍历字符串和统计字符个数的操作方法

    这篇文章主要介绍了Java遍历字符串和统计字符个数的操作方法,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-12-12
  • java 多线程Thread与runnable的区别

    java 多线程Thread与runnable的区别

    这篇文章主要介绍了java 多线程Thread与runnable的区别的相关资料,java线程有两种方法继承thread类与实现runnable接口,下面就提供实例帮助大家理解,需要的朋友可以参考下
    2017-08-08
  • scala中常用特殊符号详解

    scala中常用特殊符号详解

    这篇文章主要介绍了scala中常用特殊符号详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • MyBatis动态创建表的实例代码

    MyBatis动态创建表的实例代码

    在项目需求中,我们经常会遇到动态操作数据表的需求,常见的我们会把日志、设备实时位置信息等存入数据表,并且以一定时间段生成一个表来存储。接下来通过本文给大家介绍MyBatis动态创建表的方法,感兴趣的朋友一起看看吧
    2018-07-07
  • 解决 java.lang.NoSuchMethodError的错误

    解决 java.lang.NoSuchMethodError的错误

    这篇文章主要介绍了解决 java.lang.NoSuchMethodError的错误的相关资料,需要的朋友可以参考下
    2017-06-06
  • springMVC发送邮件的简单实现

    springMVC发送邮件的简单实现

    本篇文章主要介绍了springMVC发送邮件的简单实现 ,主要是利用利用javax.mail发送邮件,图片与附件都可发送,有兴趣的可以了解一下
    2017-04-04

最新评论