Java多线程ThreadForge的实现

 更新时间:2026年05月21日 08:18:12   作者:一只叫煤球的猫  
本文主要介绍了Java多线程ThreadForge的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

从场景切入

产品说:「用户详情页太慢了,能不能优化一下?」

你一看代码,三个接口串行调用:先查用户信息,再查订单列表,最后查积分余额。每个接口 200ms,加起来 600ms。

「简单,改成并发调用就行。」你心想。

于是你创建了一个线程池,用 Future 提交了三个任务。

写完提测,QA 说偶尔会超时。

你加了个 future.get(500, MILLISECONDS)

又过了几天,测试环境出现了线程泄漏,你赶紧补了个 finally { executor.shutdown() }

上线前,tech lead 问:「如果用户服务挂了,另外两个任务会取消吗?」你愣了一下,又加了一堆 cancel 逻辑和异常处理。

这时候你发现,一个简单的「并发调用三个接口」,代码已经写了 50 多行。

并且下次遇到类似场景,还得把这些逻辑再写一遍:超时、取消、异常传播、资源清理……每次都要重新思考一遍边界条件。

传统的 ExecutorServiceFutureCompletableFuture 确实非常强大,但也足够啰嗦:

  • 线程池要手动创建和关闭
  • 超时逻辑每个任务都要写一遍
  • 失败了要不要取消其他任务?得自己判断
  • 异常怎么传播?要么吞掉,要么手动包装
  • 想知道任务跑了多久?自己打日志

某一天,我猛然惊醒:写并发代码,不应该这么费脑子。

ThreadForge:把复杂度收敛到一个可推理的模型里

ThreadForge 的设计哲学很简单:先降低认知成本,再追求性能。

可以把它理解成一个结构化并发框架——让你用写同步代码的思维写并发代码,同时自动处理那些容易遗漏的边界情况。

也可以把它理解成对于 Java 内置并发工具的二次包装,目标是让Java并发更简单、更清晰。

什么是结构化?

看一个最简单的例子:

try (ThreadScope scope = ThreadScope.open()) {
    Task<String> user = scope.submit("load-user", () -> fetchUser());
    Task<Integer> orders = scope.submit("load-orders", () -> fetchOrders());
    
    scope.await(user, orders);
    
    // 到这里,两个任务肯定都结束了(成功、失败或超时)
    String result = user.await() + ":" + orders.await();
}
// scope 关闭时,所有任务自动取消、资源自动清理

这段代码有几个关键点:

  1. 所有任务都绑定在 ThreadScope,生命周期有边界,不会泄漏
  2. 默认就是安全的:默认超时、默认失败传播、自动取消
  3. 代码结构就是任务关系:读代码的人一眼就能看出两个任务是并发的,且必须都完成才能继续

对比传统写法,你需要:

  • 创建线程池,配置核心线程数、队列大小
  • 提交任务,手动处理 Future
  • 写 try-finally 确保 shutdown
  • 手动处理超时和异常传播

ThreadForge 让你省掉这些重复劳动,专注业务逻辑。

五个让你省脑力的设计

1. 默认行为就是正确的

// 默认:FAIL_FAST + 30秒超时 + 自动取消其他任务
try (ThreadScope scope = ThreadScope.open()) {
    Task<Integer> a = scope.submit(() -> riskyRpc());
    Task<Integer> b = scope.submit(() -> anotherRpc());
    scope.await(a, b);
} catch (ScopeTimeoutException timeout) {
    // 超时了,所有任务已被自动取消
    fallback();
} catch (FailurePropagationException failed) {
    // 某个任务失败了,其他任务已被自动取消
    handleError(failed);
}

不需要配置,不需要思考,开箱即用。

2. 失败策略明确且统一

不同场景对失败的容忍度不同,ThreadForge 提供了 5 种明确的策略:

  • FAIL_FAST:快速失败,立即取消其他任务(默认)
  • COLLECT_ALL:等所有任务结束,汇总所有失败
  • SUPERVISOR:不自动取消,失败信息收集到 Outcome
  • CANCEL_OTHERS:失败后取消其余任务,但不抛异常
  • IGNORE_ALL:忽略失败,只返回成功的结果
// 场景:批量导入,即使部分失败也要知道哪些成功了
try (ThreadScope scope = ThreadScope.open()
        .withFailurePolicy(FailurePolicy.SUPERVISOR)) {
    
    List<Task<Void>> tasks = ids.stream()
        .map(id -> scope.submit(() -> importData(id)))
        .collect(toList());
    
    Outcome outcome = scope.await(tasks);
    
    // 明确知道哪些成功、哪些失败
    log.info("成功: {}, 失败: {}", 
        outcome.successCount(), outcome.failureCount());
}

3. 并发度控制不再需要手动管理队列

// 场景:调用外部 API,最多同时50个请求
try (ThreadScope scope = ThreadScope.open()
        .withConcurrencyLimit(50)) {
    
    List<Task<Result>> tasks = hugeIdList.stream()
        .map(id -> scope.submit(() -> externalApi.call(id)))
        .collect(toList());
    
    List<Result> results = scope.awaitAll(tasks);
}
// 自动限流,不会把外部服务打爆

不需要自己写信号量,不需要手动分批,框架自动处理。

4. 生命周期观测统一收口

ThreadScope scope = ThreadScope.open()
    .withHook(new ThreadHook() {
        @Override
        public void onStart(TaskInfo info) {
            metrics.taskStarted(info.name());
        }
        
        @Override
        public void onSuccess(TaskInfo info, Duration duration) {
            metrics.taskSuccess(info.name(), duration.toMillis());
        }
        
        @Override
        public void onFailure(TaskInfo info, Throwable error, Duration duration) {
            log.error("Task {} failed after {}", info.name(), duration, error);
            metrics.taskFailed(info.name());
        }
    });

