java并行流处理具体方案讲解

 更新时间:2026年02月04日 10:33:41   作者:~ ?  
并行流是Java 8引入的一个强大工具,可以帮助我们充分利用多核处理器的性能,加速数据处理过程,这篇文章主要介绍了java并行流处理的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下

当我们需要调用多次调用第三方接口,我们采用并行流处理的方式发送请求。

实际场景:第三方给了一个表,我需要对遍历整个表,调用表中的所有地址并解析json获取数据

常规思维:如果使用for循环遍历,一次发送http请求,所有请求都是串行,如果一个请求需要1秒,25个请求就需要25秒。

解法方案:思路 :异步请求,多线程发送http。

具体方案:

方案一:自定义线程池,定义请求方法,使用for循环讲请求添加到线程词。

方案二(推荐):异步请求注解,使用springboot的@Async注解,完美解决,编码量少。

方案三:并行流(Parallel Stream)。

以下是方案三详解:

并行流(Parallel Stream)深度解析

一、并行流是什么?

并行流是Java 8引入的一个特性,它允许你将流操作并行执行,利用多核处理器的优势来加速处理。

java

// 顺序流 vs 并行流
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// 1. 顺序流(默认)
int sumSequential = numbers.stream()
    .reduce(0, Integer::sum);

// 2. 并行流
int sumParallel = numbers.parallelStream()
    .reduce(0, Integer::sum);

// 3. 将顺序流转为并行流
int sumParallel2 = numbers.stream()
    .parallel()
    .reduce(0, Integer::sum);

二、并行流的核心原理

1. 底层框架:Fork/Join框架

java

// 并行流背后是这个框架
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println("并行度: " + commonPool.getParallelism()); // 通常是CPU核心数-1

2. 工作窃取算法(Work-Stealing)

text

线程A:│任务1│任务2│任务3│任务4│
线程B:│任务5│任务6│          │ ← 空闲
              ↓
线程B从线程A的队尾"偷取"任务3来执行

3. 数据拆分策略

java

// 并行流自动将数据拆分为多个子任务
Spliterator<T> spliterator = list.spliterator();

// 查看分割特性
int characteristics = spliterator.characteristics();
// SIZED: 知道确切大小
// SUBSIZED: 分割后的大小也确切
// ORDERED: 有顺序要求

三、基本使用模式

1. 创建并行流

java

// 方式1:从集合创建
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> parallelStream = list.parallelStream();

// 方式2:将顺序流转为并行流
Stream<String> parallelStream2 = list.stream().parallel();

// 方式3:从数组创建
String[] array = {"a", "b", "c"};
Stream<String> parallelStream3 = Arrays.stream(array).parallel();

2. 常用操作示例

映射操作(map)

java

List<String> words = Arrays.asList("hello", "world", "java", "stream");

// 顺序处理
List<String> upperCaseSequential = words.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());

// 并行处理
List<String> upperCaseParallel = words.parallelStream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());

过滤操作(filter)

java

List<Integer> numbers = IntStream.range(1, 1000000).boxed().collect(Collectors.toList());

// 并行过滤偶数
List<Integer> evenNumbers = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .collect(Collectors.toList());

归约操作(reduce)

java

// 计算1到1000000的和(并行版本更快)
long sum = LongStream.rangeClosed(1, 1000000)
    .parallel()
    .reduce(0, Long::sum);

四、你的代码中并行流的应用

java

private List<Map<String, String>> getInterlockResultWen13or24Level1PointValueParallel(
    Map<String, String> map, 
    List<IntermediateTable> list
) {
    // 1. 建立查找表(提升效率)
    Map<String, String> codeToTagNumber = list.stream()
        .collect(Collectors.toMap(
            IntermediateTable::getInterlockWen13or24Level1Point,
            IntermediateTable::getTagNumber,
            (v1, v2) -> v1  // 如果有重复key,取第一个
        ));
    
    // 2. 使用并行流处理
    return map.entrySet()
        .parallelStream()  // ← 关键!开启并行
        .map(entry -> {
            String code = entry.getKey();
            String name = entry.getValue();
            String tagNumber = codeToTagNumber.get(code);
            
            // 3. 执行IO操作(HTTP请求)
            String value = tagNumber != null ? getSupOSValue(tagNumber) : "0";
            
            // 4. 构建结果
            Map<String, String> resMap = new LinkedHashMap<>();
            resMap.put("name", name);
            resMap.put("value", value);
            return resMap;
        })
        .collect(Collectors.toList());  // 收集结果
}

五、性能对比:串行 vs 并行

java

