SpringBoot集成Redis消息队列的实现示例

 更新时间:2025年05月16日 11:08:47   作者:sjsjsbbsbsn  
本文主要介绍了SpringBoot集成Redis消息队列的实现示例,包括配置和消费逻辑,RedisStream提供了高吞吐量、顺序消费和消费组机制等优势,具有一定的参考价值,感兴趣的可以了解一下

一.Redis Stream 消息队列模版配置类

/**
 * Redis Stream 消息队列配置
 */
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {

    private static final Logger log = LoggerFactory.getLogger(RedisStreamConfiguration.class);
    private final RedisConnectionFactory redisConnectionFactory;
    private final Consumer1 Consumer1;
    private final Consumer2 Consumer2;

    // 定义需要自定义的配置常量
    private static final int BATCH_SIZE = 10; // 每次批量拉取的消息数量
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3); // 拉取消息的阻塞超时时间
    private static final String THREAD_NAME_PREFIX = "your-business"; // 线程名称前缀
    private static final String GROUP_NAME_1 = "group1"; // 第一个消费者组名称
    private static final String GROUP_NAME_2 = "group2"; // 第二个消费者组名称
    private static final String CONSUMER_NAME_1 = "consumer1"; // 第一个消费者名称
    private static final String CONSUMER_NAME_2 = "consumer2"; // 第二个消费者名称
    private static final String STREAM_TOPIC_KEY = SHORT_LINK_STATS_STREAM_TOPIC_KEY; // Stream的主题键

    @Bean
    public ExecutorService asyncStreamConsumer() {
        log.info("Redis Stream 消息队列配置线程池");
        AtomicInteger index = new AtomicInteger();
        int processors = Runtime.getRuntime().availableProcessors();

        // 创建一个自定义线程池
        return new ThreadPoolExecutor(
                processors,
                processors + (processors >> 1),
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName(THREAD_NAME_PREFIX + "_" + index.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
        );
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
            ExecutorService asyncStreamConsumer) {

        // 配置 StreamMessageListenerContainer 容器选项
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(BATCH_SIZE) // 批量拉取消息数量
                        .executor(asyncStreamConsumer) // 使用配置好的线程池
                        .pollTimeout(POLL_TIMEOUT) // 拉取消息的超时时间
                        .build();

        // 创建 StreamMessageListenerContainer 实例
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // 配置第一个消息监听器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_1, CONSUMER_NAME_1), // 指定第一个消费者组和消费者名称
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主题和偏移量
                Consumer1 // 指定第一个消息处理逻辑
        );

        // 配置第二个消息监听器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_2, CONSUMER_NAME_2), // 指定第二个消费者组和消费者名称
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主题和偏移量
                Consumer2 // 指定第二个消息处理逻辑
        );

        return container;
    }
}

1. 介绍

RedisStreamConfiguration 是一个用于配置 Redis Stream 消息队列的 Spring 配置类。它通过 Redis Stream 实现消息的异步处理和多消费者消费,适用于需要高吞吐量、低延迟的业务场景。

2. 关键组件和自定义参数

此类主要配置了 Redis Stream 消息监听容器 StreamMessageListenerContainer,包括线程池配置、消费批次和超时时间等,方便用户根据业务需求自定义。

核心参数

  • BATCH_SIZE:定义每次批量拉取的消息数量。通过设定合适的批量大小,可以减少消费请求次数,提升处理效率。
  • POLL_TIMEOUT:设置从 Redis Stream 拉取消息的超时时间。超时控制允许程序在无消息时保持阻塞,等待消息到达。
  • THREAD_NAME_PREFIX:设置线程名称前缀,帮助识别不同业务模块的线程。
  • GROUP_NAME_1 和 GROUP_NAME_2:定义两个不同的消费者组,适用于同一 Stream 多个消费者并行处理消息的场景。
  • CONSUMER_NAME_1 和 CONSUMER_NAME_2:为每个消费者组指定独立的消费者名称,有助于实现消费任务的分配和管理。

代码实现

配置了 StreamMessageListenerContainer 来处理 Stream 消息,并分别为两个消费者组和消费者注册不同的监听器。

3. 主要方法说明

