SpringBoot中Redisson延迟队列的示例

 更新时间:2024年10月16日 09:28:39   作者:Frank-fu  
延时队列是一种常见的需求,延时队列允许我们延迟处理某些任务,本文主要介绍了Redisson延迟队列的示例,具有一定的参考价值,感兴趣的可以了解一下

场景:

需求:

支付的二维码,超过两个小时以后,如果还未支付,则自动转为取消支付,或者支付超时的状态

需求分析:

1,动态定时任务:

每个支付的二维码创建的时候,创建一个动态的定时任务,两个小时候自动执行,更新支付状态,可以解决这个问题。

(1)持久化:

如果服务重启了,动态定时任务会丢失,导致部分数据没办法更新状态。

(2)分布式:

如果当服务重启时,自动扫描数据,重新计算时间,再次创建动态定时任务。可以解决(1)的问题,但是当分布式,多个节点的时候,都会重新加载所有的任务,这样性能上不是最优解,只能在数据源上加上节点名称,不同的服务节点,加载属于自己的定时任务,可以解决这个问题。总的想想,太麻烦了,还是算了。

2,Redisson延迟队列

(1)持久化:队列信息放在Redis上,服务重启不影响。

(2)分布式:多节点去Redis拿去数据,谁抢到算谁的,不会存在同一个任务,多个节点支持。唯一不足就是过度依赖Redis,万一Redis崩了,那就凉凉了(那就是要把Redis配置高可用,当前业务就不用管了)。总体来说还是比较好用的。

实现

1,创建延迟队列的监听任务【RedisDelayedQueueListener】,消费延迟队列

2,创建新增延迟队列的类,用于创建延迟队列

3,整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务。

4,创建延迟任务(开始测试使用)

连接Redis

不贴代码了,自己在网上搜

监听延迟队列

接口:

/**
 * 队列事件监听接口,需要实现这个方法
 *
 * @module
 * @author frank
 * @date 2021/8/19 10:50
 */
public interface RedisDelayedQueueListener<T> {
    /**
     * 执行方法
     *
     * @param t
     */
    void invoke(T t);
}

实现:

import com.sxmaps.netschool.common.redisson.RedisDelayedQueueListener;
import com.sxmaps.netschool.service.vo.school.SchoolAccountPayStateReqVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 支付二维码监听器
 *
 * @module
 * @author frank
 * @date 2021/8/19 10:49
 */
@Component
public class PayQCordListener implements RedisDelayedQueueListener<SchoolAccountPayStateReqVO> {

    private final Logger logger = LoggerFactory.getLogger(PayQCordListener.class);
    @Autowired
    private SchoolAccountService schoolAccountService;

    @Override
    public void invoke(SchoolAccountPayStateReqVO payStateReqVO) {
        logger.info("支付二维码-延迟失效,内容:{}", payStateReqVO);
         //处理业务,更新二维码状态
        logger.info("支付二维码-延迟失效,内容:{},处理结果:{}", payStateReqVO,respDTO);
    }
}

增加延迟队列

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * 增加延迟信息
 *
 * @author frank
 * @module
 * @date 2021/8/19 10:49
 */
@Component
public class RedisDelayedQueue {

    private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);

    @Autowired
    RedissonClient redissonClient;

    /**
     * 添加队列
     *
     * @param t        DTO传输类
     * @param delay    时间数量
     * @param timeUnit 时间单位
     * @param <T>      泛型
     */
    private <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
        logger.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,t);
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
    }

    /**
     * 添加队列-秒
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());
    }

    /**
     * 添加队列-分
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());
    }

    /**
     * 添加队列-时
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.HOURS, clazz.getName());
    }
    /**
     * 添加队列-天
     *
     * @param t     DTO传输类
     * @param delay 时间数量
     * @param <T>   泛型
     */
    public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
        addQueue(t, delay, TimeUnit.DAYS, clazz.getName());
    }
}