public class ParallelStreamBenchmark {
    
    public static void main(String[] args) {
        // 模拟24次HTTP请求(用睡眠模拟)
        List<Integer> requestIds = IntStream.range(1, 25).boxed().collect(Collectors.toList());
        
        // 串行执行
        long start = System.currentTimeMillis();
        List<String> sequentialResults = requestIds.stream()
            .map(id -> mockHttpRequest(id))
            .collect(Collectors.toList());
        long sequentialTime = System.currentTimeMillis() - start;
        
        // 并行执行
        start = System.currentTimeMillis();
        List<String> parallelResults = requestIds.parallelStream()
            .map(id -> mockHttpRequest(id))
            .collect(Collectors.toList());
        long parallelTime = System.currentTimeMillis() - start;
        
        System.out.println("串行耗时: " + sequentialTime + "ms");
        System.out.println("并行耗时: " + parallelTime + "ms");
        System.out.println("加速比: " + (double)sequentialTime/parallelTime);
    }
    
    private static String mockHttpRequest(Integer id) {
        try {
            // 模拟HTTP请求延迟(100-500ms随机)
            Thread.sleep(100 + (long)(Math.random() * 400));
            return "Response_" + id;
        } catch (InterruptedException e) {
            return "Error";
        }
    }
}

六、注意事项和最佳实践

1. 什么时候使用并行流?

java

// ✅ 适合的场景
- 数据量大(>1000个元素)
- 每个元素的处理是独立的
- 处理操作是CPU密集型或IO密集型
- 没有顺序要求(或者可以使用unordered()放弃顺序)

// ❌ 不适合的场景
- 数据量小(<100个元素)
- 处理操作简单(如简单的数学运算)
- 需要严格保持顺序
- 操作有共享状态(线程不安全)

2. 线程安全问题

java

// ❌ 错误示例:共享可变状态
List&lt;Integer&gt; sharedList = new ArrayList&lt;&gt;();
numbers.parallelStream()
    .forEach(sharedList::add);  // 并发修改异常!

// ✅ 正确做法:使用线程安全收集器
List&lt;Integer&gt; safeList = numbers.parallelStream()
    .collect(Collectors.toList());
    
// ✅ 或者使用同步集合
List&lt;Integer&gt; synchronizedList = Collections.synchronizedList(new ArrayList&lt;&gt;());
numbers.parallelStream()
    .forEach(synchronizedList::add);

3. 性能优化技巧

java

// 1. 使用基础类型流避免装箱拆箱
IntStream.range(1, 1000000)
    .parallel()
    .sum();

// 2. 使用unordered()放弃顺序约束(可能提升性能)
list.parallelStream()
    .unordered()
    .filter(...)
    .collect(Collectors.toList());

// 3. 合并操作(combiner)必须是可结合的
// reduce操作需要:a ⊕ b ⊕ c = a ⊕ (b ⊕ c)

4. 控制并行度

java

// 方法1:修改系统属性
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

// 方法2:使用自定义ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
List<String> results = customPool.submit(() -> 
    list.parallelStream()
        .map(item -> process(item))
        .collect(Collectors.toList())
).get();

七、在你的具体场景中的应用建议

java

private List<Map<String, String>> getInterlockResultWen13or24Level1PointValueOptimized(
    Map<String, String> map, 
    List<IntermediateTable> list
) {
    // 1. 建立索引
    Map<String, String> codeToTag = list.stream()
        .collect(Collectors.toMap(
            IntermediateTable::getInterlockWen13or24Level1Point,
            IntermediateTable::getTagNumber,
            (v1, v2) -> v1
        ));
    
    // 2. 预计算需要请求的tagNumbers
    List<String> tagNumbers = map.keySet().stream()
        .map(codeToTag::get)
        .filter(Objects::nonNull)
        .distinct()
        .collect(Collectors.toList());
    
    // 3. 并行获取所有值
    Map<String, String> valueCache = tagNumbers.parallelStream()
        .collect(Collectors.toMap(
            tag -> tag,
            this::getSupOSValue  // 这是你的HTTP请求方法
        ));
    
    // 4. 构建结果
    return map.entrySet().stream()
        .map(entry -> {
            String code = entry.getKey();
            String tag = codeToTag.get(code);
            String value = tag != null ? valueCache.getOrDefault(tag, "0") : "0";
            
            Map<String, String> result = new LinkedHashMap<>();
            result.put("name", entry.getValue());
            result.put("value", value);
            return result;
        })
        .collect(Collectors.toList());
}

八、调试和监控

java

