Java并发---ForkJoin框架

介绍

JDK 1.7 后,标准类库添加了 ForkJoinPool,作为对 Fork/Join 型线程池的实现。

Fork&Join的作用:Fork 将大型任务递归拆分成多个小型任务,然后让小任务执行,Join 则会等待获得小任务的结果,然后进行合并,将合并的结果作为大任务的结果。这使用的则是分治思想实现的,只是这些子任务都可以并行执行。

Fork&Join

ForkJoin的任务

ForkJoin中的任务主要分为两个:RecursiveAction以及RecursiveTask。而常用的也是继承这两个类,并且重写compute()函数,完成子任务的工作。

其中RecursiveAction则不会带有返回结果,不会涉及到父任务的结果合并中。而RecursiveTask则是带返回结果的Fork/Join任务,这类任务则是需要父任务等待子任务执行完后,使用子任务结果来合并任务结果。

通过这两个类的fork()函数,可以产生子任务,并且并行执行子任务。而通过join()函数则可以等待子任务的执行完成,并且获取结果。

执行任务

ForkJoin中可以使用三种方式开始执行任务:

  1. invoke 方法: 用来执行一个带返回值的任务(通常继承自RecursiveTask),并且该方法是阻塞的,直到任务执行完毕,该方法才会停止阻塞并返回任务的执行结果。
  2. Submit方法: 该 submit 方法用来执行带返回值的ForkJoinTask(通常继承自RecursiveTask)。该方法是非阻塞的,调用之后将任务提交给 ForkJoinPool 去执行便立即返回,返回的便是已经提交到 ForkJoinPool 去执行的 task, ForkJoinTask 实现了 Future 接口,所以可以直接通过 task 来和已经提交的任务进行交互
  3. Execute方法: 该 execute 方法用来执行不带返回值的ForkJoinTask(通常继承自RecursiveAction) ,该方法同样是非阻塞的。

举例

例如下面计算数组中数的总值:

  1. compute()函数中判断是否任务是否不需要拆分子任务,如果是的话,则直接执行即可
  2. 如果任务太大,则继续拆分成子任务,并且调用fork()开始并行执行子任务
  3. 子任务加入队列后,调用join()等待子任务执行完成后返回结果
  4. 计算子任务结果,将子任务结果返回给父任务,标识父任务完成
class SumTask extends RecursiveTask<Long> {

    static final int THRESHOLD = 100;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算:
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }
        // 任务太大,一分为二:
        int middle = (end + start) / 2;
        SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        // 执行fork的任务
        invokeAll(subtask1, subtask2);
        // 也可以调用fork();
        // subtask1.fork();
        // subtask2.fork();
        // 调用join,等待执行结果
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;
        return result;
    }
}

其中,会调用invokeAll的原因是因为:

invokeAll()的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。

工作窃取(Work-Stealing)

在ForkJoin的框架中,很多时候子任务的执行时间是不均匀的,有些子任务的时间比较长,有些子任务执行的时间比较短,子任务时间比较短的在任务完成后,就会去窃取其他未完成的任务执行。

在ForkJoin的框架实现该机制的原理则是:

线程池中每个线程都有一个互不影响的任务队列(双端队列),线程每次都从自己的任务队列的队头中取出一个任务来运行;如果某个线程对应的队列已空并且处于空闲状态,而其他线程的队列中还有任务需要处理但是该线程处于工作状态,那么空闲的线程可以从其他线程的队列的队尾取一个任务来帮忙运行。感觉就像是空闲的线程去偷人家的任务来运行一样,所以叫 “工作窃取”

Work-Stealing

Work-Stealing 的适用场景是不同的任务的耗时相差比较大,即某些任务需要运行较长时间,而某些任务会很快的运行完成,这种情况下用 Work-Stealing 很合适;但是如果任务的耗时很平均,则此时 Work-Stealing 并不适合,因为窃取任务时不同线程需要抢占锁,这可能会造成额外的时间消耗,而且每个线程维护双端队列也会造成更大的内存消耗。所以ForkJoinPool并不是ThreadPoolExecutor的替代品,而是作为对 ThreadPoolExecutor的补充。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券