SpringBoot集成Redis消息队列的实现示例

 更新时间:2025年05月16日 11:08:47   作者:sjsjsbbsbsn  
本文主要介绍了SpringBoot集成Redis消息队列的实现示例,包括配置和消费逻辑,RedisStream提供了高吞吐量、顺序消费和消费组机制等优势,具有一定的参考价值,感兴趣的可以了解一下

一.Redis Stream 消息队列模版配置类

/**
 * Redis Stream 消息队列配置
 */
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {

    private static final Logger log = LoggerFactory.getLogger(RedisStreamConfiguration.class);
    private final RedisConnectionFactory redisConnectionFactory;
    private final Consumer1 Consumer1;
    private final Consumer2 Consumer2;

    // 定义需要自定义的配置常量
    private static final int BATCH_SIZE = 10; // 每次批量拉取的消息数量
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3); // 拉取消息的阻塞超时时间
    private static final String THREAD_NAME_PREFIX = "your-business"; // 线程名称前缀
    private static final String GROUP_NAME_1 = "group1"; // 第一个消费者组名称
    private static final String GROUP_NAME_2 = "group2"; // 第二个消费者组名称
    private static final String CONSUMER_NAME_1 = "consumer1"; // 第一个消费者名称
    private static final String CONSUMER_NAME_2 = "consumer2"; // 第二个消费者名称
    private static final String STREAM_TOPIC_KEY = SHORT_LINK_STATS_STREAM_TOPIC_KEY; // Stream的主题键

    @Bean
    public ExecutorService asyncStreamConsumer() {
        log.info("Redis Stream 消息队列配置线程池");
        AtomicInteger index = new AtomicInteger();
        int processors = Runtime.getRuntime().availableProcessors();

        // 创建一个自定义线程池
        return new ThreadPoolExecutor(
                processors,
                processors + (processors >> 1),
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName(THREAD_NAME_PREFIX + "_" + index.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
        );
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
            ExecutorService asyncStreamConsumer) {

        // 配置 StreamMessageListenerContainer 容器选项
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(BATCH_SIZE) // 批量拉取消息数量
                        .executor(asyncStreamConsumer) // 使用配置好的线程池
                        .pollTimeout(POLL_TIMEOUT) // 拉取消息的超时时间
                        .build();

        // 创建 StreamMessageListenerContainer 实例
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // 配置第一个消息监听器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_1, CONSUMER_NAME_1), // 指定第一个消费者组和消费者名称
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主题和偏移量
                Consumer1 // 指定第一个消息处理逻辑
        );

        // 配置第二个消息监听器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_2, CONSUMER_NAME_2), // 指定第二个消费者组和消费者名称
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主题和偏移量
                Consumer2 // 指定第二个消息处理逻辑
        );

        return container;
    }
}

1. 介绍

RedisStreamConfiguration 是一个用于配置 Redis Stream 消息队列的 Spring 配置类。它通过 Redis Stream 实现消息的异步处理和多消费者消费,适用于需要高吞吐量、低延迟的业务场景。

2. 关键组件和自定义参数

此类主要配置了 Redis Stream 消息监听容器 StreamMessageListenerContainer,包括线程池配置、消费批次和超时时间等,方便用户根据业务需求自定义。

核心参数

  • BATCH_SIZE:定义每次批量拉取的消息数量。通过设定合适的批量大小,可以减少消费请求次数,提升处理效率。
  • POLL_TIMEOUT:设置从 Redis Stream 拉取消息的超时时间。超时控制允许程序在无消息时保持阻塞,等待消息到达。
  • THREAD_NAME_PREFIX:设置线程名称前缀,帮助识别不同业务模块的线程。
  • GROUP_NAME_1 和 GROUP_NAME_2:定义两个不同的消费者组,适用于同一 Stream 多个消费者并行处理消息的场景。
  • CONSUMER_NAME_1 和 CONSUMER_NAME_2:为每个消费者组指定独立的消费者名称,有助于实现消费任务的分配和管理。

代码实现

配置了 StreamMessageListenerContainer 来处理 Stream 消息,并分别为两个消费者组和消费者注册不同的监听器。