ExecutorService(线程池配置)

@Bean
public ExecutorService asyncStreamConsumer() { ... }

用于创建一个自定义线程池,为 Redis Stream 的消息消费提供异步执行环境。processors 设置了核心线程数为 CPU 核心数,最大线程数为 processors + (processors >> 1),即核心数的 1.5 倍。线程命名使用 THREAD_NAME_PREFIX 前缀,方便日志记录和排查问题。

StreamMessageListenerContainer(消息监听容器)

@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(...) { ... }

该方法创建并配置了 Redis Stream 的监听容器。关键步骤如下:

  • 构建容器选项:包括批次大小、线程池、拉取超时时间等参数。

  • 容器实例化:通过 StreamMessageListenerContainer.create() 创建容器,初始化时自动启动。

  • 消息监听器配置

    • 为第一个消费者组 GROUP_NAME_1 和消费者 CONSUMER_NAME_1 配置了消息监听器 Consumer1,实现自动确认并消费消息。
    • 为第二个消费者组 GROUP_NAME_2 和消费者 CONSUMER_NAME_2 配置了另一组消息监听器 Consumer2,以便多消费者处理。

4. 应用场景

此配置适用于 Redis Stream 在大规模并发场景下的消息队列管理。通过灵活配置多个消费者组和消费者,可以实现负载均衡的多线程消费逻辑。

二.消费者模版

/**
 * 消息队列消费者
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {
   
    private final RedissonClient redissonClient;
    private final StringRedisTemplate stringRedisTemplate;
    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
            // 判断当前的这个消息流程是否执行完成
            if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
                return;
            }
            throw new ServiceException("消息未完成流程,需要消息队列重试");
        }
        try {
            Map<String, String> producerMap = message.getValue();
         	//你自己的业务逻辑
            }
            // 删除消息
            stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
        } catch (Throwable ex) {
            messageQueueIdempotentHandler.delMessageProcessed(id.toString());
            log.error("消费异常", ex);
            throw ex;
        }
        //消费完删除
        messageQueueIdempotentHandler.setAccomplish(id.toString());
    }

   
}

本模板实现了一个 Redis Stream 消息队列消费者的基础结构。该模板主要围绕幂等性检查、消息解析与处理以及消费状态管理三个核心功能,确保消息在高并发环境下的安全性与一致性。 

具体的幂等校验看我另一篇文章

三.生产者模版

/**
 * 短链接监控状态保存消息队列生产者
 */
@Component
@RequiredArgsConstructor
public class Producer implements MessageQueueProducer{

    private final StringRedisTemplate stringRedisTemplate;


    /**
     * 发送消息
     */
    public void send(Map<String, String> producerMap) {
        stringRedisTemplate.opsForStream().add(YOUR_KEY, producerMap);
    }
}

注意YOUR_KEY 替换成你自己的即可

四.总结

1. Redis Stream 消息队列的优势

Redis Stream 是 Redis 提供的一种强大的消息队列解决方案,适用于高吞吐量、低延迟的业务场景。与传统的消息队列系统(如 RabbitMQ 或 Kafka)相比,Redis Stream 在集成与配置方面更加简单,尤其适合基于 Redis 的应用程序。Redis Stream 提供了以下优势:

  • 高吞吐量:支持高并发和快速消息消费,能够在瞬间处理大量的消息。
  • 顺序消费:保证消息的顺序消费,适用于需要顺序处理的业务场景。
  • 消费组机制:通过消费组管理消息消费,可以通过多个消费者并行消费,提高处理能力。
  • 持久化与备份:可以将消息存储在 Redis 中,具备一定的持久化能力,防止数据丢失。

2. Redis Stream 配置与应用

本文介绍了如何在 Spring Boot 中集成 Redis Stream 消息队列的配置与消费逻辑,主要包括:

  • 消息消费配置:通过 StreamMessageListenerContainer 实现消息的异步消费。配置了批量拉取的数量、阻塞超时、线程池等自定义参数,帮助提升系统的并发处理能力。
  • 多消费者并行处理:通过消费者组(Consumer Group)机制,实现多消费者并行消费同一个 Stream,提高消息处理的吞吐量和效率。
  • 幂等性与消费确认:通过 MessageQueueIdempotentHandler 来保证消息的幂等性,避免重复消费的问题。处理逻辑保证每条消息只会被消费一次,且在消费失败时能够适当回滚,确保系统的可靠性。

