关于ScheduledThreadPoolExecutor不执行的原因分析

 更新时间:2023年08月10日 14:46:32   作者:Redick01  
这篇文章主要介绍了关于ScheduledThreadPoolExecutor不执行的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

ScheduledThreadPoolExecutor不执行原因分析

最近在调试一个监控应用指标的时候发现定时器在服务启动执行一次之后就不执行了,这里用的定时器是Java的调度线程池 ScheduledThreadPoolExecutor ,后来经过排查发现 ScheduledThreadPoolExecutor 线程池处理任务如果抛出异常,会导致线程池不调度;下面就通过一个例子简单分析下为什么异常会导致 ScheduledThreadPoolExecutor 不执行。

ScheduledThreadPoolExecutor不调度分析

示例程序

在示例程序可以看到当计数器中的计数达到5的时候就会主动抛出一个异常,抛出异常后 ScheduledThreadPoolExecutor 就不调度了。

public class ScheduledTask {
    private static final AtomicInteger count = new AtomicInteger(0);
    private static final ScheduledThreadPoolExecutor SCHEDULED_TASK = new ScheduledThreadPoolExecutor(
            1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "sc-task");
            t.setDaemon(true);
            return t;
        }
    });
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        SCHEDULED_TASK.scheduleWithFixedDelay(() -> {
            System.out.println(111);
            if (count.get() == 5) {
                throw new IllegalArgumentException("my exception");
            }
            count.incrementAndGet();
        }, 0, 5, TimeUnit.SECONDS);
        latch.await();
    }
}

源码分析

  • ScheduledThreadPoolExecutor#run

run方法内部首先判断任务是不是周期性的任务,如果不是周期性任务通过 ScheduledFutureTask.super.run(); 执行任务;如果状态是运行中或shutdown,取消任务执行;如果是周期性的任务,通过 ScheduledFutureTask.super.runAndReset() 执行任务并且重新设置状态,成功了就会执行 setNextRunTime 设置下次调度的时间,问题就是出现在 ScheduledFutureTask.super.runAndReset() ,这里执行任务出现了异常,导致结果为false,就不进行下次调度时间设置等

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
  • *FutureTask#runAndReset

在线程任务执行过程中抛出异常,然后 catch 到了异常,最终导致这个方法返回false,然后 ScheduledThreadPoolExecutor#run 就不设置下次执行时间了,代码 c.call(); 抛出异常,跳过 ran = true; 代码,最终 runAndReset 返回false。

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

注意:

Java ScheduledThreadPoolExecutor 定时任务线程池所调度的任务中如果抛出了异常,并且异常没有捕获直接抛到框架中,会导致 ScheduledThreadPoolExecutor 定时任务不调度了,具体是因为当异常抛到 ScheduledThreadPoolExecutor 框架中时不进行下次调度时间的设置,从而导致 ScheduledThreadPoolExecutor 定时任务不调度。

ScheduledThreadPoolExecutor线程池例子

ScheduledThreadPoolExecutor 使用

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,主要用来给定时间运行任务,或者定期执行任务。

在 ScheduledThreadPoolExecutor 的实现中,使用了 FutureTask 运行任务以及使用无界队列 DelayedWorkQueue 来保存任务。

1. 使用示例

  • 提交任务

ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService 接口,其中,接口中有四个需要实现的方法,其中 schedule() 的两个方法需要设置任务以及任务启动的延迟时间,scheduleAtFixedRate() 可以设置任务定时重复执行,scheduleWithFixedDelay() 则是设置两个任务之间的执行延迟时间。

ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2);  
poolExecutor.schedule(() -> {  
    // 提交的任务
}, 5, TimeUnit.HOURS);  
poolExecutor.scheduleAtFixedRate(() -> {  
    // 提交的任务
}, 0, 5, TimeUnit.HOURS);  
poolExecutor.scheduleWithFixedDelay(() -> {  
    // 提交的任务
}, 0, 5, TimeUnit.HOURS);
  • 简单例子

下面的例子中每 500 毫秒打印一次字符串,executor 会有 5 秒的时间来等待任务执行结束,也就是一共可以打印 10 次字符串。

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);  
executor.scheduleWithFixedDelay(() -> {  
    System.out.println("测试");  
}, 0, 500, TimeUnit.MILLISECONDS);  
try {  
    executor.awaitTermination(5, TimeUnit.SECONDS);  
    executor.shutdown();  
} catch (InterruptedException e) {  
    e.printStackTrace();  
}

ScheduledThreadPoolExecutor 原理

1. DelayedWorkQueue

ScheduledThreadPoolExecutor 的构造方法对 DelayedWorkQueue 进行了初始化,并且最大线程数量设置成了 Integer.MAX_VALUE。