3. 主要方法说明

ExecutorService(线程池配置)

@Bean
public ExecutorService asyncStreamConsumer() { ... }

用于创建一个自定义线程池,为 Redis Stream 的消息消费提供异步执行环境。processors 设置了核心线程数为 CPU 核心数,最大线程数为 processors + (processors >> 1),即核心数的 1.5 倍。线程命名使用 THREAD_NAME_PREFIX 前缀,方便日志记录和排查问题。

StreamMessageListenerContainer(消息监听容器)

@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(...) { ... }

该方法创建并配置了 Redis Stream 的监听容器。关键步骤如下:

  • 构建容器选项:包括批次大小、线程池、拉取超时时间等参数。

  • 容器实例化:通过 StreamMessageListenerContainer.create() 创建容器,初始化时自动启动。

  • 消息监听器配置

    • 为第一个消费者组 GROUP_NAME_1 和消费者 CONSUMER_NAME_1 配置了消息监听器 Consumer1,实现自动确认并消费消息。
    • 为第二个消费者组 GROUP_NAME_2 和消费者 CONSUMER_NAME_2 配置了另一组消息监听器 Consumer2,以便多消费者处理。

4. 应用场景

此配置适用于 Redis Stream 在大规模并发场景下的消息队列管理。通过灵活配置多个消费者组和消费者,可以实现负载均衡的多线程消费逻辑。

二.消费者模版

/**
 * 消息队列消费者
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {
   
    private final RedissonClient redissonClient;
    private final StringRedisTemplate stringRedisTemplate;
    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
            // 判断当前的这个消息流程是否执行完成
            if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
                return;
            }
            throw new ServiceException("消息未完成流程,需要消息队列重试");
        }
        try {
            Map<String, String> producerMap = message.getValue();
         	//你自己的业务逻辑
            }
            // 删除消息
            stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
        } catch (Throwable ex) {
            messageQueueIdempotentHandler.delMessageProcessed(id.toString());
            log.error("消费异常", ex);
            throw ex;
        }
        //消费完删除
        messageQueueIdempotentHandler.setAccomplish(id.toString());
    }

   
}

本模板实现了一个 Redis Stream 消息队列消费者的基础结构。该模板主要围绕幂等性检查、消息解析与处理以及消费状态管理三个核心功能,确保消息在高并发环境下的安全性与一致性。 

具体的幂等校验看我另一篇文章

三.生产者模版

/**
 * 短链接监控状态保存消息队列生产者
 */
@Component
@RequiredArgsConstructor
public class Producer implements MessageQueueProducer{

    private final StringRedisTemplate stringRedisTemplate;


    /**
     * 发送消息
     */
    public void send(Map<String, String> producerMap) {
        stringRedisTemplate.opsForStream().add(YOUR_KEY, producerMap);
    }
}

注意YOUR_KEY 替换成你自己的即可

四.总结

1. Redis Stream 消息队列的优势

Redis Stream 是 Redis 提供的一种强大的消息队列解决方案,适用于高吞吐量、低延迟的业务场景。与传统的消息队列系统(如 RabbitMQ 或 Kafka)相比,Redis Stream 在集成与配置方面更加简单,尤其适合基于 Redis 的应用程序。Redis Stream 提供了以下优势:

  • 高吞吐量:支持高并发和快速消息消费,能够在瞬间处理大量的消息。
  • 顺序消费:保证消息的顺序消费,适用于需要顺序处理的业务场景。
  • 消费组机制:通过消费组管理消息消费,可以通过多个消费者并行消费,提高处理能力。
  • 持久化与备份:可以将消息存储在 Redis 中,具备一定的持久化能力,防止数据丢失。

2. Redis Stream 配置与应用

本文介绍了如何在 Spring Boot 中集成 Redis Stream 消息队列的配置与消费逻辑,主要包括:

  • 消息消费配置:通过 StreamMessageListenerContainer 实现消息的异步消费。配置了批量拉取的数量、阻塞超时、线程池等自定义参数,帮助提升系统的并发处理能力。
  • 多消费者并行处理:通过消费者组(Consumer Group)机制,实现多消费者并行消费同一个 Stream,提高消息处理的吞吐量和效率。
  • 幂等性与消费确认:通过 MessageQueueIdempotentHandler 来保证消息的幂等性,避免重复消费的问题。处理逻辑保证每条消息只会被消费一次,且在消费失败时能够适当回滚,确保系统的可靠性。

