Redis消息队列、阻塞队列、延时队列的实现

 更新时间:2023年11月10日 11:28:07   作者:夏寻.  
Redis是一种常用的内存数据库,它提供了丰富的功能,通常用于数据缓存和分布式队列,本文主要介绍了Redis消息队列、阻塞队列、延时队列的实现,感兴趣的可以了解一下

redis 队列的优点是轻量级,业务足够简单时不需要使用rabbitMq这样专业的消息中间件;缺点是弹出队列中的元素时,即使该消息处理失败也无法再次进行消费

Redis队列 List

 一、普通队列

可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

  • lpush+rpop:左进右出的队列
  • rpush+lpop:左出右进的队列

使用redis的命令来模拟普通队列

使用lpush命令生产消息:

>lpush queue:single 1
"1"
>lpush queue:single 2
"2"
>lpush queue:single 3
"3"

使用rpop命令消费消息:

>rpop queue:single
"1"
>rpop queue:single
"2"
>rpop queue:single
"3"

使用Java代码来实现普通队列:

生产者SingleProducer

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

/**
 * 生产者
 */
public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i++) {
            jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
        }
        jedis.close();
    }
}

消费者SingleConsumer:

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 */
public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}

上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:

  • 普通的redis队列,为了实现业务,通常会使用while进行循环,这样的话没有消息时依旧会频繁的执行循环,造成cpu的空转,所以一般会在代码中增加sleep来解决该问题,但因此又会造成消息延迟问题。
  • 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

二、Redis阻塞队列

redis队列提供了 “阻塞式” 拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。
如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知消费者立即处理新消息。

使用redis的brpop命令来模拟阻塞队列

>brpop queue:single 30

使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL

Java阻塞队列生产者实现如下:

package com.cxh;

import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

/**
 * 生产者类
 * 生产者每隔600ms生成一条消息
 * */
class MessageProducer extends Thread{
    public static final String MESSAGE_KEY = "message:queue";
    private volatile int count;

    public void putMessage(String mess){
        Jedis jedis = new Jedis("127.0.0.1", 6379);
/*        jedis.auth("123456");*/
        Long size = jedis.lpush(MESSAGE_KEY, mess);
        System.out.println("Put " + Thread.currentThread().getName() + " put message " + count);
        count++;
    }
    @Override
    public synchronized  void run() {
        for(int i = 0 ; i < 1; i++){
            putMessage("message" + count);
            try {
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Java阻塞队列消费者实现如下:

package com.cxh.Component;

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.util.Arrays;
import java.util.List;

@Component
public class component implements CommandLineRunner {
    public static final String MESSAGE_KEY = "message:queue";
    @Override
    public void run(String... args) throws Exception {
/*        //todo: 需要执行的方法
        System.out.println(Arrays.toString(args));*/
        System.out.println("comsumer 111");
        Jedis jedis = new Jedis("127.0.0.1", 6379);
//        String message = jedis.rpop(MESSAGE_KEY);
//        System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message); //多线程的时候使用
        while (true){
            List<String> message = jedis.brpop(0, MESSAGE_KEY);
            System.out.println(message.toString());
        }
    }

}

上面的代码已经实现了延迟队列的生产与消费,需要注意的是:

  • 无法实现一次生产多次消费(使用 pub/sub 发布订阅模式,可以实现 1:N 的消息队列,即一次生产,多端消费
  • 阻塞时间结束后代码会继续向下执行
  • 如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,客户端要有处理机制。
    实际项目中redis连接超时时间远大于20s,因此正常情况不会出现redis超时问题。以防万一增加redis异常捕获,出现异常时杀掉当前进程,同时supervisord会自动重新拉起该进程

三、Redis延迟队列

zset 会按 score 进行排序,如果 score 代表想要执行时间的时间戳。在某个时间将它插入zset集合中,它会按照时间戳大小进行排序,也就是对执行时间前后进行排序。

起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,可以达到延时执行的目的。

下面使用redis的zset来模拟延时队列

命令生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
(integer) 0

命令消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores
1) "order1"
2) "1"
127.0.0.1:6379> zrem queue:delay order1
(integer) 1

使用Java代码来实现普通队列:

生产者DelayProducer :

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Random;

/**
 * 生产者
 */
public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            int second = random.nextInt(30); // 随机订单失效时间
            jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
        }
        jedis.close();
    }
}

消费者DelayConsumer :

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 */
public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time < now) {
                        jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
                        System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
                    } else {
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                    break;
                }
            }
        }
    }
}

应用场景

延时队列在项目中的应用还是比较多的,尤其像电商类平台:

  • 12306 下单成功后,在半个小时内没有支付,自动取消订单。
  • 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存。
  • 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等。
  • 会议预定系统,在预定会议开始前半小时通知所有预定该会议的用户。
  • 安全工单超过 24 小时未处理,则自动拉企业群提醒相关责任人。
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
  • 外卖平台发送订餐通知,下单成功后 60s 给用户推送短信。

到此这篇关于Redis消息队列、阻塞队列、延时队列的实现的文章就介绍到这了,更多相关Redis消息队列、阻塞队列、延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • Redis并发访问问题详细讲解

    Redis并发访问问题详细讲解

    本文主要介绍了Redis如何应对并发访问,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-12-12
  • Redis分片集群的实现示例

    Redis分片集群的实现示例

    本文介绍了搭建Redis分片集群,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-12-12
  • Redis简介

    Redis简介

    Redis是一个开源,高级的键值存储和一个适用的解决方案,用于构建高性能,可扩展的Web应用程序。关于redis的相关知识大家可以通过本教程学习
    2017-05-05
  • AOP Redis自定义注解实现细粒度接口IP访问限制

    AOP Redis自定义注解实现细粒度接口IP访问限制

    这篇文章主要为大家介绍了AOP Redis自定义注解实现细粒度接口IP访问限制,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • Redis实现布隆过滤器的代码详解

    Redis实现布隆过滤器的代码详解

    布隆过滤器(Bloom Filter)是Redis 4.0版本提供的新功能,它被作为插件加载到Redis服务器中,给Redis提供强大的去重功能,本文将给大家详细介绍一下Redis布隆过滤器,文中有相关的代码示例,需要的朋友可以参考下
    2023-07-07
  • 解决Redis启动警告问题

    解决Redis启动警告问题

    这篇文章介绍了解决Redis启动警告问题的方法,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-02-02
  • NestJS+Redis实现缓存步骤详解

    NestJS+Redis实现缓存步骤详解

    这篇文章主要介绍了NestJS+Redis实现缓存,本文分步骤给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-08-08
  • 解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

    解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

    该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redis缓存缓解数据库压力,并结合Lua脚本进行原子性判断,使用阻塞队列和消息队列异步处理订单,有效提高了系统的并发处理能力和可用性
    2025-02-02
  • Redis如何实现分布式锁

    Redis如何实现分布式锁

    这篇文章主要介绍了Redis如何实现分布式锁问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-10-10
  • 基于Redis实现每日登录失败次数限制

    基于Redis实现每日登录失败次数限制

    这篇文章主要介绍了通过redis实现每日登录失败次数限制的问题,通过redis记录登录失败的次数,以用户的username为key,本文给出了实例代码,需要的朋友可以参考下
    2019-08-08

最新评论