Java多线程之scheduledThreadPool的方法解析

 更新时间:2023年12月28日 09:45:21   作者:竹下星空  
这篇文章主要介绍了Java多线程之scheduledThreadPool的方法解析,queue是DelayedWorkQueue,但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量,需要的朋友可以参考下

scheduledThreadPool

我们对java中定时任务实现可能会有以下疑问:

怎样做到每个任务延迟指定时间执行?

内部使用了什么数据结构保存延迟任务?

延迟任务放入scheduledThreadPool时机并不固定,怎么保证按延迟时间顺序执行?

构造器

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  new DelayedWorkQueue());
}

corePoolSize就是我们传过了的参数,maximumPoolSize是Integer.MAX_VALUE,所以最大线程是无穷大,非核心线程成活时间是0,所以非核心线程执行完firstTask之后如果poll任务没拿到任务则会直接销毁。queue是DelayedWorkQueue。但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量

schedule(Runnable command,long delay, TimeUnit unit)方法

public ScheduledFuture<?>schedule(Runnable command,
long delay,
   TimeUnit unit){
if(command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t =decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
 
  • 通过decorateTask方法获取到RunnableScheduledFuture(实际上是ScheduledFutureTask对象),并把delay时间变成了时间戳
  • 执行delayedExecute方法

delayedExecute方法

private voiddelayedExecute(RunnableScheduledFuture<?> task){
if(isShutdown())
reject(task);
else{
super.getQueue().add(task);
if(isShutdown()&&
!canRunInCurrentRunState(task.isPeriodic())&&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
 
  1. 使用queue.add方法把task放入queue
  2. 执行ensurePrestart方法

offer方法

public boolean offer(Runnable x){
if(x == null)
throw new NullPointerException();
    RunnableScheduledFuture<?> e =(RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
int i = size;
if(i >= queue.length)
grow();
size = i +1;
if(i ==0){
    queue[0]= e;
setIndex(e,0);
}else{
siftUp(i, e);
}
if(queue[0]== e){
    leader = null;
    available.signal();
}
} finally {
lock.unlock();
}
return true;
}
 
  1. DelayedWorkQueue底层使用的是RunnableScheduledFuture的数组,初始化容量是16,之后扩容是以1.5倍进行。
  2. 在offer元素整个过程中使用ReentrantLock进行加锁,所以DelayedWorkQueue是一个线程安全的队列。然后使用了condition来实现阻塞的功能,当poll没有元素时会使用await进行等待,当offer的是数组的第一个元素时会signal,这个signal的设计是排序的点睛之笔,设计的非常巧妙,这块需要offer和take方法一起来看,在take方法时会拿第一个元素来判断delay的时间,如果时间没到会使用await休眠delay时间,但此时如果有delay时间更短的任务放入queue中,此时需要take的任务就不是之前的那个任务了,就要重新执行逻辑获取这个最新delay的任务,这样才能做到任务的正确执行。
  3. 在offer元素时会使用siftUp方法来保证数组中元素是按delay时间从小到大排列,但要注意的是数组前半部分肯定都是排了delay最小的任务,但后半部分不一定是有序的

ensurePrestart()方法

voidensurePrestart(){
int wc =workerCountOf(ctl.get());
if(wc < corePoolSize)
addWorker(null, true);
elseif(wc ==0)
addWorker(null, false);
}
 

这个比较简单,addWorker方法之前我们也分析过了,需要注意的是这里的firstTask默认是空的,所以工作线程会直接从queue中拿任务。这有个比较奇怪的else if,感觉应该永远不用执行,因为wc==0肯定已经被if条件拦截了,也就是只能起核心线程数。最大线程数永远不会起作用

poll方法

public RunnableScheduledFuture<?>poll(long timeout, TimeUnit unit)
    throws InterruptedException {
long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
加锁
    lock.lockInterruptibly();
    try {
自旋
for(;;){
拿到queue中的第一个元素,如果是空则awaitNanos时间,等待时间过后如果queue中还是没有元素则返回null。
    RunnableScheduledFuture<?> first = queue[0];
if(first == null){
if(nanos <=0)
return null;
else
    nanos = available.awaitNanos(nanos);
}else{
拿到第一个任务的delay时间,如果到了delay时间则返回finishPoll方法的结果
long delay = first.getDelay(NANOSECONDS);
if(delay <=0)
returnfinishPoll(first);
如果传入的nanos小于等于0则返回null
if(nanos <=0)
return null;
first = null;// don't retain ref while waiting
如果等待时间还不够或前一个需要执行的任务还在执行,则当前线程直接等待
if(nanos < delay || leader != null)
    nanos = available.awaitNanos(nanos);
else{否则当前线程可以执行(leader线程),但需要awaitNanos delay的时间才能执行
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
当等待时间到了之后就  leader = null说明此时可以返回finishPoll方法的结果
if(leader == thisThread)
    leader = null;
}
}
}
}
} finally {
if(leader == null && queue[0]!= null)
    available.signal();
lock.unlock();
}
}
  1. DelayedWorkQueue的poll方法也是使用reentrantLock来保证线程安全,然后使用condition.awaitNanos来达到等待特定时间的效果,这里使用leader线程保证了排在第一位的任务只有一个工作线程获取到,其他工作线程进行排队等待,在获取到第一个任务的工作线程delay时间到了之后会take到这个任务并signal排队的第一个工作线程继续获取下一个任务,周而复始。
  2. 在使用finishPoll方法返回delay时间到了的任务时会用siftDown对queue后半部分的任务进行排序,因为之前offer时使用siftUp方法只对queue前半部分进行了排序
  3. 回到ScheduledThreadPool线程池,keepAliveTime是0,所以当first任务的delay时间还没有到时会直接返回null,然后非核心工作线程就会直接销毁,之后的代码都不会执行,而核心线程则执行的take方法,take方法才会进入下面这段逻辑
if (leader != null)
      available.await();
  else {
    Thread thisThread = Thread.currentThread();
   leader = thisThread;
   try {
  available.awaitNanos(delay);
   } finally {
  if (leader == thisThread)
       leader = null;
       }
 }

scheduleAtFixedRate方法

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  long initialDelay,
  long period,
  TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
  null,
  triggerTime(initialDelay, unit),
  unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
 
  1. 这个方法跟之前schedule方法差不多了,都是使用了ScheduledFutureTask,这边多了个period变量保存执行周期值,outerTask引用了自身的对象,然后也是使用delayExecute方法把任务放入了queue中,此时任务的delay是initialDelay,所以会在initialDelay时间之后出队然后执行
  2. 由于现在工作线程中的task是ScheduledFutureTask,所以工作线程调用的task.run方法是ScheduledFutureTask.run方法

ScheduledFutureTask.run方法

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
cancel(false);
    else if (!periodic)
ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
    }
}
 

1.判断是不是周期执行的任务,之前的schedule方法的period是0,所以会执行super.run();然后执行传入的runnable中的run方法,而scheduleAtFixedRate方法的period不是0,则会执行super.runAndReset();方法,执行传入的runnable中的run方法之后执行setNextRunTime();

重新设置delay时间(initialDelay+period),然后把任务又放入queue中

scheduleWithFixedDelay方法

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 long initialDelay,
 long delay,
 TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
  null,
  triggerTime(initialDelay, unit),
  unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
 

这个方法几乎跟scheduleAtFixedRate方法一模一样,区别在于period是个负数,通过之前我们对scheduleAtFixedRate方法的分析,period这个参数在算周期执行间隔时会用到,也就是setNextRunTime方法

setNextRunTime方法

private void setNextRunTime() {
    long p = period;
    if (p > 0)
time += p;
    else
time = triggerTime(-p);
}
 

当period大于0时,也就是scheduleAtFixedRate执行时,是直接在之前的time加上了period,而scheduleWithFixedDelay方法执行时,是用triggerTime方法在当前时间加上了periode,不同的计算方式的区别在于,scheduleAtFixedRate不会管任务的执行时间,我只要保证任务固定频率执行就好了,所以他是几乎精确的period时间执行,而scheduleWithFixedDelay是在任务之后的时间+period时间来确定下一次任务执行的时间,所以任务执行的频率相对来说不固定

到此这篇关于Java多线程之scheduledThreadPool的方法解析的文章就介绍到这了,更多相关scheduledThreadPool的方法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring配置多数据源导致事物无法回滚问题

    Spring配置多数据源导致事物无法回滚问题

    这篇文章主要介绍了Spring配置多数据源导致事物无法回滚问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • Java正则表达式,提取双引号中间的部分方法

    Java正则表达式,提取双引号中间的部分方法

    今天小编就为大家分享一篇Java正则表达式,提取双引号中间的部分方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • 一个Java中BigDecimal的问题记录

    一个Java中BigDecimal的问题记录

    这篇文章主要给大家介绍了关于Java中一个BigDecimal问题的相关资料,通过文中介绍的方法可以很方便的解决BigDecimal进行计算的时候不管怎么计算,最后得到的值都没有变化的问题,需要的朋友可以参考下
    2021-11-11
  • java哈希算法HashMap经典面试题目汇总解析

    java哈希算法HashMap经典面试题目汇总解析

    这篇文章主要为大家介绍了java哈希算法HashMap的经典面试题目汇总及问题解析,帮助大家彻底征服面试官,实现薪资自由,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2022-03-03
  • IDEA解决maven包冲突easypoi NoClassDefFoundError的问题

    IDEA解决maven包冲突easypoi NoClassDefFoundError的问题

    这篇文章主要介绍了IDEA解决maven包冲突easypoi NoClassDefFoundError的问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • elasticsearch节点间通信的基础transport启动过程

    elasticsearch节点间通信的基础transport启动过程

    这篇文章主要为大家介绍了elasticsearch节点间通信的基础transport启动过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-04-04
  • 深入浅出重构Mybatis与Spring集成的SqlSessionFactoryBean(上)

    深入浅出重构Mybatis与Spring集成的SqlSessionFactoryBean(上)

    通常来讲,重构是指不改变功能的情况下优化代码,但本文所说的重构也包括了添加功能。这篇文章主要介绍了重构Mybatis与Spring集成的SqlSessionFactoryBean(上)的相关资料,需要的朋友可以参考下
    2016-11-11
  • 解决java使用axios.js的post请求后台时无法接收到入参的问题

    解决java使用axios.js的post请求后台时无法接收到入参的问题

    今天小编就为大家分享一篇解决java使用axios.js的post请求后台时无法接收到入参的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-09-09
  • Java中joda日期格式化工具的使用示例

    Java中joda日期格式化工具的使用示例

    这篇文章主要介绍了Java中joda日期格式化工具的使用示例,帮助大家更好的利用Java处理时间,感兴趣的朋友可以了解下
    2021-01-01
  • 深入浅出JAVA MyBatis-快速入门

    深入浅出JAVA MyBatis-快速入门

    这篇文章主要介绍了在今天这篇博文中,我将要介绍一下mybatis的框架原理,以及mybatis的入门程序,实现用户的增删改查,她有什么优缺点以及mybatis和hibernate之间存在着怎么样的关系,大家这些问题一起通过本文学习吧
    2021-06-06

最新评论