专栏首页JVMGCJava中J.U.C扩展组件之ForkJoinTask和ForkJoinPool
原创

Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool

Fork/Join框架中两个核心类ForkJoinTaskForkJoinPool,声明ForkJoinTask后,将其加入ForkJoinPool中,并返回一个Future对象。

  • ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割的子任务会添加到当前工作维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其它工作线程的队列尾部获取一个任务。
  • ForkJoinTask:我们需要使用ForkJoin框架,首先要创建一个ForkJoin任务。它提供在任务中执行Fork()Join()操作的机制,通常情况下不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供以下两个子类。
  • RecursiveAction:用于没有返回值的任务。
  • RecursizeTask:用于有返回值的任务。
image-20210120194025938

Exception

ForkJoinTask在执行的时候可能会抛出异常,但是我们没有办法直接在主线程里捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTaskgetException方法捕获异常。

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    /** ForkJoinTask运行状态 */
    volatile int status; // 直接被ForkJoin池和工作线程访问
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags
    
    /**
     * @Ruturn 任务是否扔出异常或被取消
     */
    public final boolean isCompletedAbnormally() {
        return status < NORMAL;
    }
    
    /**
     * 如果计算扔出异常,则返回异常
     * 如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null
     */
    public final Throwable getException() {
        int s = status & DONE_MASK;
        return ((s >= NORMAL)    ? null :
                (s == CANCELLED) ? new CancellationException() :
                getThrowableException());
    }
}

ForkJoinPool源码

public class ForkJoinPool extends AbstractExecutorService {
    /**
     * ForkJoinPool,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了
     * 一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希
     * 望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
     */
    public ForkJoinPool() {
        this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }
    //有多个构造器,这里省略
    
    volatile WorkQueue[] workQueues;     // main registry
    static final class WorkQueue {
        final ForkJoinWorkerThread owner; // 工作线程
        ForkJoinTask<?>[] array;   // 任务
        
        //传入的是ForkJoinPool与指定的一个工作线程
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

    }
}

FrokJoinPool work stealing算法

38f0c9a9a9b072b589aba560a849c18bad5.jpg

ForkJoinPool维护了一组WorkQueue,也就是工作队列,工作队列中又维护了一个工作线程ForkJoinWorkerThread与一组工作任务ForkJoinTask

WorkQueue是一个双端队列Deque(Double Ended Queue),Deque是一种具有队列和栈性质的数据结构,双端队列中的元素可以从两端弹出,其限定插入和删除操作在表的两端进行。

每个工作线程在运行中产生新的任务(通常因为调用了fork())时,会放在工作队列的对尾,并且工作线程在处理自己的工作队列时,使用的是LIFO,也就是说每次从队列尾部取任务来执行。

每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其它工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。

在遇到Join()时,如果需要Join的任务尚未完成,则会优先处理其它任务,并等待其完成。

在没有自己的任务时,也没有任何可以窃取时,则进入休眠。

public class ForkJoinPool extends AbstractExecutorService {
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {}
    public <T> ForkJoinTask<T> submit(Callable<T> task) {}
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {}
    public ForkJoinTask<?> submit(Runnable task) {}
}

ForkJoinPool自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread线程)提交过来的任务,而这些工作队列被称为 submitting queue

ForkJoinTask

