Java中流式并行操作parallelStream的原理和使用方法

 更新时间:2025年11月11日 10:35:29   作者:NazonaX  
本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流时需要注意的线程安全问题和性能问题,并提供了最佳实践建议,感兴趣的朋友跟随小编一起看看吧

Java中流式并行操作parallelStream

0. 问题的产生

某天上线后,发现线上存在一些报错,遂即自己尝试线上操作,但是发现功能正常。追踪相关的报错代码行如下:

PointMissionPO missionPO = missionBO.getDbData();

该行报错为空指针异常,可以从代码中判断唯一能报出空指针异常的位置为missionBO为空,向上追踪该引用:

for (PointMissionBO missionBO : result) {
	// 循环体内容
}

为一个列表List的循环体。于是接着向前追踪引用,发现所有处理该列表引用的地方均使用了stream流式操作。经常使用函数式编程的同学都知道,这很少会出现null对象在列表中。我通体检查了一遍都未发现任何可能产生null对象的插入位置。很奇怪那么这个null对象是如何被加入到列表中的呢?

后来有个小伙伴经过AI上下文分析,给出了可能的位置:

missionByType.entrySet().parallelStream()
                .filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
                .map(entry -> processMissions(ctmId, hruId, entry.getValue()))
                .forEach(result::addAll);

仔细一看居然使用了parallelStream并行处理,那么什么是parallelStream呢?为什么它会出现错误呢?

1. 什么是parallelStream?

Java 8引入的Stream API提供了两种处理方式:

  • stream():串行处理,按顺序处理元素
  • parallelStream():并行处理,利用多核CPU将数据分割成多个部分并行处理

2. parallelStream的工作原理

parallelStream基于Fork/Join框架实现:

  1. 将数据源分割成多个子任务(fork)
  2. 在不同线程上并行处理这些子任务
  3. 合并结果(join)

3. parallelStream的正确与错误使用示例

错误使用示例(来自我们的测试代码):

List<String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
        .map(entry -> processMissions(entry.getValue()))
        .forEach(result::addAll);  // 危险操作!

问题分析:

  • ArrayList不是线程安全的集合
  • 多个线程同时调用result::addAll会产生竞态条件
  • 可能导致数据丢失、重复、甚至程序崩溃

正确使用方式一:使用同步块

List<String> result = new ArrayList<>();
missionByType.entrySet().parallelStream()
        .map(entry -> processMissions(entry.getValue()))
        .filter(processedMissions -> !processedMissions.isEmpty())
        .forEach(processedMissions -> {
            synchronized (result) {
                result.addAll(processedMissions);
            }
        });

正确使用方式二:使用收集器(推荐)

List<String> safeResult = missionByType.entrySet().parallelStream()
        .flatMap(entry -> processMissions(entry.getValue()).stream())
        .collect(Collectors.toList());

4. parallelStream在实际业务中的应用

查看我们项目中的实际应用案例:

// PointBusinessServiceImpl.java 中的实际使用
List<PointMissionBO> result = missionByType.entrySet().parallelStream()
        .filter(entry -> !CollectionUtils.isEmpty(entry.getValue()))
        .flatMap(entry -> processMissions(ctmId, hruId, entry.getValue()).stream())
        .collect(Collectors.toList());

这种方式的优点:

  • 使用flatMap展平数据结构
  • 使用collect收集结果,避免线程安全问题
  • 提高了任务处理效率

5. parallelStream适用场景与注意事项

适用场景:

  1. 数据量较大(通常万级以上)
  2. 计算密集型操作
  3. 无状态操作(函数式编程)
  4. 不依赖处理顺序的操作

不适用场景:

  1. 数据量小(并行开销可能超过收益)
  2. IO密集型操作
  3. 有状态共享操作
  4. 需要保证处理顺序的场景

注意事项:

  1. 线程安全:避免在并行流中使用非线程安全的对象
  2. 副作用:避免在流操作中修改外部状态
  3. 性能考量:并行不一定比串行快,需根据实际情况评估
  4. 资源竞争:注意共享资源的访问控制

6. 最佳实践建议

  1. 优先考虑collect:使用收集器而不是直接修改共享集合
  2. 避免副作用:确保流操作是无状态的纯函数
  3. 合理选择数据结构:使用适合并行处理的数据结构
  4. 测试性能:在实际环境中测试并行处理效果
  5. 监控资源使用:关注CPU和内存使用情况

