专栏首页一个会写诗的程序员的博客并行执行任务的ForkJoin框架简介

并行执行任务的ForkJoin框架简介

Fork/Join框架简介

从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。

这种思想和MapReduce很像(input --> split --> map --> reduce --> output).

主要有两步:

  1. 任务切分
  2. 结果合并

它的模型大致是这样的:线程池中的每个线程都有自己的工作队列(PS:这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.[API注释]

ForkJoinPool与其它的ExecutorService区别主要在于它使用“工作窃取”:线程池中的所有线程都企图找到并执行提交给线程池的任务。当大量的任务产生子任务的时候,或者同时当有许多小任务被提交到线程池中的时候,这种处理是非常高效的。特别的,当在构造方法中设置asyncMode为true的时候这种处理更加高效。

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:

image

ForkJoinTask

ForkJoinTask代表运行在ForkJoinPool中的任务。

主要方法:

  • fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
  • join() 当任务完成的时候返回计算结果。
  • invoke() 开始执行任务,如果必要,等待计算完成。

子类:

  • RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
  • RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)

ForkJoinWorkerThread

A thread managed by a ForkJoinPool, which executes ForkJoinTasks.

ForkJoinWorkerThread代表ForkJoinPool线程池中的一个执行任务的线程。

[Ref: https://www.cnblogs.com/cjsblog/p/9078341.html]

代码实例

分治思想用递归实现的经典案例就是斐波那契数列了。

斐波那契数列:1、1、2、3、5、8、13、21、34、……

公式 :

F(1)=1,

F(2)=1,

F(n)=F(n-1)+F(n-2)(n>=3,n∈N*)

我们使用单线程的递归计算 Fibonacci 数列的代码是:

class Fibonacci(val n: Long) {
    fun compute(): Long {
        if (n <= 1) {
            return n
        }
        return Fibonacci(n - 1).compute() + Fibonacci(n - 2).compute()
    }
}

这个代码是单线程的,运行效果如下:

val fib = Fibonacci(40)

val v = fib.compute()
println("v=$v")

运行时间是:4099ms

我们采用ForkJoin框架并发计算的代码是:

class FibonacciRecursiveTask(val n: Long) : RecursiveTask<Long>() {

    override fun compute(): Long {
        if (n <= 1) {
            return n
        }
        val f1 = FibonacciRecursiveTask(n - 1)
        val f2 = FibonacciRecursiveTask(n - 2)
        ForkJoinTask.invokeAll(f1, f2)
        return f2.join() + f1.join()
    }
}

调用运行代码:

val fib = FibonacciRecursiveTask(30)
val forkJoinPool = ForkJoinPool(4) // 最大并发数
val f = forkJoinPool.invoke(fib)
println("f=$f") // f=102334155

运行时间是: 343ms


本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 死锁、活锁、饥饿锁、无锁

    死锁、活锁、饥饿是关于多线程是否活跃出现的运行阻塞障碍问题,如果线程出现了这三种情况,即线程不再活跃,不能再正常地执行下去了。

    一个会写诗的程序员
  • 给 Java 开发者的 Kotlin 快速上手教程(Kotlin for Java Developers)v0.1

    1995年,当年如日中天的Sun公司发布了Java语言,引起了巨大的轰动,与当时主流的C语言和Basic语言比起来,Java语言简单、面向对象、稳定、与平台无关...

    一个会写诗的程序员
  • Java并发多线程

    并发知识不管在学习、面试还是工作过程中都非常非常重要,看完本文,相信绝对能助你一臂之力。

    一个会写诗的程序员
  • Java Fork/Join 框架

    响应式编程(Reactive Programming / RP)作为一种范式在整个业界正在逐步受到认可和落地,是对过往系统的业务需求理解梳理之后对系统技术设计/...

    哲洛不闹
  • Java并发编程学习前期知识上篇

    Java并发,这个无论是面试还是在工作中,并发都是会遇到的。Java并发包JUC(java.util.concurrent)有了解过哪些?并发包实现最重要的是什...

    凯哥Java
  • 【转】 Java 多线程之一

    进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空间是互相隔离的;进程拥有各种资源和状态信息,包括...

    shirayner
  • 现象之下,问题环生,国内云CRM厂商何时自医?

    近日,T研究发布了《2018年中国云CRM市场及用户实践研究报告》,持续关注该市场发展动态。

    人称T客
  • spark求最受欢迎的老师的问题

    曼路
  • Kotlin实战【六】Kotlin中集合的创建

    public interface Collection<out E> : Iterable<E>{...}

    先知先觉
  • JDK中Concurrent包工具类指南

    这篇翻译指南很能解决问题,对于初步建立并发包的认识很有帮助,感谢原作者和翻译者 Java 并发工具包 java.util.concurrent 用户指南 ...

    于霆霖

扫码关注云+社区

领取腾讯云代金券