3. 消费者与生产者模板

  • 消费者模板:消费者通过实现 StreamListener 接口来处理从 Redis Stream 拉取的消息。为了保证幂等性,消费者首先检查消息是否已经处理过,未完成的消息会被标记并重试,确保消息处理的安全性。
  • 生产者模板:生产者通过 StringRedisTemplate 将消息发送到 Redis Stream。当业务中有新的消息需要处理时,生产者将消息添加到 Redis Stream 进行后续处理。

4. 应用场景

Redis Stream 适用于许多场景,特别是需要高并发、高吞吐量且保证顺序消费的业务需求。例如,短链接生成与访问统计、订单处理、日志收集等业务场景,都能通过 Redis Stream 实现高效、可靠的消息队列。

到此这篇关于SpringBoot集成Redis消息队列的实现示例的文章就介绍到这了,更多相关SpringBoot Redis消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现窗体程序显示日历表

    Java实现窗体程序显示日历表

    这篇文章主要为大家详细介绍了Java实现窗体程序显示日历表,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-06-06
  • java(Java 25 LTS)的下载、安装、配置图文教程 (IDEA 2025 为例)

    java(Java 25 LTS)的下载、安装、配置图文教程 (IDEA 2025 为例)

    在Java开发中选择合适的JDK版本至关,重要这篇文章主要介绍了java(Java 25 LTS)的下载、安装、配置的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-11-11
  • PowerJob的DispatchStrategy方法工作流程源码解读

    PowerJob的DispatchStrategy方法工作流程源码解读

    这篇文章主要为大家介绍了PowerJob的DispatchStrategy方法工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • 在java上使用亚马逊云储存方法

    在java上使用亚马逊云储存方法

    这篇文章主要介绍了在java上使用亚马逊云储存方法,首先写一个配置类,写一个controller接口调用方法存储文件,本文结合示例代码给大家介绍的非常详细,需要的朋友参考下吧
    2024-01-01
  • 深入理解SpringMVC的参数绑定与数据响应机制

    深入理解SpringMVC的参数绑定与数据响应机制

    本文将深入探讨SpringMVC的参数绑定方式,包括基本类型、对象、集合等类型的绑定方式,以及如何处理参数校验和异常。同时,本文还将介绍SpringMVC的数据响应机制,包括如何返回JSON、XML等格式的数据,以及如何处理文件上传和下载。
    2023-06-06
  • hadoop 切片机制分析与应用

    hadoop 切片机制分析与应用

    切片这个词对于做过python开发的同学一定不陌生,但是与hadoop中的切片有所区别,hadoop中的切片是为了优化hadoop的job在处理过程中MapTask阶段的性能达到最优而言
    2022-02-02
  • springboot使用RedisRepository操作数据的实现

    springboot使用RedisRepository操作数据的实现

    本文主要介绍了springboot使用RedisRepository操作数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05
  • SpringBoot实现联表查询的代码详解

    SpringBoot实现联表查询的代码详解

    这篇文章主要介绍了SpringBoot中如何实现联表查询,文中通过代码示例和图文结合的方式讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-05-05
  • 在Spring Boot中实现多环境配置的方法

    在Spring Boot中实现多环境配置的方法

    在SpringBoot中,实现多环境配置是一项重要且常用的功能,它允许开发者为不同的运行环境,这种方式简化了环境切换的复杂度,提高了项目的可维护性和灵活性,本文给大家介绍在Spring Boot中实现多环境配置的方法,感兴趣的朋友跟随小编一起看看吧
    2024-09-09
  • Java垃圾回收之标记压缩算法详解

    Java垃圾回收之标记压缩算法详解

    今天小编就为大家分享一篇关于Java垃圾回收之标记压缩算法详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-10-10

最新评论