public ScheduledThreadPoolExecutor(int corePoolSize) {  
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  
          new DelayedWorkQueue());  
}

其中,DelayedWorkQueue 中的队列是 RunnableScheduledFuture 类型以及容量为 16 的数组。

private static final int INITIAL_CAPACITY = 16;  
private RunnableScheduledFuture<?>[] queue =  
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];  
private final ReentrantLock lock = new ReentrantLock();

队列添加任务是以 DelayedWorkQueue 以堆作为数据结构存储任务,在添加元素的时候,会使用基于 Siftup 版本进行元素添加,并且会根据任务的执行时间的大小来排序。

public boolean offer(Runnable x) {  
    if (x == null)  
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;  
    final ReentrantLock lock = this.lock;  
    lock.lock();
    try {
        // 当队列的容量不够,会扩充 50%
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {  
            siftUp(i, e);  
        }  
        if (queue[0] == e) {  
            leader = null;  
            available.signal();  
        }  
    } finally {  
        lock.unlock();  
    }  
    return true;  
}

2. delayedExecute()

ScheduledExecutorService 接口的四个实现方法中都涉及到了 delayedExecute(),方法主要用来判断线程池的状态以及对线程进行初始化。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果线程池关闭了,需要执行饱和策略
    if (isShutdown())  
        reject(task);  
    else {
        // 添加到队列中
        super.getQueue().add(task);  
        if (isShutdown() &&  
            !canRunInCurrentRunState(task.isPeriodic()) &&  
            remove(task))  
            task.cancel(false);  
        else
            // 判断等待队列中是否已经满了,会使用到 ThreadPoolExecutor
            ensurePrestart();  
    }  
}
void ensurePrestart() {  
    int wc = workerCountOf(ctl.get());  
    if (wc < corePoolSize)  
        addWorker(null, true);  
    else if (wc == 0)  
        addWorker(null, false);  
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java8中方便又实用的Map函数总结

    Java8中方便又实用的Map函数总结

    java8之后,常用的Map接口中添加了一些非常实用的函数,可以大大简化一些特定场景的代码编写,提升代码可读性,快跟随小编一起来看看吧
    2022-11-11
  • SpringBoot下载Excel文件时,报错文件损坏的解决方案

    SpringBoot下载Excel文件时,报错文件损坏的解决方案

    这篇文章主要介绍了SpringBoot下载Excel文件时,报错文件损坏的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • 详解Spring Boot应用的启动和停止(start启动)

    详解Spring Boot应用的启动和停止(start启动)

    这篇文章主要介绍了详解Spring Boot应用的启动和停止(start启动),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-12-12
  • 如何在Spring Boot微服务使用ValueOperations操作Redis集群String字符串

    如何在Spring Boot微服务使用ValueOperations操作Redis集群String字符串

    这篇文章主要介绍了在Spring Boot微服务使用ValueOperations操作Redis集群String字符串类型数据,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-06-06
  • 详谈jvm--Java中init和clinit的区别

    详谈jvm--Java中init和clinit的区别

    下面小编就为大家带来一篇详谈jvm--Java中init和clinit的区别。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-10-10
  • Java轻松使用工具类实现获取MP3音频时长

    Java轻松使用工具类实现获取MP3音频时长

    在Java中,工具类定义了一组公共方法,这篇文章将介绍Java中使用工具类来获取一个MP3音频文件的时间长度,感兴趣的同学继续往下阅读吧
    2021-10-10
  • 实现一个简单Dubbo完整过程详解

    实现一个简单Dubbo完整过程详解

    这篇文章主要为大家介绍了实现一个简单Dubbo完整过程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • MyBatis Plus实现中文排序的两种有效方法

    MyBatis Plus实现中文排序的两种有效方法

    在MyBatis Plus项目开发中,针对中文数据的排序需求是一个常见的挑战,尤其是在需要按照拼音或特定语言逻辑排序时,本文整合了两种有效的方法,旨在帮助开发者克服MyBatis Plus在处理中文排序时遇到的障碍,需要的朋友可以参考下
    2024-08-08
  • Java字符串操作全解析之语法、示例与应用场景分析

    Java字符串操作全解析之语法、示例与应用场景分析

    在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字符串处理技巧,轻松应对笔试面试高频题目,感兴趣的朋友一起看看吧
    2025-04-04
  • java实现文件读写与压缩实例

    java实现文件读写与压缩实例

    这篇文章主要介绍了java实现文件读写与压缩实例,有助于读者加深对文件操作的理解,需要的朋友可以参考下
    2014-07-07

最新评论