详解Java中CountDownLatch异步转同步工具类

 更新时间:2021年06月30日 10:38:27   作者:lingzhi_ying  
今天给大家带来的是关于Java的相关知识,文章围绕着CountDownLatch异步转同步工具类展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下

使用场景

由于公司业务需求,需要对接socket、MQTT等消息队列。
众所周知 socket 是双向通信,socket的回复是人为定义的,客户端推送消息给服务端,服务端的回复是两条线。无法像http请求有回复。
下发指令给硬件时,需要校验此次数据下发是否成功。
用户体验而言,点击按钮就要知道此次的下发成功或失败。

在这里插入图片描述

如上图模型,

第一种方案使用Tread.sleep
优点:占用资源小,放弃当前cpu资源
缺点: 回复速度快,休眠时间过长,仍然需要等待休眠结束才能返回,响应速度是固定的,无法及时响应第二种方案使用CountDownLatch

package com.lzy.demo.delay;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CountDownLatchPool {

    //countDonw池
    private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
    //延迟队列
    private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>();

    private volatile static boolean flag =false;
    //单线程池
    private final static ExecutorService t = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(1));

    public static void addCountDownLatch(Integer messageId) {
        CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId,new CountDownLatch(1) );
        if(countDownLatch == null){
            countDownLatch = countDownLatchMap.get(messageId);
        }
        try {
            addDelayQueue(messageId);
            countDownLatch.await(3L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("阻塞等待结束~~~~~~");
    }

    public static void removeCountDownLatch(Integer messageId){
        CountDownLatch countDownLatch = countDownLatchMap.get(messageId);
        if(countDownLatch == null)
            return;
        countDownLatch.countDown();
        countDownLatchMap.remove(messageId);
        System.out.println("清除Map数据"+countDownLatchMap);
    }

    private static void addDelayQueue(Integer messageId){
        delayQueue.add(new MessageDelayQueueUtil(messageId));
        clearMessageId();
    }

    private static void clearMessageId(){
        synchronized (CountDownLatchPool.class){
            if(flag){
                return;
            }
            flag = true;
        }
        t.execute(()->{
            while (delayQueue.size() > 0){
                System.out.println("进入线程并开始执行");
                try {
                    MessageDelayQueueUtil take = delayQueue.take();
                    Integer messageId1 = take.getMessageId();
                    removeCountDownLatch(messageId1);
                    System.out.println("清除队列数据"+messageId1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            flag = false;
            System.out.println("结束end----");
        });
    }

    public static void main(String[] args) throws InterruptedException {
        /*
        测试超时清空map
        new Thread(()->addCountDownLatch(1)).start();
        new Thread(()->addCountDownLatch(2)).start();
        new Thread(()->addCountDownLatch(3)).start();
        */
        //提前创建线程,清空countdown
        new Thread(()->{
            try {
                Thread.sleep(500L);
                removeCountDownLatch(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //开始阻塞
        addCountDownLatch(1);
    	//通过调整上面的sleep我们发现阻塞市场取决于countDownLatch.countDown()执行时间
    	System.out.println("阻塞结束----");
    }
}
class MessageDelayQueueUtil implements Delayed {

    private Integer messageId;
    private long avaibleTime;

    public Integer getMessageId() {
        return messageId;
    }

    public void setMessageId(Integer messageId) {
        this.messageId = messageId;
    }

    public long getAvaibleTime() {
        return avaibleTime;
    }

    public void setAvaibleTime(long avaibleTime) {
        this.avaibleTime = avaibleTime;
    }

    public MessageDelayQueueUtil(Integer messageId){
        this.messageId = messageId;
        //avaibleTime = 当前时间+ delayTime
        //重试3次,每次3秒+1秒的延迟
        this.avaibleTime=3000*3+1000 + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime());
    }
}

由于socket并不确定每次都会有数据返回,所以map的数据会越来越大,最终导致内存溢出
需定时清除map内的无效数据。
可以使用DelayedQuene延迟队列来处理,相当于给对象添加一个过期时间

使用方法 addCountDownLatch 等待消息,异步回调消息清空removeCountDownLatch

到此这篇关于详解Java中CountDownLatch异步转同步工具类的文章就介绍到这了,更多相关CountDownLatch异步转同步工具类内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法

    这篇文章主要为大家详细介绍了基于MapReduce实现决策树算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-08-08
  • idea数据库驱动下载失败的问题及解决

    idea数据库驱动下载失败的问题及解决

    这篇文章主要介绍了idea数据库驱动下载失败的问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • 在AOP中Spring生成代理类的两种方式

    在AOP中Spring生成代理类的两种方式

    今天小编就为大家分享一篇关于在AOP中Spring生成代理类的两种方式,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • 一文详解JavaWeb过滤器(Filter)

    一文详解JavaWeb过滤器(Filter)

    本文主要介绍了一文详解JavaWeb过滤器(Filter),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05
  • Java中NoClassDefFoundError 和 ClassNotFoundException的区别

    Java中NoClassDefFoundError 和 ClassNotFoundException的区别

    Java中NoClassDefFoundError和ClassNotFoundException的区别,从类继承层次上来看,ClassNotFoundException是从Exception继承的,所以ClassNotFoundException是一个检查异常。具体详情需要的朋友可以参考下面文章内容
    2022-06-06
  • 教你用Java GUI实现文本文件的读写

    教你用Java GUI实现文本文件的读写

    今天带大家来学习怎么用JavaSwing实现实现文本文件读写,文中有非常详细的代码示例,对正在学习java的小伙伴们有很好的帮助,需要的朋友可以参考下
    2021-05-05
  • Spring 实现给Bean属性注入null值

    Spring 实现给Bean属性注入null值

    这篇文章主要介绍了Spring 实现给Bean属性注入null值的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • java调用dll方法总结

    java调用dll方法总结

    本篇文章小编给大家整理了java调用dll的方法的总结,有需要的朋友参考学下一下吧。
    2017-12-12
  • SpringCloud Hystrix熔断器使用方法介绍

    SpringCloud Hystrix熔断器使用方法介绍

    通过hystrix可以解决雪崩效应问题,它提供了资源隔离、降级机制、融断、缓存等功能。接下来通过本文给大家分享SpringCloud集成Hystrix熔断,感兴趣的朋友一起看看吧
    2023-03-03
  • springboot log多环境配置方式

    springboot log多环境配置方式

    这篇文章主要介绍了springboot log多环境配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06

最新评论