前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并行执行任务的ForkJoin框架简介

并行执行任务的ForkJoin框架简介

作者头像
一个会写诗的程序员
发布2019-07-15 17:21:07
9630
发布2019-07-15 17:21:07
举报

Fork/Join框架简介

从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。

这种思想和MapReduce很像(input --> split --> map --> reduce --> output).

主要有两步:

  1. 任务切分
  2. 结果合并

它的模型大致是这样的:线程池中的每个线程都有自己的工作队列(PS:这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.[API注释]

ForkJoinPool与其它的ExecutorService区别主要在于它使用“工作窃取”:线程池中的所有线程都企图找到并执行提交给线程池的任务。当大量的任务产生子任务的时候,或者同时当有许多小任务被提交到线程池中的时候,这种处理是非常高效的。特别的,当在构造方法中设置asyncMode为true的时候这种处理更加高效。

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:

image

ForkJoinTask

ForkJoinTask代表运行在ForkJoinPool中的任务。

主要方法:

  • fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
  • join() 当任务完成的时候返回计算结果。
  • invoke() 开始执行任务,如果必要,等待计算完成。

子类:

  • RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
  • RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)

ForkJoinWorkerThread

A thread managed by a ForkJoinPool, which executes ForkJoinTasks.

ForkJoinWorkerThread代表ForkJoinPool线程池中的一个执行任务的线程。

[Ref: https://www.cnblogs.com/cjsblog/p/9078341.html]

代码实例

分治思想用递归实现的经典案例就是斐波那契数列了。

斐波那契数列:1、1、2、3、5、8、13、21、34、……

公式 :

代码语言:javascript
复制
F(1)=1,

F(2)=1,

F(n)=F(n-1)+F(n-2)(n>=3,n∈N*)

我们使用单线程的递归计算 Fibonacci 数列的代码是:

代码语言:javascript
复制
class Fibonacci(val n: Long) {
    fun compute(): Long {
        if (n <= 1) {
            return n
        }
        return Fibonacci(n - 1).compute() + Fibonacci(n - 2).compute()
    }
}

这个代码是单线程的,运行效果如下:

代码语言:javascript
复制
val fib = Fibonacci(40)

val v = fib.compute()
println("v=$v")

运行时间是:4099ms

我们采用ForkJoin框架并发计算的代码是:

代码语言:javascript
复制
class FibonacciRecursiveTask(val n: Long) : RecursiveTask<Long>() {

    override fun compute(): Long {
        if (n <= 1) {
            return n
        }
        val f1 = FibonacciRecursiveTask(n - 1)
        val f2 = FibonacciRecursiveTask(n - 2)
        ForkJoinTask.invokeAll(f1, f2)
        return f2.join() + f1.join()
    }
}

调用运行代码:

代码语言:javascript
复制
val fib = FibonacciRecursiveTask(30)
val forkJoinPool = ForkJoinPool(4) // 最大并发数
val f = forkJoinPool.invoke(fib)
println("f=$f") // f=102334155

运行时间是: 343ms


本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.07.11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ForkJoinTask
  • ForkJoinWorkerThread
  • 代码实例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档