Java Stream 并行流简介、使用与注意事项小结

 更新时间:2025年08月20日 14:35:14   作者:向南怀瑾  
Java8并行流基于Stream API,利用多核CPU提升计算密集型任务效率,但需注意线程安全、顺序不确定及线程池管理,可通过自定义线程池与CompletableFuture结合实现更灵活的异步处理,本文给大家介绍Java Stream并行流简介、使用与注意事项小结,感兴趣的朋友一起看看吧

1. 并行流简介

Java 8 引入了 Stream API,提供了一种高效的数据处理方式。而 ​并行流(Parallel Stream)​ 则是 Stream 的并行版本,能够将流操作分配到多个线程中执行,充分利用多核 CPU 的性能。

​特点:

  • 默认使用 ForkJoinPool.commonPool() 执行任务。
  • 适合处理 ​计算密集型 任务。
  • 任务执行顺序不确定。

​2. 并行流的简单使用

将普通流转换为并行流非常简单,只需调用 parallel() 方法即可。

​示例:并行流的基本使用

import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 将流转换为并行流
        numbers.parallelStream()
               .forEach(num -> System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + num));
    }
}

输出示例

线程: main, 处理: 6
线程: ForkJoinPool.commonPool-worker-1, 处理: 3
线程: ForkJoinPool.commonPool-worker-2, 处理: 8
...

​3. 配合自定义线程池

默认情况下,并行流使用 ForkJoinPool.commonPool() 执行任务。你可以通过自定义线程池来控制并行流的执行环境。

​示例:自定义线程池

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamCustomPool {
    public static void main(String[] args) {
        // 创建自定义线程池
        ForkJoinPool customPool = new ForkJoinPool(4);
        // 在自定义线程池中执行并行流任务
        customPool.submit(() -> {
            List<Integer> result = IntStream.rangeClosed(1, 10)
                                            .parallel()
                                            .map(i -> {
                                                System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i);
                                                return i * 2;
                                            })
                                            .boxed()
                                            .collect(Collectors.toList());
            System.out.println("结果: " + result);
        }).join(); // 等待任务完成
        customPool.shutdown(); // 关闭线程池
    }
}

示例:配合CompletableFuture实现异步

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamWithCompletableFuture {
    public static void main(String[] args) {
        // 创建一个并行流
        List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 10)
                .parallel()
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i);
                    return i * 2; // 模拟计算任务
                }))
                .collect(Collectors.toList());
        // 等待所有任务完成并获取结果
        List<Integer> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        System.out.println("结果: " + results);
    }
}

好处:

  • 并行流:适合处理数据流中的计算密集型任务,能够自动将任务分配到多个线程中执行。
  • CompletableFuture:提供强大的异步编程能力,可以处理任务的依赖关系、异常处理、结果合并等。

结合两者的优势,可以实现:

  1. 异步并行处理:将并行流的任务异步化,进一步提升性能。
  2. 任务依赖管理:通过 CompletableFuture 管理任务之间的依赖关系。
  3. 结果合并:将多个任务的结果合并处理。

​4. 控制有序性

并行流的任务执行顺序是不确定的。如果需要保持顺序,可以使用 forEachOrdered() 方法。

​示例:保持顺序

import java.util.Arrays;
import java.util.List;
public class ParallelStreamOrder {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        numbers.parallelStream()
               .forEachOrdered(System.out::println); // 输出顺序与流中元素顺序一致
    }
}

​5. 共享资源的安全性

并行流在多个线程中执行操作,如果操作共享可变状态,可能会导致线程安全问题。

​示例:线程安全问题

import java.util.ArrayList;
import java.util.List;
public class ParallelStreamThreadSafety {
    public static void main(String[] args) {
        List<Integer> result = new ArrayList<>();
        IntStream.rangeClosed(1, 1000)
                 .parallel()
                 .forEach(result::add); // 这里会出现线程安全问题
        System.out.println("结果大小: " + result.size()); // 结果可能小于 1000
    }
}

解决方法

  • 使用线程安全的集合,如 Collections.synchronizedList()
  • 使用 collect() 方法将结果收集到线程安全的容器中。

​示例:线程安全的解决方案

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamThreadSafety {
    public static void main(String[] args) {
        List<Integer> result = IntStream.rangeClosed(1, 1000)
                                        .parallel()
                                        .boxed()
                                        .collect(Collectors.toList()); // 使用 collect() 方法
        System.out.println("结果大小: " + result.size()); // 输出: 1000
    }
}

​6. 注意事项

  1. 任务类型
    • 适合 ​计算密集型 任务,不适合 ​I/O 密集型 任务。
  2. 线程安全
    • 避免在并行流中操作共享可变状态。
  3. 任务顺序
    • 并行流的任务执行顺序不确定,使用 forEachOrdered() 保持顺序。
  4. 线程池管理
    • 使用自定义线程池时,记得关闭线程池,避免资源泄漏。

