使用Redis实现轻量级消息队列

 更新时间:2025年11月14日 10:08:48   作者:moxiaoran5753  
使用消息中间件如RabbitMQ或kafka虽然好,但也给服务器带来很大的内存开销,当系统的业务量,并发量不高时,考虑到服务器和维护成本,可考虑使用Redis实现一个轻量级的消息队列,下面就来详细的介绍一下

使用消息中间件如RabbitMQ或kafka虽然好,但也给服务器带来很大的内存开销,当系统的业务量,并发量不高时,考虑到服务器和维护成本,可考虑使用Redis实现一个轻量级的消息队列,实现事件监听的效果。下面介绍下Redis实现消息队列的三种形式。

方式一  Redis Pub/Sub(适用于广播通知)

Redis Pub/Sub 适用于 实时消息推送,但不支持消息持久化,如果消费者掉线,消息会丢失。

(1) 发布消息(生产者)

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private final StringRedisTemplate redisTemplate;

    public OrderService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void createOrder(Long orderId) {
        System.out.println("订单创建成功: " + orderId);

        // 发布消息
        redisTemplate.convertAndSend("order.channel", orderId.toString());
    }
}

(2) 订阅消息(消费者)

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class NotificationService implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String orderId = message.toString();
        System.out.println("【通知服务】收到订单创建消息:" + orderId);
    }
}

(3) 注册 Redis 监听器

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisPubSubConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("order.channel"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(NotificationService receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }
}

缺点

  • 无持久化,消费者掉线后无法重新获取消息。
  • 不支持消费组,多个消费者同时订阅时,所有都会收到消息(无法负载均衡)。

方式二:Redis List(适用于任务队列)

使用 Redis ListLPUSH + RPOP)可以实现简单的任务队列,适用于任务异步处理,但不支持回溯消费。

(1) 生产者(推送任务)

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private final StringRedisTemplate redisTemplate;

    public OrderService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void createOrder(Long orderId) {
        System.out.println("订单创建成功: " + orderId);

        // 推送到队列
        redisTemplate.opsForList().leftPush("order.queue", orderId.toString());
    }
}

(2) 消费者(轮询获取任务)

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class NotificationService {
    private final StringRedisTemplate redisTemplate;

    public NotificationService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Scheduled(fixedDelay = 5000) // 每5秒轮询一次
    public void processOrderQueue() {
        String orderId = redisTemplate.opsForList().rightPop("order.queue");
        if (orderId != null) {
            System.out.println("【通知服务】处理订单:" + orderId);
        }
    }
}

要想消费者能监听到消息并进行处理,需要在方法上添加@Scheduled注解,同时在服务启动类中添加@EnableScheduling注解,或者在配置类添加

import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableScheduling
public class SchedulingConfig {
}

缺点

  • 无消费组,多个消费者时需要自行分配任务,可能会造成任务重复消费或丢失。
  • 无持久化保障,如果任务未处理完,Redis 发生故障,任务可能会丢失。

方式三:Redis Stream(推荐,支持持久化 + 消费组)

Redis Stream 是 Redis 6.0 之后的特性,类似于 Kafka,支持持久化、消费组、多消费者模式

(1) 生产者(推送事件)

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private final StringRedisTemplate redisTemplate;

    public OrderService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void createOrder(Long orderId) {
        System.out.println("订单创建成功: " + orderId);

        // 推送到 Redis Stream
        ObjectRecord<String, String> record = StreamRecords.newRecord()
                .ofObject(orderId.toString())
                .withStreamKey("order.stream");
        redisTemplate.opsForStream().add(record);
    }
}

(2) 消费者(监听事件)

import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;

@Service
public class NotificationService implements StreamListener<String, MapRecord<String, String, String>> {

    private final StringRedisTemplate redisTemplate;
    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer;

    public NotificationService(StringRedisTemplate redisTemplate,
                               StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer) {
        this.redisTemplate = redisTemplate;
        this.listenerContainer = listenerContainer;
    }

