Java Stream API中实现数据的并行处理指南

 更新时间:2025年10月27日 08:41:38   作者:搜罗万相  
在 Java Stream API 中,实现数据的并行处理非常简单,核心是通过 ​​parallelStream() ​​​ 方法获取并行流,而非默认的串行流(​​stream()​​),本文给大家介绍了Java Stream API中实现数据的并行处理的操作方法,需要的朋友可以参考下

引言

在 Java Stream API 中,实现数据的并行处理非常简单,核心是通过 ​​parallelStream() ​​​ 方法获取并行流,而非默认的串行流(​​stream()​​)。并行流会自动利用多核 CPU 的优势,将数据分成多个子任务并行执行,从而提升大数据量处理的效率。

一、并行处理的核心原理

  • 并行流(Parallel Stream) :基于 ​​Fork/Join​​ 框架实现,自动将流中的元素分割成多个子流,由多个线程并行处理,最后合并结果。
  • 无需手动管理线程:开发者无需创建线程池或处理线程同步,Stream API 内部已封装了并行逻辑。

二、实现并行处理的步骤

  1. 获取并行流:通过集合的 ​​parallelStream()​​ 方法(或流的 ​​parallel()​​ 方法将串行流转为并行流)。
  2. 执行流操作:与串行流相同的链式操作(过滤、映射、聚合等),底层会自动并行执行。

三、示例代码

1. 基础并行处理(对比串行与并行)

import java.util.Arrays;
import java.util.List;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        // 准备一个大数据量的集合(1000万个整数)
        List<Integer> numbers = Arrays.asList(new Integer[10_000_000]);
        for (int i = 0; i < numbers.size(); i++) {
            numbers.set(i, i);
        }

        // 串行流处理:计算偶数之和
        long start = System.currentTimeMillis();
        long serialSum = numbers.stream()
                .filter(n -> n % 2 == 0)
                .mapToLong(n -> n)
                .sum();
        long serialTime = System.currentTimeMillis() - start;
        System.out.println("串行处理结果:" + serialSum + ",耗时:" + serialTime + "ms");

        // 并行流处理:同样计算偶数之和
        start = System.currentTimeMillis();
        long parallelSum = numbers.parallelStream() // 关键:使用parallelStream()
                .filter(n -> n % 2 == 0)
                .mapToLong(n -> n)
                .sum();
        long parallelTime = System.currentTimeMillis() - start;
        System.out.println("并行处理结果:" + parallelSum + ",耗时:" + parallelTime + "ms");
    }
}

输出(示例)

串行处理结果:24999995000000,耗时:120ms  
并行处理结果:24999995000000,耗时:35ms  // 并行效率更高(依赖CPU核心数)

2. 将串行流转为并行流(​​parallel()​​ 方法)

除了直接使用 ​​parallelStream()​​,还可以通过 ​​parallel()​​ 方法将串行流转换为并行流:

List<String> words = Arrays.asList("apple", "banana", "cherry", "date");

// 串行流 → 转为并行流
long count = words.stream()
        .parallel() // 切换为并行处理
        .filter(word -> word.length() > 5)
        .count();
System.out.println("长度大于5的单词数:" + count); // 输出:2(banana、cherry)

四、注意事项

  1. 线程安全问题
    并行流会多线程执行操作,若流操作中涉及共享变量的修改(如使用 forEach 累加全局变量),可能导致线程安全问题。
    ❌ 错误示例(共享变量不安全):
int[] sum = {0}; // 共享数组
numbers.parallelStream()
       .forEach(n -> sum[0] += n); // 多线程修改sum[0],结果可能不正确

✅ 正确方式(使用线程安全的聚合操作):

long sum = numbers.parallelStream()
                 .mapToLong(n -> n)
                 .sum(); // sum() 内部线程安全
  1. 并非所有场景都适合并行
  • 数据量较小时:并行流的线程调度开销可能超过并行带来的收益,效率反而低于串行。
  • 操作复杂度低时:简单操作(如 ​​filter​​ 简单判断)的并行优势不明显,复杂操作(如大量计算)更适合并行。
  • 流元素有序性(​​Ordered​​):并行流为提升效率可能打破元素顺序(如 ​​forEach​​ 输出顺序不确定),若需保持顺序,可用 ​​forEachOrdered​​(但会损失部分并行性能)。
  1. 自定义并行线程池
    并行流默认使用 Fork/Join 框架的公共线程池(ForkJoinPool.commonPool()),若需自定义线程池,可通过 ForkJoinPool 包装:
