浅析Java的Frok/Join框架 一丶Fork/Join框架产生背景:二丶工作窃取算法三丶Fork/Join框架的设计思想四丶JDK实现Fork/Join框架五丶ForkJoin框架的实现原理

一丶Fork/Join框架产生背景:

随着并发需求的不断提高和硬件的不断发展,程序并行执行仿佛就提上日程上来了,伟大的毛主席就说过:“人多力量大”,所以如果一件事可以分配给多个人同时去做,到最后再把完成的事情组合到一起去,那么做事情的效率就会大大提升。用下面的这张图啦感受一下:

一件大事被分为五个处理器来处理(当然分开的这些任务没有依赖性),最终完成合并做成一件大事! 到这里,大家应该对这个框架有一个简单的认识了吧! Fork/Join框架是Java1.7开始提供的一个并行执行框架,结合上面的讲述,再结合Fork/Join的字面意思我们可以知道,先Fork(分叉),再结合!


二丶工作窃取算法

想必只要提到Fork/Join框架,都要提到工作窃取算法,工作窃取是指在分别完成分配的事情时,如果工作结束早的线程可以窃取(帮助)其他线程来完成工作,最终达到总的任务快速完成的目的,就像老板布置一样任务,项目经理将这个任务分给甲乙丙三个程序员来完成这样事情,甲比较厉害,或者分配的事情比较少,他完成以后他主动帮助乙来完成工作。 如果这样子来完成任务,那么怎么分配任务就是一个问题了: 首先将任务分为互不依赖的子任务,将这些子任务放到不同的队列里,并安排一个线程来处理这些任务,线程和队列(一般使用双端队列)一一对应,当有线程完成时,就会去其他队列里窃取任务,但是不同的是,窃取任务是从队列的尾端窃取,防止窃取过程中发生线程竞争。 优点:充分利用并行计算,提高运算效率。 缺点:在某些情况下还是会发生竞争,例如当队列中只剩下一个任务,两个线程进行竞争。


三丶Fork/Join框架的设计思想

理解了Fork/Join框架的设计思想后,我们理解这个框架就会变的非常容易,我们用几行伪代码来直观的告诉你什么是Fork/Join思想:

if(任务足够小){
    进行计算;
}else{
    将任务分为两个部分;
    结合两个子任务;
}

四丶JDK实现Fork/Join框架

(1)JDk为Fork/Join框架提供了很好的支持,我们想要用这个算法首先得创建一个Fork/Join任务,在JDK中这个任务就叫做:ForJoinTask,只要继承这个类就可以创建一个任务类,但是实际使用中并不是直接继承ForkJoinTask类,而是继承它的子类,它有两个子类,分别是RecursiveAction和RecursiveTask,它们之间的区别是是否返回任务结果,前者用于没有返回结果的任务,后者用于有返回结果的任务。

(2)有了Fork/Join任务后还需要执行任务,JDK提供了ForkJoinPool来执行。

(3)下面我们来看一个用ForkJoin框架来进行累加计算例子:

//继承RecursiveTask,用于与结果返回的任务
public class CountTask  extends RecursiveTask<Integer>{
    //阈值为2,当小于等于这个值的时候进行计算
    private static final int THRESHOLD = 2;
    
    //累加的起始值
    private int start;
    private int end;
    
    public CountTask(int start,int end){
        this.start = start;
        this.end = end;
    }
    
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end-start) <= THRESHOLD;
        //如果false,表示end-start大于2,还要进行分割
        if(canCompute) {
            for(int i = start;i<=end; i++ ) {
                sum += i;
            }
        }
        else {
            int mid = (end + start) / 2;
            //分为两个任务
            CountTask leftTask = new CountTask(start,mid);
            CountTask rightTask = new CountTask(mid + 1,end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //等待子任务完成
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任务
            sum = leftResult+rightResult;
            
        }
        return sum;
    }
    
    public static void main(String[] args) {
        //创建ForkJoinPool 对象执行任务
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //计算1-5累加
        CountTask task = new CountTask(1,5);
        //执行
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

五丶ForkJoin框架的实现原理

在看到例子后我的第一个想法是,这个框架用到了递归,这和我之前看到的归并排序有点类似,那我们来看看框架到底是怎么实现的: ForkJoinPool由ForkJoinTask数组和ForkJoinWorkThread数组组成。ForkJoinTask负责将存储的任务提交给ForkJoinPool,ForkJoinWorkThread负责处理这些任务。 但是我简单的翻了下源码,发现ForkJoinPool里面已经没有ForkJoinWorkThread数组了,取而代之的是ForkJoinWorkThread工厂(博主Java8)。

既然是实现原理,就从它的核心方法开始看起: 5.1submit方法: 一个任务执行的开端是submit方法,即将任务提交给ForkJoinPool,让它唤醒一个ForkJoinWorkThread线程来执行这个任务:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        //任务为空抛异常
        if (task == null)
            throw new NullPointerException();
        //调用externalPush方法,这个我们往下看,看看它玩啥花样
        externalPush(task);
        return task;
    }
  final void externalPush(ForkJoinTask<?> task) {
        //工作队列,也就是之前介绍框架时所提到的偷窃队列
        //创建队列数组的个数和线程数一致
        WorkQueue[] ws; WorkQueue q; int m;
        //getProbe用来为当前线程生成一个不强制初始化的取样值
        int r = ThreadLocalRandom.getProbe();
        //runState为ForkJoinPool 的volatile字段,表示锁的状态
        int rs = runState;
        //workQueues为ForkJoinPool 的数组volatile类型字段
        //当这个队列不为空(如果为空肯定是要初始化一个队列的)
        //并且队列长度大于等于0
        //额...后面原谅我看不懂了
        //但是大概的意思应该是要调用我们自定义的task对象的方法,对这个任务进行分割
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                //将task放入ForkJoinTask数组中
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                //signalWork是用来创建线程来执行任务
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }

5.2fork方法: 我参考过并发编程的艺术,Java7中的fork方法和Java8的fork方法有点小不同! 以下代码是Java8的:

 public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

我们也说了这个fork方法是由submit方法触发的,且submit方法里面唤醒过ForkJoinWorkerThread来处理队列中的任务,所以通过push方法将当前分割后任务放到队列中,同时调用signalWork唤醒线程对分割后的线程进行处理。

  final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

我对这个框架的理解是这样的(因为网上关于1.8的fork/join框架介绍比较少所以博主水平有限,理解可能有偏差):

1)submit提交task 2)将task暂存到ForkJoinTask数组中 3)调用signalWork方法创建或唤醒线程来执行ForkJoinTask数组中的任务 4)线程拿到任务后调用fork方法 5)fork方法将任务再分割,放到ForkJoinTask数组中(分割前的总任务清 除),然后再唤醒线程来处理ForkJoinTask数组中的任务(唤醒线程的个数取决任务个数)。 6)fork后通过调用join方法将结果合并。 7)任务继续分割,直到不能再分割,然后各个线程进行计算。 8)像递归一样,将数据合并并返回,最终返回到submit方法中。

以上就是博主读fork/join框架的理解,可能偏差很大,大的离谱,如果有大神看见还请赐教!

分析完上面的,博主还是不放心,那就断点走起吧:

上图是我们要执行的主程序(完整程序上面有),第一行和第二行没什么新建一些对象,当执行到Future<Integer> result = forkJoinPool.submit(task);时我们看看都有哪些变量:

这个时候还没啥迹象,且result都没有值,只有这段代码执行结束才能看出点啥Future<Integer> result = forkJoinPool.submit(task);: 看看最终形态:

额,告诉大家一个不幸的消息,我没看出啥~(滑稽脸.jpg) 此次分析到此结束,有空再来拜读源码!

恳请有大牛看见了此篇文章指点一下,万谢!!! 2018.4.13 17:19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏高性能服务器开发

写给新手们看的编程修养

什么是好的程序员?是不是懂得很多技术细节?还是懂底层编程?还是编程速度比较快?我觉得都不是。对于一些技术细节来说和底层的技术,只要看帮助,查资料就能找到,对于速...

1393
来自专栏程序员互动联盟

【编程基础】写代码,你应该知道九类规则

网上有太多讲编码规范、编码习惯的文章,但我总是念的多,实际去认真阅读理解的少。或多或少的按照自己的思维去编写代码。这种习惯让我吃大亏,比如一个指针未赋值导致偶尔...

4215
来自专栏数据科学学习手札

(数据科学学习手札32)Python中re模块的详细介绍

  关于正则表达式,我在前一篇(数据科学学习手札31)中已经做了详细介绍,本篇将对Python中自带模块re的常用功能进行总结;

3324
来自专栏木可大大

编写优雅代码的最佳实践

Robert Martin曾说过"在代码阅读中说脏话的频率是衡量代码质量额唯一标准"。同时,代码的写法应当使别人理解它所需的时间最小化,也就是说我们写的代码是给...

45320
来自专栏Pythonista

封装-python

    从封装本身的意思去理解,封装就好像是拿来一个麻袋,把小猫,小狗,小王八,还有alex一起装进麻袋,然后把麻袋封上口子。但其实这种理解相当片面

2882
来自专栏较真的前端

新手们容易在Promise上挖的坑~

2395
来自专栏每周一脱topic

Effictive python学习总结连载(1)

python从读研开始就在用了,拿来做过web后台、安全分析、爬虫、测试框架等等,挺强大的。最近借放假和看书和整理的机会,系统的总结下。主要是2方面:一个是书或...

2172
来自专栏Crossin的编程教室

【Python 第23课】 if, elif, else

今天补充之前讲过的一个语句:if。为什么我跳要着讲,因为我的想法是先讲下最最基本的概念,让你能用起来,之后你熟悉了,再说些细节。 关于if,可以发送数字『7』回...

2856
来自专栏代码世界

计算机基础,Python基础--变量以及简单的循环

一、计算机基础 1.CPU   相当于人体的大脑,用于计算处理数据。 2.内存    用于存储数据,CPU从内存调用数据处理计算,运算速度很快。 PS:问:...

2807
来自专栏PHP在线

设计模式分类

Introduction 根据目的和范围,设计模式可以分为五类。按照目的分为:创建设计模式,结构设计模式,以及行为设计模式。按照范围分为:类的设计模式,以及对象...

3356

扫码关注云+社区

领取腾讯云代金券