一处埋点,全局生效。

不需要在每个任务里重复写日志和监控代码。

5. 跨 JDK 版本的一致体验

// 同一套 API
try (ThreadScope scope = ThreadScope.open()) {
    // JDK 21+: 自动使用虚拟线程
    // JDK 8-20: 自动降级到线程池
    Task<String> task = scope.submit(() -> longRunningTask());
    return task.await();
}

不需要分叉代码,不需要写 if-else,框架自动适配。

适用场景

ThreadForge 特别适合这些场景:

并发 RPC 聚合

try (ThreadScope scope = ThreadScope.open()) {
    Task<User> user = scope.submit(() -> userService.get(uid));
    Task<List<Order>> orders = scope.submit(() -> orderService.list(uid));
    Task<Profile> profile = scope.submit(() -> profileService.get(uid));
    
    scope.await(user, orders, profile);
    
    return buildResponse(user.await(), orders.await(), profile.await());
}

批量数据处理

try (ThreadScope scope = ThreadScope.open()
        .withConcurrencyLimit(100)
        .withDeadline(Duration.ofMinutes(5))) {
    
    List<Task<Void>> tasks = records.stream()
        .map(r -> scope.submit(() -> process(r)))
        .collect(toList());
    
    scope.awaitAll(tasks);
}

生产者-消费者模式

try (ThreadScope scope = ThreadScope.open()) {
    Channel<Data> channel = Channel.bounded(1000);
    
    scope.submit(() -> {
        for (Data d : datasource) {
            channel.send(d);
        }
        channel.close();
        return null;
    });
    
    List<Task<Void>> consumers = IntStream.range(0, 4)
        .mapToObj(i -> scope.submit(() -> {
            for (Data d : channel) {
                process(d);
            }
            return null;
        }))
        .collect(toList());
    
    scope.awaitAll(consumers);
}

开始使用

Maven:

<dependency>
    <groupId>pub.lighting</groupId>
    <artifactId>threadforge-core</artifactId>
    <version>1.0.1</version>
</dependency>

Gradle:

implementation("pub.lighting:threadforge-core:1.0.1")

最小示例:

try (ThreadScope scope = ThreadScope.open()) {
    Task<String> task = scope.submit(() -> "Hello, ThreadForge");
    System.out.println(task.await());
}

写在最后

ThreadForge 的目标不是取代所有并发工具,而是让 80% 的常见场景变得简单、安全、可维护。

当你还在调试并发问题时,当新人看不懂老代码里的线程逻辑时,当你想加个超时却不知道从哪儿改起时——不妨试试 ThreadForge。

让并发回归简单,让代码重新可读。

到此这篇关于Java多线程ThreadForge的实现的文章就介绍到这了,更多相关Java ThreadForge内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot+WebMagic实现网页爬虫的示例代码

    SpringBoot+WebMagic实现网页爬虫的示例代码

    本文是对spring boot+WebMagic+MyBatis做了整合,使用WebMagic爬取数据,然后通过MyBatis持久化爬取的数据到mysql数据库,具有一定的参考价值,感兴趣的可以了解一下
    2023-10-10
  • Mybatis Plus 3.4.0分页拦截器的用法小结

    Mybatis Plus 3.4.0分页拦截器的用法小结

    本文主要介绍了Mybatis Plus 3.4.0分页拦截器的用法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-03-03
  • java程序运行时内存分配详解

    java程序运行时内存分配详解

    这篇文章主要介绍了java程序运行时内存分配详解 ,需要的朋友可以参考下
    2016-07-07
  • SpringBoot+MyBatis Plus实现用户登录认证

    SpringBoot+MyBatis Plus实现用户登录认证

    本文介绍了使用Spring Boot和MyBatis Plus实现用户登录认证功能的完整流程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-09-09
  • Java高效读取CSV文件的多种方法与分步实例

    Java高效读取CSV文件的多种方法与分步实例

    在当今数据驱动的世界中,CSV文件作为一种轻量级,通用的数据交换格式,本文将深入探讨多种Java读取CSV文件的方法,帮助开发者在不同场景下选择最合适的解决方案,希望对大家有所帮助
    2025-10-10
  • Java中Quartz高可用定时任务快速入门

    Java中Quartz高可用定时任务快速入门

    如果你想做定时任务,有高可用方面的需求,或者仅仅想入门快,上手简单,那么选用它准没错,感兴趣的小伙伴们可以参考一下
    2022-04-04
  • Java将数字金额转为大写中文金额

    Java将数字金额转为大写中文金额

    这篇文章主要为大家详细介绍了Java将数字金额转为大写中文金额,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-08-08
  • Java Elastic-Job分布式定时任务使用方法介绍

    Java Elastic-Job分布式定时任务使用方法介绍

    xxl-job 通过一个中心式的调度平台,调度多个执行器执行任务,调度中心通过 DB 锁保证集群分布式调度的一致性,这样扩展执行器会增大 DB 的压力,然而大部分公司的任务数,执行器并不多;xxl-job 提供了非常好用的监控页面甚至还有任务失败的邮件告警功能
    2023-01-01
  • Springboot如何使用YML文件配置多环境

    Springboot如何使用YML文件配置多环境

    这篇文章主要介绍了Springboot如何使用YML文件配置多环境问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • Java服务端服务监控:Prometheus与Spring Boot Actuator的集成方式

    Java服务端服务监控:Prometheus与Spring Boot Actuator的集成方式

    本文介绍了如何将Prometheus与SpringBootActuator集成,实现对Java服务端应用的监控,通过集成,可以利用Prometheus的强大监控能力,及时发现和解决性能问题
    2024-12-12

最新评论