以上就是关于parallelStream并行处理的AI输出内容。下面结合实际问题发生的场景进行推演和论述。

7. 其他思考

上面说了这么多,核心重点在于:parallelStream中使用线程不安全的对象操作时会出现异常。那么具体是什么样的异常呢?
其实我们仔细想想非内存安全List的内存管理,熟悉八股文的同学应该一下子就懂了,没错就是内存扩展机制。
当一个非内存安全的List在触发到内存扩展阈值的时候,就会触发一次内存扩展。具体原理就是开辟一个更长的(通常是2倍)列表,然后将原数据赋值到新列表中。这个过程中,如果出现了并行开辟的情况,那赋值的内容以及目标列表就会变得混乱,出现null对象也并非不可能。因此当尝试指定初始列表大小为256的情况下,第0章的错误就自然消失了。

那么也就出现一些关于Collection的使用建议:

  1. 初始化List时,最好能够有一个预估的列表大小并指定,其实所有Collection对象都可以这么做。
  2. 但凡出现非线程安全的Collection对象需要参与并行计算时,都需要注意它的数据正确性应当如何保证,如果无法自行控制,或者控制有难度的,可以考虑使用Concurrent包中的内容。

到此这篇关于Java中流式并行操作parallelStream的原理和使用方法的文章就介绍到这了,更多相关java并行parallelStream内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中List转Map的几种常见方式与对比

    Java中List转Map的几种常见方式与对比

    JavaList转Map是一个非常常用的技术,对于Java开发人员来讲,掌握该技术可以帮助我们更加高效地操作List集合中的对象,这篇文章主要给大家介绍了关于Java中List转Map的几种常见方式与对比的相关资料,需要的朋友可以参考下
    2024-02-02
  • Java FutureTask类使用案例解析

    Java FutureTask类使用案例解析

    这篇文章主要介绍了Java FutureTask类使用案例解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值析,需要的朋友可以参考下
    2020-04-04
  • 线程池之newCachedThreadPool可缓存线程池的实例

    线程池之newCachedThreadPool可缓存线程池的实例

    这篇文章主要介绍了线程池之newCachedThreadPool可缓存线程池的实例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • 深入了解Spring Boot2.3.0及以上版本的Liveness和Readiness功能

    深入了解Spring Boot2.3.0及以上版本的Liveness和Readiness功能

    这篇文章主要介绍了Spring Boot2.3.0及以上版本的Liveness和Readiness功能示例深入解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • IDEA无法打开Marketplace的三种解决方案(推荐)

    IDEA无法打开Marketplace的三种解决方案(推荐)

    这篇文章主要介绍了IDEA无法打开Marketplace的三种解决方案(推荐),本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • spring boot 防止重复提交实现方法详解

    spring boot 防止重复提交实现方法详解

    这篇文章主要介绍了spring boot 防止重复提交实现方法,结合实例形式详细分析了spring boot 防止重复提交具体配置、实现方法及操作注意事项,需要的朋友可以参考下
    2019-11-11
  • Java项目中如何引入Hutool工具类并正确使用它

    Java项目中如何引入Hutool工具类并正确使用它

    Hutool是一个小而全的Java工具类库,通过静态方法封装,降低相关API的学习成本,提高工作效率,使Java拥有函数式语言般的优雅,这篇文章主要给大家介绍了关于Java项目中如何引入Hutool工具类并正确使用它的相关资料,需要的朋友可以参考下
    2024-01-01
  • java编程 中流对象选取规律详解

    java编程 中流对象选取规律详解

    下面小编就为大家带来一篇java编程 中流对象选取规律详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-01-01
  • SpringBoot使用thymeleaf实现前端表格

    SpringBoot使用thymeleaf实现前端表格

    虽然现在流行前后端分离,但是后端模版在一些关键地方还是非常有用的,例如邮件模版、代码模版等。当然也不排除一些古老的项目后端依然使用动态模版。Thymeleaf 简洁漂亮、容易理解,并且完美支持 HTML5,可以直接打开静态页面,同时不新增标签,只需增强属性
    2022-10-10
  • 深入解析Java的Struts框架中的控制器DispatchAction

    深入解析Java的Struts框架中的控制器DispatchAction

    这篇文章主要介绍了深入解析Java的Struts框架中的控制器DispatchAction,Struts是Java的SSH三大web开发框架之一,需要的朋友可以参考下
    2015-12-12

最新评论