// 1. 查看并行流使用的线程
List<String> results = list.parallelStream()
    .peek(item -> System.out.println(Thread.currentThread().getName()))
    .map(...)
    .collect(Collectors.toList());

// 2. 监控执行时间
long start = System.nanoTime();
List<String> results = list.parallelStream()
    .map(...)
    .collect(Collectors.toList());
long duration = System.nanoTime() - start;

// 3. 使用JMH进行基准测试(推荐)
// 添加依赖:org.openjdk.jmh

九、常见陷阱

java

// 陷阱1:副作用操作
List<String> list = Arrays.asList("a", "b", "c");
StringBuilder builder = new StringBuilder();
list.parallelStream().forEach(builder::append); // ❌ 线程不安全

// 陷阱2:有状态lambda
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> results = new ArrayList<>();
numbers.parallelStream()
    .filter(n -> {
        results.add(n); // ❌ 在lambda中修改外部状态
        return n % 2 == 0;
    })
    .collect(Collectors.toList());

// 陷阱3:顺序依赖
Optional<Integer> first = numbers.parallelStream()
    .filter(n -> n > 3)
    .findFirst(); // ⚠️ 并行下findFirst可能变慢

PyTorch 2.9

总结

到此这篇关于java并行流处理的文章就介绍到这了,更多相关java并行流处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 基于Java的电梯系统实现过程

    基于Java的电梯系统实现过程

    这篇文章主要介绍了基于Java的电梯系统实现过程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • Java链表中添加元素的原理与实现方法详解

    Java链表中添加元素的原理与实现方法详解

    这篇文章主要介绍了Java链表中添加元素的原理与实现方法,结合实例形式详细分析了Java实现链表中添加元素的相关原理、操作技巧与注意事项,需要的朋友可以参考下
    2020-03-03
  • 使用java实现http多线程断点下载文件(二)

    使用java实现http多线程断点下载文件(二)

    下载工具我想没有几个人不会用的吧,前段时间比较无聊,花了点时间用java写了个简单的http多线程下载程序,我实现的这个http下载工具功能很简单,就是一个多线程以及一个断点恢复,当然下载是必不可少的,需要的朋友可以参考下
    2012-12-12
  • 关于Spring @Bean 相同加载顺序不同结果不同的问题记录

    关于Spring @Bean 相同加载顺序不同结果不同的问题记录

    本文主要探讨了在Spring 5.1.3.RELEASE版本下,当有两个全注解类定义相同类型的Bean时,由于加载顺序不同,最终生成的Bean实例也会不同,文章通过分析ConfigurationClassPostProcessor的执行过程,解释了BeanDefinition的加载和覆盖机制,感兴趣的朋友一起看看吧
    2025-02-02
  • java通过Idea远程一键部署springboot到Docker详解

    java通过Idea远程一键部署springboot到Docker详解

    这篇文章主要介绍了java通过Idea远程一键部署springboot到Docker详解,Idea是Java开发利器,springboot是Java生态中最流行的微服务框架,docker是时下最火的容器技术,那么它们结合在一起会产生什么化学反应呢?的相关资料
    2019-06-06
  • Java中常见的数据验证注解总结大全

    Java中常见的数据验证注解总结大全

    在Java开发中数据校验是确保应用程序的数据完整性和一致性的重要步骤,这篇文章主要介绍了Java中常见的数据验证注解的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-07-07
  • Java8优雅的字符串拼接工具类StringJoiner实例代码

    Java8优雅的字符串拼接工具类StringJoiner实例代码

    这篇文章主要给大家介绍了关于Java8优雅的字符串拼接工具类StringJoiner的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-02-02
  • SpringCloud之Zuul服务网关详解

    SpringCloud之Zuul服务网关详解

    这篇文章主要介绍了SpringCloud之Zuul服务网关详解,服务网关是微服务架构中一个不可或缺的部分,通过服务网关统一向外系统提供REST API的过程中,除了具备服务路由、均衡负载功能之外,它还具备了权限控制(鉴权)等功能,需要的朋友可以参考下
    2023-08-08
  • 使用nacos增加修改配置实时生效方式

    使用nacos增加修改配置实时生效方式

    文章介绍了如何在Nacos配置中心实现动态配置,包括添加或修改配置、使用`@RefreshScope`注解刷新Bean属性、扩展动态配置参数等
    2026-01-01
  • Java中的任务调度框架quartz详细解析

    Java中的任务调度框架quartz详细解析

    这篇文章主要介绍了Java中的任务调度框架quartz详细解析,Quartz 是一个完全由 Java 编写的开源作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制,需要的朋友可以参考下
    2023-11-11

最新评论