java线程池ExecutorService超时处理小结

 更新时间:2024年09月26日 10:23:04   作者:小百菜  
使用ExecutorService时,设置子线程执行超时是一个常见需求,本文就来详细的介绍一下ExecutorService超时的三种方法,感兴趣的可以了解一下

场景问题:使用线程池ExecutorService,想设置每个子线程的执行超时时间,使用future.get()来监听超时,当有子线程阻塞时,导致有的队列任务还未执行就被取消了。

方式一、使用 future.get() 来监听超时取消

这种办法看似能解决问题,但是当任务累积处理不过来时,会漏执行。
比如下面的例子,就实际只会执行一个子线程。

package com.study;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;

public class Test {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(1);

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            Future<?> future = threadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(LocalDateTime.now().format(formatter));
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // e.printStackTrace();
                    }
                }
            });
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        future.get(3, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {//超时异常
                        future.cancel(true); // 超时后取消任务
                    }
                }
            }).start();
        }
    }
}

方式二、在子线程内部,超时后去发送中断信号

package com.study;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;

public class Test {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(1);
    private static final ScheduledExecutorService timeoutExecutor = new ScheduledThreadPoolExecutor(1);//监听超时,这个数量要和线程池数量相同

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            // int delay = 3;
            int delay = i + 1;
            threadPool.submit(new Runnable() {
                @Override
                public void run() {
                    ScheduledFuture<?> schedule = null;
                    try {
                        Thread thread = Thread.currentThread();
                        // 启动一个定时器,如果任务执行超过3秒则中断当前线程
                        schedule = timeoutExecutor.schedule(() -> {
                            thread.interrupt(); // 中断当前正在执行的任务
                        }, delay, TimeUnit.SECONDS);
                        System.out.println(LocalDateTime.now().format(formatter));
                        Thread.sleep(5000);
                        // FileOutputStream fos = new FileOutputStream("d:/test.txt" + k);
                        // for (int j = 0; j < 1000000; j++) {
                        //     fos.write("123".getBytes());
                        // }
                        // fos.close();
                    } catch (InterruptedException e) {
                        // e.printStackTrace();
                    } finally {
                        if (schedule != null) {
                            //取消任务
                            schedule.cancel(true);
                        }
                    }
                }
            });
        }
    }
}

这里其实还是有问题 ,把 Thread.sleep(5000);改成注释的io阻塞,还是要等线程执行结束后才会取消线程执行。

所以单纯使用  future 是实现不了这个场景的逻辑的。

timeoutExecutor 数量和 线程池数量要一致的原因如下示例。

package com.study;

import java.time.LocalDateTime;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceExample {

    private static final ScheduledExecutorService timeoutExecutor = new ScheduledThreadPoolExecutor(2);

    public static void main(String[] args) throws InterruptedException {
        // 调用schedule方法两次
        scheduleTask("Task 1");
        scheduleTask("Task 2");
        scheduleTask("Task 3");
    }

    private static void scheduleTask(String taskName) {
        timeoutExecutor.schedule(() -> {
            System.out.println(taskName + " started at: " + LocalDateTime.now());
            try {
                // 模拟任务执行
                Thread.sleep(2000); // 假设每个任务执行2秒
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 3, TimeUnit.SECONDS);
    }
}

方式三、自己定义锁来实现

package com.study;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;

public class Test {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(16);//这里不能设置为1了,这里已经不是用来控制并发数量了,只是为了重复利用线程
    private static final ScheduledExecutorService timeoutExecutor = new ScheduledThreadPoolExecutor(1);//监听超时,这个数量要和线程池数量相同

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(50);
            // int delay = 3;
            int delay = i + 1;
            int k = i;
            threadPool.submit(new Runnable() {
                @Override
                public void run() {
                    ScheduledFuture<?> schedule = null;
                    try {
                        ThreadPool.awaitThread();
                        // 启动一个定时器,如果任务执行超过3秒则中断当前线程
                        // timeoutExecutor如果只有一个线程池,这里面的代码片段会阻塞,上一个线程在这里的代码片段执行完后,当前线程才会执行这里的代码片段,
                        // 但是影响不大,因为这里的代码片段只是释放动作,一瞬间就会执行完,所以影响不大,
                        // 如果其他场景这里阻塞时间比较久,那么timeoutExecutor线程大小要和threadPool线程大小一致。
                        schedule = timeoutExecutor.schedule(() -> {
                            System.out.println("释放1");
                            ThreadPool.releaseThread();
                        }, delay, TimeUnit.SECONDS);
                        System.out.println("【" + Thread.currentThread().getName() + "】" + LocalDateTime.now().format(formatter));
                        Thread.sleep(5000);
                        // FileOutputStream fos = new FileOutputStream("d:/test.txt" + k);
                        // for (int j = 0; j < 1000000; j++) {
                        //     fos.write("123".getBytes());
                        // }
                        // fos.close();
                    } catch (Exception e) {
                        System.out.println("异常");
                    } finally {
                        if (schedule != null) {
                            //cancel返回true任务还未执行,需要取消任务
                            if (schedule.cancel(true)) {
                                System.out.println("释放2");
                                ThreadPool.releaseThread();
                            }
                        }
                    }
                }
            });
        }
    }
}
package com.study;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 线程池
 */
