Java线程池拒绝策略原理及任务不丢失方案总结(最近实践)

 更新时间:2025年10月23日 10:45:22   作者:程序员1970  
文章主要讨论Java线程池(ThreadPoolExecutor)的拒绝策略及其适用场景,并提出确保任务不丢失的解决方案,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧

一、线程池拒绝策略的核心机制

Java线程池(ThreadPoolExecutor)的拒绝策略在以下条件下触发:

  1. 线程池已满
    • 活跃线程数 ≥ maximumPoolSize
    • 任务队列(workQueue)已满(若为有界队列)。
  2. 线程池关闭
    • 调用shutdown()后提交新任务。

触发流程

  1. 提交任务时,线程池通过execute()方法检查状态。
  2. 若线程池无法接受任务(如满载或关闭),调用RejectedExecutionHandler.rejectedExecution()

二、四种内置拒绝策略及适用场景

策略行为适用场景风险
AbortPolicy抛出RejectedExecutionException关键任务(如支付)调用方需处理异常
CallerRunsPolicy由提交任务的线程直接执行任务非关键但需保证执行(如日志上报)可能阻塞调用线程
DiscardPolicy静默丢弃任务可丢失任务(如监控数据)任务丢失风险
DiscardOldestPolicy丢弃队列中最旧任务,重试提交新任务实时性要求高(如股票行情)可能丢失重要任务

三、确保任务不丢失的解决方案

1. 自定义拒绝策略 + 持久化存储

核心思想:将拒绝的任务保存到外部存储(数据库/消息队列/文件),后续通过重试机制恢复执行。

实现步骤

定义持久化任务实体

@Data
public class PersistedTask {
    private String id;
    private String taskData; // 序列化后的任务
    private int retryCount;
    private LocalDateTime createTime;
}

自定义拒绝策略

public class PersistenceRejectPolicy implements RejectedExecutionHandler {
    private final TaskRepository taskRepository;
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            String taskData = serializeTask(r);
            taskRepository.save(new PersistedTask(UUID.randomUUID().toString(), taskData, 0));
        } catch (Exception e) {
            log.error("持久化任务失败", e);
        }
    }
    private String serializeTask(Runnable r) {
        return new Gson().toJson(r);
    }
}

定时任务重试

@Scheduled(fixedRate = 5000)
public void retryRejectedTasks() {
    List<PersistedTask> tasks = taskRepository.findPendingTasks();
    for (PersistedTask task : tasks) {
        try {
            Runnable r = deserializeTask(task.getTaskData());
            executor.execute(r);
            taskRepository.markAsCompleted(task.getId());
        } catch (Exception e) {
            if (task.getRetryCount() >= 3) {
                taskRepository.markAsFailed(task.getId());
            } else {
                taskRepository.incrementRetry(task.getId());
            }
        }
    }
}

2. 结合消息队列的异步处理

优势:解耦任务提交与执行,利用消息队列的持久化能力。

实现

public class MqRejectPolicy implements RejectedExecutionHandler {
    private final KafkaTemplate<String, String> kafkaTemplate;
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        kafkaTemplate.send("rejected-tasks", serializeTask(r));
    }
}

3. 线程池参数优化

  • 队列选择
    • 有界队列(如ArrayBlockingQueue):需合理设置容量(例如queueCapacity = maxThreads * 2)。
    • 优先级队列PriorityBlockingQueue):适合需要排序的任务。
  • 线程数配置
    • CPU密集型:corePoolSize = CPU核心数
    • IO密集型:corePoolSize = 2 * CPU核心数

4. 优雅关闭与任务完整性

public void shutdownGracefully(ThreadPoolExecutor executor) {
    executor.shutdown(); // 拒绝新任务
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            List<Runnable> pendingTasks = executor.shutdownNow(); // 尝试停止正在执行的任务
            savePendingTasks(pendingTasks); // 持久化未执行任务
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

四、关键场景实践

场景1:高并发订单处理

  • 策略AbortPolicy + 数据库持久化 + 熔断机制。
  • 配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    50, 200, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new PersistenceRejectPolicy(taskRepository)
);
  • 降级:当数据库连接池满时,触发熔断直接丢弃非关键订单。

