java并行流处理具体方案讲解
当我们需要调用多次调用第三方接口,我们采用并行流处理的方式发送请求。
实际场景:第三方给了一个表,我需要对遍历整个表,调用表中的所有地址并解析json获取数据
常规思维:如果使用for循环遍历,一次发送http请求,所有请求都是串行,如果一个请求需要1秒,25个请求就需要25秒。
解法方案:思路 :异步请求,多线程发送http。
具体方案:
方案一:自定义线程池,定义请求方法,使用for循环讲请求添加到线程词。
方案二(推荐):异步请求注解,使用springboot的@Async注解,完美解决,编码量少。
方案三:并行流(Parallel Stream)。
以下是方案三详解:
并行流(Parallel Stream)深度解析
一、并行流是什么?
并行流是Java 8引入的一个特性,它允许你将流操作并行执行,利用多核处理器的优势来加速处理。
java
// 顺序流 vs 并行流
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 1. 顺序流(默认)
int sumSequential = numbers.stream()
.reduce(0, Integer::sum);
// 2. 并行流
int sumParallel = numbers.parallelStream()
.reduce(0, Integer::sum);
// 3. 将顺序流转为并行流
int sumParallel2 = numbers.stream()
.parallel()
.reduce(0, Integer::sum);二、并行流的核心原理
1. 底层框架:Fork/Join框架
java
// 并行流背后是这个框架
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println("并行度: " + commonPool.getParallelism()); // 通常是CPU核心数-12. 工作窃取算法(Work-Stealing)
text
线程A:│任务1│任务2│任务3│任务4│
线程B:│任务5│任务6│ │ ← 空闲
↓
线程B从线程A的队尾"偷取"任务3来执行3. 数据拆分策略
java
// 并行流自动将数据拆分为多个子任务 Spliterator<T> spliterator = list.spliterator(); // 查看分割特性 int characteristics = spliterator.characteristics(); // SIZED: 知道确切大小 // SUBSIZED: 分割后的大小也确切 // ORDERED: 有顺序要求
三、基本使用模式
1. 创建并行流
java
// 方式1:从集合创建
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> parallelStream = list.parallelStream();
// 方式2:将顺序流转为并行流
Stream<String> parallelStream2 = list.stream().parallel();
// 方式3:从数组创建
String[] array = {"a", "b", "c"};
Stream<String> parallelStream3 = Arrays.stream(array).parallel();2. 常用操作示例
映射操作(map)
java
List<String> words = Arrays.asList("hello", "world", "java", "stream");
// 顺序处理
List<String> upperCaseSequential = words.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
// 并行处理
List<String> upperCaseParallel = words.parallelStream()
.map(String::toUpperCase)
.collect(Collectors.toList());过滤操作(filter)
java
List<Integer> numbers = IntStream.range(1, 1000000).boxed().collect(Collectors.toList());
// 并行过滤偶数
List<Integer> evenNumbers = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList());归约操作(reduce)
java
// 计算1到1000000的和(并行版本更快)
long sum = LongStream.rangeClosed(1, 1000000)
.parallel()
.reduce(0, Long::sum);四、你的代码中并行流的应用
java
private List<Map<String, String>> getInterlockResultWen13or24Level1PointValueParallel(
Map<String, String> map,
List<IntermediateTable> list
) {
// 1. 建立查找表(提升效率)
Map<String, String> codeToTagNumber = list.stream()
.collect(Collectors.toMap(
IntermediateTable::getInterlockWen13or24Level1Point,
IntermediateTable::getTagNumber,
(v1, v2) -> v1 // 如果有重复key,取第一个
));
// 2. 使用并行流处理
return map.entrySet()
.parallelStream() // ← 关键!开启并行
.map(entry -> {
String code = entry.getKey();
String name = entry.getValue();
String tagNumber = codeToTagNumber.get(code);
// 3. 执行IO操作(HTTP请求)
String value = tagNumber != null ? getSupOSValue(tagNumber) : "0";
// 4. 构建结果
Map<String, String> resMap = new LinkedHashMap<>();
resMap.put("name", name);
resMap.put("value", value);
return resMap;
})
.collect(Collectors.toList()); // 收集结果
}五、性能对比:串行 vs 并行
java
public class ParallelStreamBenchmark {
public static void main(String[] args) {
// 模拟24次HTTP请求(用睡眠模拟)
List<Integer> requestIds = IntStream.range(1, 25).boxed().collect(Collectors.toList());
// 串行执行
long start = System.currentTimeMillis();
List<String> sequentialResults = requestIds.stream()
.map(id -> mockHttpRequest(id))
.collect(Collectors.toList());
long sequentialTime = System.currentTimeMillis() - start;
// 并行执行
start = System.currentTimeMillis();
List<String> parallelResults = requestIds.parallelStream()
.map(id -> mockHttpRequest(id))
.collect(Collectors.toList());
long parallelTime = System.currentTimeMillis() - start;
System.out.println("串行耗时: " + sequentialTime + "ms");
System.out.println("并行耗时: " + parallelTime + "ms");
System.out.println("加速比: " + (double)sequentialTime/parallelTime);
}
private static String mockHttpRequest(Integer id) {
try {
// 模拟HTTP请求延迟(100-500ms随机)
Thread.sleep(100 + (long)(Math.random() * 400));
return "Response_" + id;
} catch (InterruptedException e) {
return "Error";
}
}
}六、注意事项和最佳实践
1. 什么时候使用并行流?
java
// ✅ 适合的场景 - 数据量大(>1000个元素) - 每个元素的处理是独立的 - 处理操作是CPU密集型或IO密集型 - 没有顺序要求(或者可以使用unordered()放弃顺序) // ❌ 不适合的场景 - 数据量小(<100个元素) - 处理操作简单(如简单的数学运算) - 需要严格保持顺序 - 操作有共享状态(线程不安全)
2. 线程安全问题
java
// ❌ 错误示例:共享可变状态
List<Integer> sharedList = new ArrayList<>();
numbers.parallelStream()
.forEach(sharedList::add); // 并发修改异常!
// ✅ 正确做法:使用线程安全收集器
List<Integer> safeList = numbers.parallelStream()
.collect(Collectors.toList());
// ✅ 或者使用同步集合
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream()
.forEach(synchronizedList::add);3. 性能优化技巧
java
// 1. 使用基础类型流避免装箱拆箱
IntStream.range(1, 1000000)
.parallel()
.sum();
// 2. 使用unordered()放弃顺序约束(可能提升性能)
list.parallelStream()
.unordered()
.filter(...)
.collect(Collectors.toList());
// 3. 合并操作(combiner)必须是可结合的
// reduce操作需要:a ⊕ b ⊕ c = a ⊕ (b ⊕ c)4. 控制并行度
java
// 方法1:修改系统属性
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
// 方法2:使用自定义ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
List<String> results = customPool.submit(() ->
list.parallelStream()
.map(item -> process(item))
.collect(Collectors.toList())
).get();七、在你的具体场景中的应用建议
java
private List<Map<String, String>> getInterlockResultWen13or24Level1PointValueOptimized(
Map<String, String> map,
List<IntermediateTable> list
) {
// 1. 建立索引
Map<String, String> codeToTag = list.stream()
.collect(Collectors.toMap(
IntermediateTable::getInterlockWen13or24Level1Point,
IntermediateTable::getTagNumber,
(v1, v2) -> v1
));
// 2. 预计算需要请求的tagNumbers
List<String> tagNumbers = map.keySet().stream()
.map(codeToTag::get)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
// 3. 并行获取所有值
Map<String, String> valueCache = tagNumbers.parallelStream()
.collect(Collectors.toMap(
tag -> tag,
this::getSupOSValue // 这是你的HTTP请求方法
));
// 4. 构建结果
return map.entrySet().stream()
.map(entry -> {
String code = entry.getKey();
String tag = codeToTag.get(code);
String value = tag != null ? valueCache.getOrDefault(tag, "0") : "0";
Map<String, String> result = new LinkedHashMap<>();
result.put("name", entry.getValue());
result.put("value", value);
return result;
})
.collect(Collectors.toList());
}八、调试和监控
java
// 1. 查看并行流使用的线程
List<String> results = list.parallelStream()
.peek(item -> System.out.println(Thread.currentThread().getName()))
.map(...)
.collect(Collectors.toList());
// 2. 监控执行时间
long start = System.nanoTime();
List<String> results = list.parallelStream()
.map(...)
.collect(Collectors.toList());
long duration = System.nanoTime() - start;
// 3. 使用JMH进行基准测试(推荐)
// 添加依赖:org.openjdk.jmh九、常见陷阱
java
// 陷阱1:副作用操作
List<String> list = Arrays.asList("a", "b", "c");
StringBuilder builder = new StringBuilder();
list.parallelStream().forEach(builder::append); // ❌ 线程不安全
// 陷阱2:有状态lambda
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> results = new ArrayList<>();
numbers.parallelStream()
.filter(n -> {
results.add(n); // ❌ 在lambda中修改外部状态
return n % 2 == 0;
})
.collect(Collectors.toList());
// 陷阱3:顺序依赖
Optional<Integer> first = numbers.parallelStream()
.filter(n -> n > 3)
.findFirst(); // ⚠️ 并行下findFirst可能变慢
总结
到此这篇关于java并行流处理的文章就介绍到这了,更多相关java并行流处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
关于Spring @Bean 相同加载顺序不同结果不同的问题记录
本文主要探讨了在Spring 5.1.3.RELEASE版本下,当有两个全注解类定义相同类型的Bean时,由于加载顺序不同,最终生成的Bean实例也会不同,文章通过分析ConfigurationClassPostProcessor的执行过程,解释了BeanDefinition的加载和覆盖机制,感兴趣的朋友一起看看吧2025-02-02
java通过Idea远程一键部署springboot到Docker详解
这篇文章主要介绍了java通过Idea远程一键部署springboot到Docker详解,Idea是Java开发利器,springboot是Java生态中最流行的微服务框架,docker是时下最火的容器技术,那么它们结合在一起会产生什么化学反应呢?的相关资料2019-06-06
Java8优雅的字符串拼接工具类StringJoiner实例代码
这篇文章主要给大家介绍了关于Java8优雅的字符串拼接工具类StringJoiner的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-02-02


最新评论