public class ThreadPool {
    private static final int MAX_POOL_SIZE = 1; // 最大线程数,控制并发数量
    private static int totalThread = 0; // 总线程数
    private static final Lock lock = new ReentrantLock(true);
    private static final Condition notice = lock.newCondition();

    /**
     * 从线程池获取线程
     */
    public static boolean awaitThread() {
        lock.lock();
        try {
            // 尝试从线程池中获取线程
            if (totalThread < MAX_POOL_SIZE) {
                totalThread++;
                return true;
            }
            // 线程已到达最大线程数,等待归还线程,最长等待1小时,await()会释放当前线程的锁
            if (notice.await(1, TimeUnit.HOURS)) {
                totalThread++;
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return false;
    }


    /**
     * 释放线程到线程池
     */
    public static void releaseThread() {
        lock.lock();
        try {
            totalThread--;
            // 通知有空闲,signal()会唤醒其中一个await()线程
            notice.signal();
        } finally {
            lock.unlock();
        }
    }

}

到此这篇关于java线程池ExecutorService超时处理小结的文章就介绍到这了,更多相关java线程池ExecutorService超时内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • Intellij idea下使用不同tomcat编译maven项目的服务器路径方法详解

    Intellij idea下使用不同tomcat编译maven项目的服务器路径方法详解

    今天小编就为大家分享一篇关于Intellij idea下使用不同tomcat编译maven项目的服务器路径方法详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-02-02
  • JAVA中Collections.sort()方法使用详解

    JAVA中Collections.sort()方法使用详解

    这篇文章主要给大家介绍了关于JAVA中Collections.sort()方法使用的相关资料,Java中Collections.sort()方法是用来对List类型进行排序的,文中通过代码将使用的方法介绍的非常详细,需要的朋友可以参考下
    2024-05-05
  • Java多线程编程之读写锁ReadWriteLock用法实例

    Java多线程编程之读写锁ReadWriteLock用法实例

    这篇文章主要介绍了Java多线程编程之读写锁ReadWriteLock用法实例,本文直接给出编码实例,需要的朋友可以参考下
    2015-05-05
  • @RequestBody注解的原理及使用技巧分享

    @RequestBody注解的原理及使用技巧分享

    这篇文章主要介绍了@RequestBody注解的原理及使用技巧分享,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-10-10
  • 三张图彻底了解Java中字符串的不变性

    三张图彻底了解Java中字符串的不变性

    这篇文章主要通过三张图彻底帮助大家了解Java中字符串的不变性,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-06-06
  • Spring Data Redis中Hash结构操作技巧与最佳实践

    Spring Data Redis中Hash结构操作技巧与最佳实践

    本文全面掌握Spring Data Redis中Hash结构的操作技巧与最佳实践,包括节省内存、支持单字段更新和查询、原子性操作等,通过SpringDataRedis的opsForHash(),可以方便地进行Hash结构的操作,感兴趣的朋友跟随小编一起看看吧
    2026-01-01
  • SpringBoot整合Kotlin构建Web服务的方法示例

    SpringBoot整合Kotlin构建Web服务的方法示例

    这篇文章主要介绍了SpringBoot整合Kotlin构建Web服务的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-02-02
  • 浅谈MyBatis 如何执行一条 SQL语句

    浅谈MyBatis 如何执行一条 SQL语句

    Mybatis 是 Java 开发中比较常用的 ORM 框架。在日常工作中,我们都是直接通过 Spring Boot 自动配置,并直接使用,但是却不知道 Mybatis 是如何执行一条 SQL 语句的,下面就一起讲解一下
    2021-05-05
  • 使用springboot activiti关闭验证自动部署方式

    使用springboot activiti关闭验证自动部署方式

    这篇文章主要介绍了使用springboot activiti关闭验证自动部署方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • mybatis like模糊查询特殊字符报错转义处理方式

    mybatis like模糊查询特殊字符报错转义处理方式

    这篇文章主要介绍了mybatis like模糊查询特殊字符报错转义处理方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01

最新评论