基于Redis Streams的实时消息处理实战指南

 更新时间:2025年07月16日 09:12:39   作者:浅沫云归  
这篇文章主要为大家详细介绍了在生产环境中基于 Redis Streams 构建实时消息处理的完整经验,包括技术选型、核心代码示例、踩坑解决和优化方案,希望对大家有所帮助

业务场景描述

在我们公司的电商平台中,存在大量异步事件需要实时处理,例如用户下单、库存更新、支付回调等。这些事件对消息的可靠性、顺序性和高吞吐量有较高要求。传统的消息中间件(如Kafka、RabbitMQ)在运维成本或部署复杂度上存在一定挑战,在部分场景下难以满足“轻量、低延迟、易集成” 的需求。

经过调研和验证,Redis 6.0+ 提供的 Streams 特性在嵌入式部署、快速上手方面具有显著优势。本篇文章将分享我们在生产环境中基于 Redis Streams 构建实时消息处理的完整经验,包括技术选型、核心代码示例、踩坑解决和优化方案。

技术选型过程

  • 消息可靠性:Redis Streams 支持持久化,且提供 ACK 机制和 Pending List,能够有效追踪消费进度。
  • 顺序消费:同一消费者组内,可保证分片流(同一 key)中消息按写入顺序被串行消费。
  • 横向扩展:可通过 Stream 分片(多个 Stream Key)或消费者组内多实例并行消费提高吞吐。
  • 运营成本:Redis 已是团队基础设施,集群部署与监控成熟度高,二次成本低。
  • 客户端生态:Lettuce、Jedis、Redisson 等客户端均有支持,编码友好。

基于以上考量,最终选型 Redis Streams,落地于现有 Redis 集群,无需额外独立中间件部署。

实现方案详解

环境与依赖

Maven 依赖(以 Lettuce 客户端为例):

<dependencies>
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.1.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>

SpringBoot 配置(application.yml):

spring:
  redis:
    host: redis-cluster-host
    port: 6379
    password: your_password
    timeout: 2000ms

流程设计

  • Producer 将事件写入 Stream:XADD
  • 多消费者(Consumer Group)并行读取:XREADGROUP
  • 消费确认:XACK
  • 异常消息追踪:Pending-List 与 XCLAIM 回补处理

生产者实现

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import java.util.HashMap;
import java.util.Map;

public class RedisStreamProducer {
    private RedisClient client;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;
    private static final String STREAM_KEY = "orderStream";

    public RedisStreamProducer(String uri) {
        client = RedisClient.create(uri);
        connection = client.connect();
        commands = connection.sync();
    }

    public String sendMessage(Map<String, String> message) {
        // XADD key * field value [field value ...]
        return commands.xadd(STREAM_KEY, message);
    }

    public void shutdown() {
        connection.close();
        client.shutdown();
    }

    public static void main(String[] args) {
        RedisStreamProducer producer = new RedisStreamProducer("redis://:your_password@redis-host:6379/0");
        Map<String, String> order = new HashMap<>();
        order.put("orderId", "123456");
        order.put("userId", "u7890");
        order.put("amount", "258.50");
        String messageId = producer.sendMessage(order);
        System.out.println("消息发送成功, ID=" + messageId);
        producer.shutdown();
    }
}

消费者实现

