Java并行处理的实现

 更新时间:2021年07月14日 11:54:26   作者:张云飞Vir  
并行计算一般是指许多指令得以同时进行的计算模式。本文主要介绍了Java并行处理的实现,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

1. 背景

本文是一个短文章,介绍Java 中的并行处理。
说明:10多分钟读完的文章我称之为短文章,适合快速阅读。

2.知识

并行计算(parallel computing)一般是指许多指令得以同时进行的计算模式。在同时进行的前提下,可以将计算的过程分解成小部分,之后以并发方式来加以解决。

也就是分解为几个过程:

1、将一个大任务拆分成多个子任务,子任务还可以继续拆分。
2、各个子任务同时进行运算执行。
3、在执行完毕后,可能会有个 " 归纳 " 的任务,比如 求和,求平均等。

再简化一点的理解就是: 先拆分  -->  在同时进行计算  --> 最后“归纳”
为什么要“并行”,优点呢?

1、为了获得 “节省时间”,“快”。适合用于大规模运算的场景。从理论上讲,在 n 个并行处理的执行速度可能会是在单一处理机上执行的速度的 n 倍。
2、以前的计算机是单核的,现代的计算机Cpu都是多核的,服务器甚至都是多Cpu的,并行计算可以充分利用硬件的性能。

3. Java 中的并行处理

JDK 8 新增的Stream API(java.util.stream)将生成环境的函数式编程引入了Java库中,可以方便开发者能够写出更加有效、更加简洁的代码。

steam 的另一个价值是创造性地支持并行处理(parallel processing)。示例:

final Collection< Task > tasks = Arrays.asList(
    new Task( Status.OPEN, 5 ),
    new Task( Status.OPEN, 13 ),
    new Task( Status.CLOSED, 8 ) 
);

// 并行执行多个任务,并 求和
final double totalPoints = tasks
   .stream()
   .parallel()
   .map( task -> task.getPoints() ) // or map( Task::getPoints ) 
   .reduce( 0, Integer::sum );
 
System.out.println( "Total points (all tasks): " + totalPoints );

对于上面的tasks集合,上面的代码计算所有任务的点数之和。
它使用 parallel 方法并行处理所有的task,并使用 reduce 方法计算最终的结果。

4. 扩展

线程池方式实现并行处理

jdk1.5引入了并发包,其中包括了ThreadPoolExecutor,相关代码如下:

public class ExecutorServiceTest {
 
    public static final int THRESHOLD = 10_000;
    public static long[] numbers;
 
    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            completionService.submit(new Callable<Long>() {
 
                @Override
                public Long call() throws Exception {
                    return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                }
            });
        }
        long sumValue = 0;
        for (int i = 0; i < taskSize; i++) {
            sumValue += completionService.take().get();
        }
        // 所有任务已经完成,关闭线程池
        System.out.println("sumValue = " + sumValue);
        executor.shutdown();
    }
 
    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

使用 fork/join框架

分支/合并框架的目的是以递归的方式将可以并行的认为拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果;相关代码如下:

public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {
    
    private static final long serialVersionUID = 1L;
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;
 
    public ForkJoinTest(long[] numbers) {
        this(numbers, 0, numbers.length);
    }
 
    private ForkJoinTest(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        // 注:join方法会阻塞,因此有必要在两个子任务的计算都开始之后才执行join方法
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }
 
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
 
    public static void main(String[] args) {
        System.out.println(forkJoinSum(10_000_000));
    }
 
    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinTest(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

上面的代码实现了 递归方式拆分子任务,并放入到线程池中执行。

5.参考:

https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97

到此这篇关于Java并行处理的实现的文章就介绍到这了,更多相关Java并行处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • MyBatis3源码解析之如何获取数据源详解

    MyBatis3源码解析之如何获取数据源详解

    用myBatis3与spring整合的时候,我们可以通过多种方式获取数据源,下面这篇文章主要给大家介绍了关于MyBatis3源码解析之如何获取数据源的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2022-06-06
  • Java 中引入内部类的意义?

    Java 中引入内部类的意义?

    这篇文章主要介绍了Java 中引入内部类的意义?文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下
    2019-06-06
  • Java插入修改删除数据库数据的基本方法

    Java插入修改删除数据库数据的基本方法

    这篇文章主要介绍了Java插入修改删除数据库数据的基本方法,是Java入门学习中的基础知识,需要的朋友可以参考下
    2015-10-10
  • java开发之基于Validator接口的SpringMVC数据校验方式

    java开发之基于Validator接口的SpringMVC数据校验方式

    这篇文章主要介绍了java开发之基于Validator接口的SpringMVC数据校验方式,文中附含详细示例代码,有需要的朋友可以借鉴参考下
    2021-09-09
  • Spring Boot 2结合Spring security + JWT实现微信小程序登录

    Spring Boot 2结合Spring security + JWT实现微信小程序登录

    这篇文章主要介绍了Spring Boot 2结合Spring security + JWT实现微信小程序登录,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01
  • SpringBoot整合kaptcha实现图片验证码功能

    SpringBoot整合kaptcha实现图片验证码功能

    这篇文章主要介绍了SpringBoot整合kaptcha实现图片验证码功能,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-07-07
  • Java如何实现上传文件到服务器指定目录

    Java如何实现上传文件到服务器指定目录

    这篇文章主要介绍了Java如何实现上传文件到服务器指定目录,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • 一文详解Java如何实现自定义注解

    一文详解Java如何实现自定义注解

    Java实现自定义注解其实很简单,跟类定义差不多,只是属性的定义可能跟我们平时定义的属性略有不同,这篇文章主要给大家介绍了关于Java如何实现自定义注解的相关资料,需要的朋友可以参考下
    2024-07-07
  • Java groovy内存回收测试步骤解析

    Java groovy内存回收测试步骤解析

    这篇文章主要介绍了Java groovy内存回收测试步骤解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • JAVA面向对象之继承 super入门解析

    JAVA面向对象之继承 super入门解析

    在JAVA类中使用super来引用父类的成分,用this来引用当前对象,如果一个类从另外一个类继承,我们new这个子类的实例对象的时候,这个子类对象里面会有一个父类对象。怎么引用里面的父类对象呢?用super来引用,this指当前对象的引用,super是当前对象里面的父对象的引用
    2022-01-01

最新评论