任务的操作,重要的是fork()join()

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    /**
     * 在当前任务正在运行的池中异步执行此任务(如果适用)
     * 或使用ForkJoinPool.commonPool()(如果不是ForkJoinWorkerThread实例)进行异步执行 
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
    
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
}

fork()做的工作只有一件事,就是把当前任务推入当前线程的工作队列里。

join()的工作就比较复杂,也是join()可以使线程免于被阻塞的原因。

  • 检查调用join()的线程是否是ForkJoinThread线程。如果不是(例如main线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  • 检查任务的完成状态,如果已经完成,则直接返回结果。
  • 如果任务尚未完成,但是处理自己的工作队列,则完成它。
  • 如果任务已经被其它线程偷走,则这个小偷工作队列的任务以先进先出的方式执行,帮助小偷线程尽快完成join
  • 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要join的任务时,则找到小偷的小偷(递归执行),帮助它完成它的任务。
bc8ab44bbad490edddfe5b6e8febaab2742.jpg

ForkJoinPool.submit方法

public static void main(String[] args) throws ExecutionException, InterruptedException {
        //生成一个池
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        ForkJoinTask task=new ForkJoinExample(1, 100000);
        ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
        Integer sum = submit.get();
        System.out.println("最后的结果是:"+sum);

}

每个工作线程自己拥有的工作队列以外,ForkJoinPool自身也拥有工作队列,这些工作队列的作用是用来接收有外部线程(非ForkJoinPool)提交过来的任务,而这些工作队列被称为submitting queue

submit()fork()没有本质区别,只是提交对象变成了submitting queue(还有一些初始化,同步操作)。submitting queue和其它work queue一样,是工作线程窃取的对象,因此当其中的任务被一个工作线程成功窃取时,也就意味着提交的任务真正开始进入执行阶段。

wx.jpg

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java中J.U.C扩展组件之Fork,join

    入门小站
  • java线程池(五):ForkJoinPool源码分析之一(外部提交及worker执行过程)

    在前文中介绍了如何使用ForkJoinPool和ForkJoin的一些基本原理。现在继续来分析ForkJoin,原本计划从源码开始分析。但是ForkJoinPo...

    冬天里的懒猫
  • 【高并发】如何使用Java7中提供的Fork/Join框架实现高并发程序?

    作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了...

    冰河
  • 多线程编程学习七( Fork/Join 框架).

    使用 java8 lambda 表达式大半年了,一直都知道底层使用的是 Fork/Join 框架,今天终于有机会来学学 Fork/Join 框架了。

    JMCui
  • Fork/Join 框架及其使用

    fork/join框架是ExecutorService接口的一种具体实现,会将任务分发给线程池中的工作线程,更好地利用多处理器带来的好处,提供程序性能。它是为那...

    CodingDiray
  • 还有年味的文章,ForkJoinPool 大型图文现场

    并发工具类我们已经讲了很多,这些工具类的「目标」是让我们只关注任务本身,并且忽视线程间合作细节,简化了并发编程难度的同时,也增加了很多安全性。工具类的对使用者的...

    用户4172423
  • 最强Java并发编程详解:知识点梳理,BAT面试题等

    在JUC锁: AbstractQueuedSynchonizer详解中类的内部类-conditionobject类有具体分析。

    Java团长
  • 【高并发】什么是ForkJoin?看这一篇就够了!

    作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了...

    冰河
  • java中的fork join框架

    fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。

    程序那些事
  • Java多线程面试准备:聊聊Executor框架

    在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当Java线程终止时,这个操作系统...

    好好学java
  • java线程池(六):ForkJoinPool源码分析之二(WorkQueue源码)

    在前面介绍了ForkJoinPool的骨架源码之后,我们来看看ForkJoinPool的核心组成。WorkQueue的源码。

    冬天里的懒猫
  • Java8--新特性--串并行流与ForkJoin框架

    PS:工作窃取带来的性能提升偏理论,API的复杂性较高,实际研发中可控性来说不如其他API。一般使用最多的就是做数据处理。接口和数据库尽量不要使用,线程如何堵塞...

    IT架构圈
  • RecursiveTask和RecursiveAction的使用 以及java 8 并行流和顺序流

    转载自 https://blog.csdn.net/weixin_41404773/article/details/80733324

    allsmallpig
  • java线程池(七):ForkJoinPool源码分析之三(ForkJoinTask源码)

    类前面的注释部分如下: ForkJoinTask是在ForkJoinPool中运行task的基础抽象类,ForkJoinTask是类似于线程的实体,其权重比普...

    冬天里的懒猫
  • 我眼中的并发编程——Fork/Join模型

    天策
  • Java多线程详解

    转载自    http://www.cnblogs.com/snow-flower/p/6114765.html

    allsmallpig
  • Java线程(十一):Fork/Join-Java并行计算框架

    并行计算在处处都有大数据的今天已经不是一个新鲜的词汇了,现在已经有单机多核甚至多机集群并行计算,注意,这里说的是并行,而不是并发。严格的将,并行是指系统内...

    高爽
  • 分治算法与Fork/Join框架

    在计算机科学中,分治法是解决多项式分支递归的重要范式;也就是“分而治之”,将复杂问题分成两个或更多相似的子问题,然后将简单的子问题求解,再将子问题的解合并。有很...

    搬砖俱乐部
  • 基于ForkJoin构建一个简单易用的并发组件

    基于ForkJoin构建一个简单易用的并发组件 在实际的业务开发中,需要用到并发编程的知识,实际使用线程池来异步执行任务的场景并不是特别多,而且一般真的遇到了需...

    一灰灰blog

扫码关注云+社区

领取腾讯云代金券