场景2:实时数据分析

  • 策略DiscardOldestPolicy + Kafka缓冲。
  • 配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10, 50, 30, TimeUnit.SECONDS,
    new PriorityBlockingQueue<>(100),
    new MqRejectPolicy(kafkaTemplate)
);

五、风险与优化建议

  1. 持久化性能瓶颈
    • 解决方案:批量插入数据库,或使用Redis List暂存任务。
  2. 任务重复执行
    • 解决方案:为任务添加唯一ID,执行前检查是否已处理。
  3. 内存泄漏
    • 解决方案:定期清理EmergencyQueue中的积压任务。
  4. 监控缺失
    • 解决方案:通过Micrometer暴露以下指标:
      • threadpool.rejected.count:拒绝任务数。
      • threadpool.queue.size:队列堆积情况。

总结

确保任务不丢失的核心在于:

  1. 拒绝策略选择:根据业务容忍度选择内置策略或自定义持久化方案。
  2. 持久化设计:结合数据库/消息队列存储拒绝任务。
  3. 重试机制:通过定时任务或消费者恢复任务。
  4. 系统优化:合理配置线程池参数,配合监控与降级策略。

最佳实践:在金融、电商等关键系统中,推荐自定义持久化策略 + 数据库/Kafka + 重试机制;在日志上报等非关键场景,可使用CallerRunsPolicy + 本地缓存平衡性能与可靠性。

到此这篇关于Java线程池拒绝策略原理及任务不丢失方案总结(最近实践)的文章就介绍到这了,更多相关java线程池拒绝策略内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Eureka源码阅读Client启动入口注册续约及定时任务

    Eureka源码阅读Client启动入口注册续约及定时任务

    这篇文章主要为大家介绍了Eureka源码阅读Client启动入口注册续约及定时任务示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • Linux centos7环境下jdk安装教程

    Linux centos7环境下jdk安装教程

    这篇文章主要为大家详细介绍了Linux centos7环境下jdk的安装教程,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-03-03
  • 基于Jackson实现API接口数据脱敏的示例详解

    基于Jackson实现API接口数据脱敏的示例详解

    用户的一些敏感数据,例如手机号、邮箱、身份证等信息,在数据库以明文存储,但在接口返回数据给浏览器(或三方客户端)时,希望对这些敏感数据进行脱敏,所以本文就给大家介绍以恶如何利用Jackson实现API接口数据脱敏,需要的朋友可以参考下
    2023-08-08
  • IDEA中的Run/Debug Configurations各项解读

    IDEA中的Run/Debug Configurations各项解读

    这篇文章主要介绍了IDEA中的Run/Debug Configurations各项解读,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • Java中时间API的基本使用教程

    Java中时间API的基本使用教程

    这篇文章主要介绍了Java中时间API的基本使用教程,文中通过示例代码介绍的非常详细,对大家学习或者使用Java具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-09-09
  • java怎么连接并访问activemq

    java怎么连接并访问activemq

    这篇文章主要介绍了java怎么连接并访问activemq,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • 深入理解Java设计模式之访问者模式

    深入理解Java设计模式之访问者模式

    这篇文章主要介绍了JAVA设计模式之访问者模式的的相关资料,文中示例代码非常详细,供大家参考和学习,感兴趣的朋友可以了解
    2021-11-11
  • Java中单元测试框架JUnit知识点整理

    Java中单元测试框架JUnit知识点整理

    在Java开发中JUnit是最常用的单元测试框架之一,编写JUnit测试的目的是确保代码的正确性、可维护性和可扩展性,这篇文章主要介绍了Java中单元测试框架JUnit知识点整理的相关资料,需要的朋友可以参考下
    2025-08-08
  • Java8的Optional如何干掉空指针(示例详解)

    Java8的Optional如何干掉空指针(示例详解)

    这篇文章主要介绍了Java8的Optional如何干掉空指针,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01
  • Springboot项目长时间不进行接口操作,提示HikariPool-1警告的解决

    Springboot项目长时间不进行接口操作,提示HikariPool-1警告的解决

    这篇文章主要介绍了Springboot项目长时间不进行接口操作,提示HikariPool-1警告的解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12

最新评论