import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.models.stream.Consumer;
import io.lettuce.core.models.stream.PendingMessage;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class RedisStreamConsumer {
    private RedisClient client;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;

    private static final String STREAM_KEY = "orderStream";
    private static final String GROUP_NAME = "orderGroup";
    private static final String CONSUMER_NAME = "consumer-1";

    public RedisStreamConsumer(String uri) {
        client = RedisClient.create(uri);
        connection = client.connect();
        commands = connection.sync();
        // 创建消费者组, 如果已创建可 ignore
        try {
            commands.xgroupCreate(STREAM_KEY, GROUP_NAME, "$", true);
        } catch (Exception e) {
            // Group exists
        }
    }

    public void consume() {
        while (true) {
            // 从 Pending List 先处理未 ack 的消息
            List<PendingMessage> pending = commands.xpending(STREAM_KEY, GROUP_NAME, Range.unbounded(), Limit.from(10));
            for (PendingMessage pm : pending) {
                // 重新消费
                StreamMessage<String, String> msg = commands.xclaim(
                    STREAM_KEY,
                    GROUP_NAME,
                    CONSUMER_NAME,
                    5000,
                    pm.getId());
                process(msg.getBody());
                commands.xack(STREAM_KEY, GROUP_NAME, pm.getId());
            }

            // 正常读取新消息
            List<StreamMessage<String, String>> messages = commands.xreadgroup(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                XReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
            if (messages != null) {
                for (StreamMessage<String, String> msg : messages) {
                    process(msg.getBody());
                    commands.xack(STREAM_KEY, GROUP_NAME, msg.getId());
                }
            }

            // 轮询间隔
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private void process(Map<String, String> body) {
        // 业务处理逻辑
        System.out.println("处理订单: " + body);
    }

    public void shutdown() {
        connection.close();
        client.shutdown();
    }

    public static void main(String[] args) {
        RedisStreamConsumer consumer = new RedisStreamConsumer("redis://:your_password@redis-host:6379/0");
        consumer.consume();
        consumer.shutdown();
    }
}

踩过的坑与解决方案

1.消息重复消费

  • 问题:消费者处理过程中抛出异常导致 ack 未发送,Pending List 中累积大量消息。
  • 解决:定期扫描 Pending List,并结合 XCLAIM 将“活跃但挂起”消息重新分配给健康消费者处理;同时在业务端做好幂等控制。

2.消息积压与内存压力

  • 问题:Stream 长度持续增长,Redis 实例内存压力上升。
  • 解决:使用 XTRIM MAXLEN ~ N 对流进行修剪,结合业务保留时间策略,定期分批清理历史消息。

3.消费者实例重启后状态丢失

  • 问题:未及时恢复 Pending List 中未处理消息,导致部分消息长时间滞留。
  • 解决:消费者启动时优先处理 Pending List,再进入正常消费流程;并通过定时任务对挂起较久的消息进行报警或二次补偿处理。

总结与最佳实践

  • Redis Streams 适合轻量级、低运维成本的实时消息场景,结合 ACK、Pending List 能保证高可靠性。
  • 采用消费者组(Consumer Group)可支持横向扩展,读写分离与顺序消费兼得。
  • 业务侧必须做好幂等设计,避免消息重复带来的副作用。
  • 对 Stream 进行合理修剪,避免数据无节制增长导致内存问题。
  • 建议结合监控告警,对 Pending List 长度、消费者积压情况进行实时监控。

到此这篇关于基于Redis Streams的实时消息处理实战指南的文章就介绍到这了,更多相关Redis Streams消息处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 利用Supervisor管理Redis进程的方法教程

    利用Supervisor管理Redis进程的方法教程

    Supervisor 是可以在类 UNIX 系统中进行管理和监控各种进程的小型系统。它自带了客户端和服务端工具,下面这篇文章主要给大家介绍了关于利用Supervisor管理Redis进程的相关资料,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-08-08
  • Redis SCAN命令详解

    Redis SCAN命令详解

    SCAN 命令是一个基于游标的迭代器,每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数, 以此来延续之前的迭代过程,这篇文章给大家介绍了Redis SCAN命令的相关知识,感兴趣的朋友一起看看吧
    2022-07-07
  • Redis如何存储对象

    Redis如何存储对象

    这篇文章主要介绍了Redis如何存储对象,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • redis中bind配置的详细步骤

    redis中bind配置的详细步骤

    本文主要介绍了redis中bind配置的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • Redis数据结构SortedSet的底层原理解析

    Redis数据结构SortedSet的底层原理解析

    这篇文章主要介绍了Redis数据结构SortedSet的底层原理解析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07
  • 虚拟机下的Redis无法访问报错500解决方法

    虚拟机下的Redis无法访问报错500解决方法

    这篇文章主要介绍了虚拟机下的Redis无法访问,报错500解决方法,由于我的redis是在虚拟机下安装的,无法访问redis的原因是因为虚拟机的ip地址和主机不同,文中通过图文结合给出了详细的解决方法,需要的朋友可以参考下
    2024-02-02
  • Redis中6种缓存更新策略详解

    Redis中6种缓存更新策略详解

    Redis作为一款高性能的内存数据库,已经成为缓存层的首选解决方案,然而,使用缓存时最大的挑战在于保证缓存数据与底层数据源的一致性,本文将介绍Redis中6种缓存更新策略,需要的朋友可以参考下
    2025-05-05
  • Redis中五种数据类型简单操作

    Redis中五种数据类型简单操作

    这篇文章主要介绍了Redis中五种数据类型简单操作的相关资料,需要的朋友可以参考下
    2017-04-04
  • Redisson 加锁解锁的实现

    Redisson 加锁解锁的实现

    本文主要介绍了Redisson 加锁解锁的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • 控制Redis的hash的field中的过期时间

    控制Redis的hash的field中的过期时间

    这篇文章主要介绍了控制Redis的hash的field中的过期时间问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01

最新评论