    @PostConstruct
    public void startListening() {
        listenerContainer.receive(StreamOffset.fromStart("order.stream"), this);
    }

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String orderId = message.getValue().values().iterator().next();
        System.out.println("【通知服务】订单 " + orderId + " 创建成功!");
    }
}

优点

  • 持久化存储,即使 Redis 重启,消息不会丢失。
  • 支持消费组,多个消费者可以负载均衡地消费消息。
  • 支持回溯,可以读取历史消息。

总结

方案适用场景优点缺点
Pub/Sub即时消息通知低延迟无持久化,消费者掉线丢消息
List简单任务队列轻量级无消费组,任务可能丢失
Stream高级事件流处理持久化、消费组复杂度较高

如果需求是轻量级队列,推荐 Redis Stream,它类似 Kafka,支持消费组和持久化,比 Redis List 更稳定。

到此这篇关于使用Redis实现轻量级消息队列的文章就介绍到这了,更多相关Redis 轻量级消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 使用Go实现邮箱验证码API功能

    使用Go实现邮箱验证码API功能

    本文将带你了解一个项目如何实现一个邮箱验证接口,即一个可用的发送邮箱验证码API和验证验证码是否正确功能,对Go实现邮箱验证码API详细过程感兴趣的朋友一起看看吧
    2024-06-06
  • Go语言中println和fmt.Println区别

    Go语言中println和fmt.Println区别

    本文主要介绍了Go语言中println和fmt.Println区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • 深入理解Go语言中二维切片的使用

    深入理解Go语言中二维切片的使用

    本文深入讲解了Go语言中二维切片的概念与应用,用于表示矩阵、表格等二维数据结构,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧
    2025-07-07
  • Golang解析JSON遇到的坑及解决方法

    Golang解析JSON遇到的坑及解决方法

    这篇文章主要为大家介绍了Golang解析JSON时会遇到的一些坑及解决方法,文中的示例代码讲解详细,对我们学习Go语言有一点的帮助,需要的可以参考一下
    2023-02-02
  • golang结构化日志log/slog包之slog.Record的用法简介

    golang结构化日志log/slog包之slog.Record的用法简介

    这篇文章主要为大家详细介绍了golang结构化日志log/slog包中slog.Record结构体的使用方法和需要注意的点,文中的示例代码讲解详细,需要的可以学习一下
    2023-10-10
  • golang beego框架路由ORM增删改查完整案例

    golang beego框架路由ORM增删改查完整案例

    这篇文章主要为大家介绍了golang beego框架路由ORM增删改查完整案例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪
    2022-04-04
  • golang实现通过smtp发送电子邮件的方法

    golang实现通过smtp发送电子邮件的方法

    这篇文章主要介绍了golang实现通过smtp发送电子邮件的方法,实例分析了Go语言基于SMTP协议发送邮件的相关技巧,需要的朋友可以参考下
    2016-07-07
  • golang实现sql结果集以json格式输出的方法

    golang实现sql结果集以json格式输出的方法

    这篇文章主要介绍了golang实现sql结果集以json格式输出的方法,涉及Go语言针对sql结果集的遍历、转换及json格式相关操作技巧,需要的朋友可以参考下
    2017-03-03
  • go mutex互斥锁使用Lock和Unlock方法占有释放资源

    go mutex互斥锁使用Lock和Unlock方法占有释放资源

    Go号称是为了高并发而生的,在高并发场景下,势必会涉及到对公共资源的竞争,当对应场景发生时,我们经常会使用 mutex 的 Lock() 和 Unlock() 方法来占有或释放资源,虽然调用简单,但 mutex 的内部却涉及挺多的,本文来好好研究一下
    2023-09-09
  • 一文教你如何在Golang中用好泛型

    一文教你如何在Golang中用好泛型

    golang的泛型已经出来了一年多了,从提案被接受开始我就在关注泛型了,好用是好用,但问题也很多,所以本文就来教大家如何在Golang中用好泛型吧
    2023-07-07

最新评论