​7. 总结

并行流是 Java 8 提供的一个强大工具,能够显著提升数据处理性能。但在使用时需要注意线程安全、任务顺序和线程池管理等问题。通过合理使用并行流,可以编写高效、灵活的代码。

​附录:完整代码

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamDemo {
    public static void main(String[] args) {
        // 基本使用
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        numbers.parallelStream()
               .forEach(num -> System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + num));
        // 自定义线程池
        ForkJoinPool customPool = new ForkJoinPool(4);
        customPool.submit(() -> {
            List<Integer> result = IntStream.rangeClosed(1, 10)
                                            .parallel()
                                            .map(i -> {
                                                System.out.println("线程: " + Thread.currentThread().getName() + ", 处理: " + i);
                                                return i * 2;
                                            })
                                            .boxed()
                                            .collect(Collectors.toList());
            System.out.println("结果: " + result);
        }).join();
        customPool.shutdown();
        // 保持顺序
        numbers.parallelStream()
               .forEachOrdered(System.out::println);
        // 线程安全
        List<Integer> safeResult = IntStream.rangeClosed(1, 1000)
                                            .parallel()
                                            .boxed()
                                            .collect(Collectors.toList());
        System.out.println("结果大小: " + safeResult.size());
    }
}

希望这篇文章能帮助你更好地理解和使用 Java 的并行流!如果有任何问题,欢迎在评论区讨论!

到此这篇关于Java Stream 并行流简介、使用与注意事项小结的文章就介绍到这了,更多相关Java Stream 并行流内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot 获取请求参数的常用注解及用法

    SpringBoot 获取请求参数的常用注解及用法

    Spring Boot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、Cookie等类型,还可绑定到对象或使用Servlet API,需注意参数匹配规则及配置支持隐藏方法过滤器,本文介绍SpringBoot获取请求参数的常用注解,感兴趣朋友一起看看吧
    2025-07-07
  • SpringBoot种如何使用 EasyExcel 实现自定义表头导出并实现数据格式化转换

    SpringBoot种如何使用 EasyExcel 实现自定义表头导出并实现数据格式化转换

    本文详细介绍了如何使用EasyExcel工具类实现自定义表头导出,并实现数据格式化转换与添加下拉框操作,通过示例和代码,展示了如何处理不同数据结构和注解,确保数据在导出时能够正确显示和格式化,此外,还介绍了如何解决特定数据类型的转换问题,并提供了解决方案
    2024-11-11
  • Java FtpClient 实现文件上传服务

    Java FtpClient 实现文件上传服务

    本文主要对Java FtpClient实现简单的图片上传到服务器的方法进行介绍,并且展示的小demo中,对配置过程中主要碰到的问题:关于文件权限的问题也进行了说明,下面跟着小编一起来看下吧
    2016-12-12
  • Java实现斗地主的发牌功能

    Java实现斗地主的发牌功能

    这篇文章主要为大家详细介绍了Java实现斗地主的发牌功能,含按顺序发牌和玩家牌排序显示等功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-06-06
  • Dubbo配置http协议全过程

    Dubbo配置http协议全过程

    文章介绍了在服务提供者和服务消费者中,分别通过添加依赖包和配置文件,实现基于HTTP协议的服务接口注册及调用,无需修改消费者启动类,服务调用方式与Dubbo协议类似
    2025-10-10
  • SpringBoot实现application.yml文件敏感信息加密

    SpringBoot实现application.yml文件敏感信息加密

    本文主要介绍了SpringBoot实现application.yml文件敏感信息加密,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • Spring Boot 异步框架的使用详解

    Spring Boot 异步框架的使用详解

    这篇文章主要介绍了Spring Boot 异步框架的使用详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-01-01
  • mybatis plus 开启sql日志打印的方法小结

    mybatis plus 开启sql日志打印的方法小结

    Mybatis-Plus(简称MP)是一个 Mybatis 的增强工具,在 Mybatis 的基础上只做增强不做改变,为简化开发、提高效率而生。本文重点给大家介绍mybatis plus 开启sql日志打印的方法小结,感兴趣的朋友一起看看吧
    2021-09-09
  • Spring Cache + Caffeine的整合与使用示例详解

    Spring Cache + Caffeine的整合与使用示例详解

    对于一些项目里需要对数据库里的某些数据一直重复请求的,且这些数据基本是固定的,在这种情况下,可以借助简单使用本地缓存来缓存这些数据,本文介绍一下Spring Cache和Caffeine的使用,感兴趣的朋友一起看看吧
    2023-12-12
  • IDEA mybatis Mapper.xml报红的最新解决办法

    IDEA mybatis Mapper.xml报红的最新解决办法

    这篇文章主要介绍了IDEA mybatis Mapper.xml报红的解决办法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-04-04

最新评论