3. 消费者与生产者模板

  • 消费者模板:消费者通过实现 StreamListener 接口来处理从 Redis Stream 拉取的消息。为了保证幂等性,消费者首先检查消息是否已经处理过,未完成的消息会被标记并重试,确保消息处理的安全性。
  • 生产者模板:生产者通过 StringRedisTemplate 将消息发送到 Redis Stream。当业务中有新的消息需要处理时,生产者将消息添加到 Redis Stream 进行后续处理。

4. 应用场景

Redis Stream 适用于许多场景,特别是需要高并发、高吞吐量且保证顺序消费的业务需求。例如,短链接生成与访问统计、订单处理、日志收集等业务场景,都能通过 Redis Stream 实现高效、可靠的消息队列。

到此这篇关于SpringBoot集成Redis消息队列的实现示例的文章就介绍到这了,更多相关SpringBoot Redis消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解Java如何判断ResultSet结果集是否为空

    详解Java如何判断ResultSet结果集是否为空

    ResultSet 表示 select 语句的查询结果集。这篇文章主要为大家详细介绍了Java如何判断ResultSet结果集是否为空,感兴趣的可以了解一下
    2023-02-02
  • IDEA关于.properties资源文件的编码调整问题

    IDEA关于.properties资源文件的编码调整问题

    这篇文章主要介绍了IDEA关于.properties资源文件的编码调整问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-06-06
  • Java正则替换手机号代码实例

    Java正则替换手机号代码实例

    本文的主要内容是Java语言中正则表达式替换手机号的第4到第7位,实现方法十分简单,同时涉及了一些正则表达式的相关用法,需要的朋友可以参考下。
    2017-09-09
  • Java实现ArrayList排序的方法详解

    Java实现ArrayList排序的方法详解

    Java中常见的ArrayList排序方法主要为三种:JDK8的stream、Comparator#compare()和Comparable#compareTo(),本文将详解这三者的使用,需要的可以参考一下
    2022-05-05
  • Java中toString函数的使用示例代码

    Java中toString函数的使用示例代码

    toString()函数用于将当前对象以字符串的形式返回,比如我定义了一个User类,创建了一个user对象,然后使用相应命令去打印user对象,本文结合示例代码介绍了toString函数的使用,需要的朋友可以参考下
    2024-02-02
  • Mybatis-Plus 官方神器发布

    Mybatis-Plus 官方神器发布

    mybatis-mate 为 mp 企业级模块,支持分库分表,数据审计、数据敏感词过滤(AC算法),字段加密,字典回写(数据绑定),数据权限,表结构自动生成 SQL 维护等,旨在更敏捷优雅处理数据,今天介绍一个 MyBatis - Plus 官方发布的神器,感兴趣的朋友一起看看吧
    2021-11-11
  • JPA @Query时,无法使用limit函数的问题及解决

    JPA @Query时,无法使用limit函数的问题及解决

    这篇文章主要介绍了JPA @Query时,无法使用limit函数的问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • Java图文并茂详解NIO与零拷贝

    Java图文并茂详解NIO与零拷贝

    零拷贝是网络编程的关键,很多性能优化都离不开。在 Java 程序中,常用的零拷贝有 mmap(memory map,内存映射) 和 sendFile。那么它们在 OS(操作系统) 中,到底是怎么样的一个的设计?另外我们看下NIO 中如何使用零拷贝
    2022-11-11
  • 解决idea启动报错javax.imageio.IIOException的问题

    解决idea启动报错javax.imageio.IIOException的问题

    这篇文章主要介绍了idea启动报错javax.imageio.IIOException,解决打不开idea问题,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • Spring cloud config 配置文件加密方式

    Spring cloud config 配置文件加密方式

    这篇文章给大家介绍了Spring cloud config 配置文件加密方式,非常不错,具有一定的参考借鉴价值,感兴趣的朋友跟随脚步之家小编一起学习吧
    2018-05-05

最新评论