整体初始化

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * 初始化队列监听
 *
 * @module
 * @author frank
 * @date 2021/8/19 10:49
 */
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {

    private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);
    @Autowired
    RedissonClient redissonClient;

    /**
     * 获取应用上下文并获取相应的接口实现类
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
        for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
            String listenerName = taskEventListenerEntry.getValue().getClass().getName();
            startThread(listenerName, taskEventListenerEntry.getValue());
        }
    }

    /**
     * 启动线程获取队列*
     *
     * @param queueName                 queueName
     * @param redisDelayedQueueListener 任务回调监听
     * @param <T>                       泛型
     * @return
     */
    private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        //服务重启后,无offer,take不到信息。
        redissonClient.getDelayedQueue(blockingFairQueue);
        //由于此线程需要常驻,可以新建线程,不用交给线程池管理
        Thread thread = new Thread(() -> {
            logger.info("启动监听队列线程" + queueName);
            while (true) {
                try {
                    T t = blockingFairQueue.take();
                    logger.info("监听队列线程,监听名称:{},内容:{}", queueName, t);
                    redisDelayedQueueListener.invoke(t);
                } catch (Exception e) {
                    logger.info("监听队列线程错误,", e);
                }
            }
        });
        thread.setName(queueName);
        thread.start();
    }

}

创建延迟任务

@Autowired
RedisDelayedQueue queue;
.................

queue.addQueueHours(new SchoolAccountPayStateReqVO(dto.getPayNo()),2, PayQCordListener.class);

到此这篇关于Redisson延迟队列的示例的文章就介绍到这了,更多相关Redisson延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java基础入门 Swing中间容器的使用

    Java基础入门 Swing中间容器的使用

    这篇文章主要介绍了Java基础入门 Swing中间容器的使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • 使用spring data的page和pageable如何实现分页查询

    使用spring data的page和pageable如何实现分页查询

    这篇文章主要介绍了使用spring data的page和pageable如何实现分页查询,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • spring基于注解配置实现事务控制操作

    spring基于注解配置实现事务控制操作

    这篇文章主要介绍了spring基于注解配置实现事务控制操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Javamail使用过程中常见问题解决方案

    Javamail使用过程中常见问题解决方案

    这篇文章主要介绍了Javamail使用过程中常见问题解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • Struts2拦截器Interceptor的原理与配置实例详解

    Struts2拦截器Interceptor的原理与配置实例详解

    拦截器是一种AOP(面向切面编程)思想的编程方式.它提供一种机制是开发者能够把相对独立的代码抽离出来,配置到Action前后执行。下面这篇文章主要给大家介绍了关于Struts2拦截器Interceptor的原理与配置的相关资料,需要的朋友可以参考下。
    2017-11-11
  • Java利用Geotools从DEM数据中读取指定位置的高程信息全过程

    Java利用Geotools从DEM数据中读取指定位置的高程信息全过程

    Geotools作为一款功能强大且开源的地理工具库,为地理数据的处理和分析提供了丰富的类库和便捷的接口,能够很好地满足从DEM数据中读取高程信息这一实战需求,本文将深入讲解如何利用Geotools从获取DEM数据到成功读取指定位置高程信息的全过程,需要的朋友可以参考下
    2025-03-03
  • Spring Boot 使用 Swagger 构建 RestAPI 接口文档

    Spring Boot 使用 Swagger 构建 RestAPI 接口文档

    这篇文章主要介绍了Spring Boot 使用 Swagger 构建 RestAPI 接口文档,帮助大家更好的理解和使用Spring Boot框架,感兴趣的朋友可以了解下
    2020-10-10
  • Java中详细解析Map接口

    Java中详细解析Map接口

    这篇文章主要介绍了Java8 中 Map 接口的新方法,本文通过代码实例给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-08-08
  • 举例解析Java的设计模式编程中里氏替换原则的意义

    举例解析Java的设计模式编程中里氏替换原则的意义

    这篇文章主要介绍了Java的设计模式中里氏替换原则的意义,文中举例来说明里氏替换原则中强调的继承特性方面可能带来的问题,需要的朋友可以参考下
    2016-02-02
  • Java中HashMap的初始容量设置方式

    Java中HashMap的初始容量设置方式

    这篇文章主要介绍了Java中HashMap的初始容量设置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06

最新评论