import java.util.concurrent.ForkJoinPool;

ForkJoinPool pool = new ForkJoinPool(4); // 自定义4个核心线程的线程池
long sum = pool.submit(() -> 
    numbers.parallelStream()
           .filter(n -> n % 2 == 0)
           .mapToLong(n -> n)
           .sum()
).get(); // 阻塞获取结果
pool.shutdown(); // 关闭线程池

五、总结

  • 实现方式:通过 ​​parallelStream()​​​ 或 ​​stream().parallel()​​ 获取并行流,后续操作与串行流一致。
  • 优势:自动利用多核CPU,提升大数据量、复杂操作的处理效率,无需手动管理线程。
  • 注意:避免共享变量修改,数据量小或操作简单时慎用,有序性需求需权衡性能。

合理使用并行流能显著优化数据处理性能,但需根据具体场景评估是否适用。

到此这篇关于Java Stream API中实现数据的并行处理指南的文章就介绍到这了,更多相关Java Stream API数据并行处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java返回前端实体类json数据时忽略某个属性方法

    java返回前端实体类json数据时忽略某个属性方法

    这篇文章主要给大家介绍了关于java返回前端实体类json数据时忽略某个属性的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2023-08-08
  • Java中的包(package)是什么和使用方法

    Java中的包(package)是什么和使用方法

    包是Java中一种强大的组织代码的工具,它们帮助开发者将代码分组,防止命名冲突,并通过控制访问级别来增强代码的安全性,这篇文章主要介绍了Java中的包(package)是什么和如何使用它们,需要的朋友可以参考下
    2024-07-07
  • elasticsearch通过guice注入Node组装启动过程

    elasticsearch通过guice注入Node组装启动过程

    这篇文章主要为大家介绍了 elasticsearch通过guice注入Node组装启动过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-04-04
  • 基于Java回顾之网络通信的应用分析

    基于Java回顾之网络通信的应用分析

    在这篇文章里,我们主要讨论如何使用Java实现网络通信,包括TCP通信、UDP通信、多播以及NIO
    2013-05-05
  • Java利用Selenium操作浏览器的示例详解

    Java利用Selenium操作浏览器的示例详解

    本文主要介绍如何使用java代码利用Selenium操作浏览器,某些网页元素加载慢,如何操作元素就会把找不到元素的异常,此时需要设置元素等待,等待元素加载完,再操作,感兴趣的可以了解一下
    2023-01-01
  • ThreadLocal数据存储结构原理解析

    ThreadLocal数据存储结构原理解析

    这篇文章主要为大家介绍了ThreadLocal数据存储结构原理解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • Java批量操作如何提升ORM框架的批处理性能

    Java批量操作如何提升ORM框架的批处理性能

    本文介绍的批量插入、更新、删除和读取优化技术,以及性能监控与调优方法,为开发者提供了全面的批处理性能优化思路,感兴趣的朋友一起看看吧
    2025-05-05
  • 一篇文章带你入门Java修饰符

    一篇文章带你入门Java修饰符

    Java语言提供了很多修饰符,主要分为以下两类:访问修饰符;非访问修饰符。修饰符用来定义类、方法或者变量,通常放在语句的最前端。我们通过下面的例子来说明,下面就跟小编一起来看下吧
    2021-08-08
  • 在拦截器中读取request参数,解决在controller中无法二次读取的问题

    在拦截器中读取request参数,解决在controller中无法二次读取的问题

    这篇文章主要介绍了在拦截器中读取request参数,解决在controller中无法二次读取的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • SpringCloud环境搭建过程之Rest使用小结

    SpringCloud环境搭建过程之Rest使用小结

    这篇文章主要介绍了SpringCloud环境搭建之Rest使用,本文通过实例代码图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-08-08

最新评论