ForkJoinPool简介ForkJoinPool

ForkJoinPool

背景描述

过去我们在线程池解决问题时,通常维护了一个阻塞的任务队列。每个工作线程在任务完成后,就会去任务队列里面寻找任务。这种方式在我们执行数量较多且不互相依赖的任务时非常方便且高效。但是当我们需要执行一个很大的任务时,普通的线程池似乎就很难有什么帮助了。

在JDK7中新增了ForkJoinPool。ForkJoinPool采用分治+work-stealing的思想。可以让我们很方便地将一个大任务拆散成小任务,并行地执行,提高CPU的使用率。关于ForkJoinPool的精妙之处,我们将在后面的使用中慢慢说明。

如何使用

构造方法

Android官方文档中给出了三个构造方法。我们注意到在构造方法中,我们可以设置ForkJoinPool的最大工作线程数、工作线程工厂、拒绝任务的Handler和同步模式。

执行任务

ForkJoinPool提供了两套执行任务的API,它们的区别主要是返回的结果类型不同。invoke方法返回执行的结果,而submit方法返回执行的任务。

使用示例

需求

遍历系统所有文件,得到系统中文件的总数。

思路

通过递归的方法。任务在遍历中如果发现文件夹就创建新的任务让线程池执行,将返回的文件数加起来,如果发现文件则将计数加一,最终将该文件夹下的文件数返回。

代码实现

    CountingTask countingTask = new CountingTask(Environment.getExternalStorageDirectory());
    forkJoinPool.invoke(countingTask);

    class CountingTask extends RecursiveTask<Integer> {
        private File dir;

        public CountingTask(File dir) {
            this.dir = dir;
        }

        @Override
        protected Integer compute() {
            int count = 0;

            File files[] = dir.listFiles();
            if(files != null){
                for (File f : files){
                    if(f.isDirectory()){
                        // 对每个子目录都新建一个子任务。
                        CountingTask countingTask = new CountingTask(f);
                        countingTask.fork();
                        count += countingTask.join();

                    }else {
                        Log.d("tag" , "current path = "+f.getAbsolutePath());
                        count++;
                    }
                }
            }


            return count;
        }
    }         

原理说明

所谓work-stealing模式,即每个工作线程都会有自己的任务队列。当工作线程完成了自己所有的工作后,就会去“偷”别的工作线程的任务。

那么这样的工作模式,有什么好处呢?

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

上面的需求,如果我们用普通的线程池该如何完成?

如果我们使用newFixedThreadPool,当核心线程的路径下都有子文件夹时,它们会将路径下的子文件夹抛给任务队列,最终变成所有的核心线程都在等待子文件夹的返回结果,从而造成死锁。最终任务无法完成。

如果我们使用newCachedThreadPool,依然用上面的思路可以完成任务。但是每次子文件夹就会创建一个新的工作线程,这样消耗过大。

因此,在这样的情况下,ForkJoinPool的work-stealing的方式就体现出了优势。每个任务分配的子任务也由自己执行,只有自己的任务执行完成时,才会去执行别的工作线程的任务。

再来个例子

N项的Fibonacci数列求和,我们不再只能仰仗单个线程为我们执行任务。

package com.example;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class MyClass {

    static int computeCount = 0;

    static class Fibonacci extends RecursiveTask<Integer> {
        int n;

        Fibonacci(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            computeCount ++;
            System.out.printf("Current thread is " + Thread.currentThread()
                    + "\n n = " + n + "\n");

            if (n <= 2)
                return 1;
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Fibonacci f2 = new Fibonacci(n - 2);
            f2.fork();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("wati temp answer is :" + n + "\n");
            int answer = f1.join() + f2.join();
            System.out.printf("temp answer is :" + answer  + ",  n is :" +n +"\n");
            return answer;
        }
    }


    public static void main(String[] args)  {
        ForkJoinPool pool = new ForkJoinPool(2);
        Fibonacci task = new Fibonacci(5);
        int answer = 0;
        answer = pool.invoke(task);
        System.out.printf("Hello answer is :" + answer +  " , compute count is :" + computeCount);
    }
}

结语

实测下来,当情况足够复杂时,ForkJoinPool的优势会愈加明显。但是,就像快排一样,最优策略并不是一个思路走到死,当分治的区域较小时,可以将小区域改用插入排序进行排序。同理,当我们递归到情况不再复杂时,就可以转而用别的线程池进行处理。

以上

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Ryan Miao

sessionid如何产生?由谁产生?保存在哪里?

面试问道这个我居然不知道怎么回答,当然也是因为我确实没有研究过。下面就是百度了一篇文章后简单回答这个问题。 参考:http://www.cnblogs.com/...

3567
来自专栏iOS技术

YYWebImage 源码剖析:线程调度与缓存策略

在 iOS 开发中,异步网络图片下载框架可以说是很大的解放了生产力,通常情况下开发者只需要简单的代码就能将网络图片异步下载并显示到手机屏幕上,并且还带有缓存优化...

1044
来自专栏武军超python专栏

2018年8月15日UDP编程和面向对象的TCP编程

TCP协议:(Transmission Control Protocol 传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议 UDP协议:(U...

795
来自专栏三丰SanFeng

redis配置详解(中英文)

V2.8.21: (中英字幕同步) # Redis configuration file example #* Redis 配置文件例子 # Note on...

2208
来自专栏木木玲

Netty in action ——— Bootstrapping

992
来自专栏邹立巍的专栏

Linux 的进程间通信:信号量

Linux环境下主要实现的信号量有两种。根据标准的不同,它们跟共享内存类似,一套XSI的信号量,一套POSIX的信号量。下面我们分别使用它们实现一套类似文件锁的...

3020
来自专栏Java帮帮-微信公众号-技术文章全总结

Java设计模式-命令模式

在对象的结构和创建问题都解决了之后,就剩下对象的行为问题了: 如果对象的行为设计的好,那么对象的行为就会更清晰,它们之间的协作效率就会提高. 行为型模式共有1...

3396
来自专栏菩提树下的杨过

mybatis 3.x 缓存Cache的使用

mybatis 3.x 已经支持cache功能了,使用很简单,在mappper的xml文件里添加以下节点: 1 <mapper namespace="com....

19710
来自专栏解Bug之路

MySQL多版本并发控制机制(MVCC)-源码浅析 顶

作为一个数据库爱好者,自己动手写过简单的SQL解析器以及存储引擎,但感觉还是不够过瘾。<<事务处理-概念与技术>>诚然讲的非常透彻,但只能提纲挈领,不能让你玩转...

942
来自专栏Golang语言社区

服务器端Go程序对长短链接的处理及运行参数的保存

对长、短连接的处理策略(模拟心跳) 作为一个可能会和很多Client进行通讯交互的Server,首先要保证的就是整个Server运行状态的稳定性,因此在和Cli...

4107

扫码关注云+社区