Fork/Join 框架

1. 简介

1.1 什么是Fork/Join框架

Java 5 引入了 Executor 和 ExecutorService 接口,使得 Java在并发支持上得到了进一步的提升。 Java 7 更进了一步,Fork/Join框架是 ExecutorService接口的一个实现,用来解决可以通过分治法将问题拆分成小任务的问题。在一个任务中,先检查将要解决的问题大小,如果大于一个设定的大小,那就将问题拆分成可以通过框架来执行的小任务,否则直接在任务里解决这个问题,然后根据需要返回任务的结果。下面的图形总结了这个原理:

1.2 工作窃取算法

Fork/Join框架和 Executor Framework主要的区别在于工作窃取算法(Work-Stealing Algorithm)。假设我们有一个大任务,把任务分成互不依赖的子任务,为了减少线程间的竞争,就把这些子任务放到不同队列中,并为每个队列创建一个单独的线程来执行队列里的任务。但是有的线程干活干得快,与其等着不如去帮其他线程完成任务,通过这种方式,这些线程在运行时拥有所有的优点,进而提升应用程序的性能。 为了达到这个目标,通过Fork/Join框架执行的任务有以下限制

  • 任务只能使用fork()和join()操作当作同步机制。如果使用其他同步机制,负责收集子任务结果的工作者线程就不能执行其他任务。
  • 任务不能执行I/O操作
  • 任务不能抛出非运行时异常,必须在代码中处理掉这些异常。

1.3 Fork/Join框架的核心类

ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法,它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。

ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。框架中提供了两个子类:

  • RecursiveAction:用于没有返回结果的任务
  • RecursiveTask:用于有返回结果的任务

2.实例

在文档中查找一个词,我们将实现以下两种任务:

  • 一个文档任务,它将遍历文档中的每一行来查找这个词
  • 一个行任务,它将在文档的一部分当中查找这个词

所有这些任务将返回文档或行中所出现这个词的次数。

public class DocumentMock {

    private String words[]={"the","hello","goodbye","packet","java","thread","pool","random","class","main"};

    public String[][] generateDocument(int numLines, int numWords, String word) {
        int counter = 0;
        String document[][] = new String[numLines][numWords];
        Random random = new Random();
        for (int i = 0; i < numLines; i++) {
            for (int j = 0; j < numWords; j++) {
                int index = random.nextInt(words.length);
                document[i][j] = words[index];
                if (document[i][j].equals(word)) {
                    counter++;
                }
            }
        }
        System.out.println("DocumentMock: the word appears " + counter + "times in the document");
        return document;
    }
}

DocumentTask类:这个类的任务需要处理由start和end属性决定的文档行,如果行数小于10,那么就每行创建一个LineTask对象,然后在任务执行后结束,合计返回的结果,并返回总数。如果任务要处理的行数大于10,那么将任务拆分成两组,并创建DocumentTask来处理这两组对象。当这些任务执行结束后,合计返回结果。

public class DocumentTask extends RecursiveTask<Integer> {

    private String document[][];
    private int start, end;
    private String word;

    public DocumentTask(String[][] document, int start, int end, String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    @Override
    protected Integer compute() {
        int result = -1;
        if (end - start < 10) {
            result = processLines(document, start, end, word);
        } else {
            int mid = (start + end) / 2;
            DocumentTask task1 = new DocumentTask(document, start, mid, word);
            DocumentTask task2 = new DocumentTask(document, mid, end, word);
            invokeAll(task1, task2);
            try {
                result = groupResults(task1.get(), task2.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    private Integer processLines(String[][] document, int start, int end, String word) {
        List tasks = new ArrayList();
        for (int i = start; i < end; i++) {
            LineTask task = new LineTask(document[i], 0, document[i].length, word);
            tasks.add(task);
        }
        invokeAll(tasks);
        int result = 0;
        for (int i = 0; i < tasks.size(); i++) {
            LineTask task = tasks.get(i);
            try {
                result = result + task.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return new Integer(result);
    }
    private Integer groupResults(Integer number1, Integer number2) {
        Integer result;
        result = number1 + number2;
        return result;
    }
}

LineTask这个类需要处理文档中一行的某一组词。如果一组词的个数小于100,那么任务将直接在这一组词里搜索特定词,然后返回查找词在这一组词中出现的次数。否则,将任务拆分成两组,并创建两个LineTask对象来处理。当结果执行结束后,返回合并结果。

public class LineTask extends RecursiveTask<Integer> {
    private String line[];
    private int start, end;
    private String word;

    public LineTask(String[] line, int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }
    @Override
    protected Integer compute() {
        Integer result = null;
        if (end - start < 100) {
            result = count(line, start, end, word);
        } else {
            int mid = (start + end) / 2;
            LineTask task1 = new LineTask(line, start, mid, word);
            LineTask task2 = new LineTask(line, mid, end, word);
            invokeAll(task1, task2);
            try {
                result = groupResults(task1.get(), task2.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }
    private Integer count(String[] line, int start, int end, String word) {
        int counter = 0;
        for (int i = start; i < end; i++) {
            if (line[i].equals(word)) {
                counter++;
            }
        }
        //为了延缓执行,休眠10ms
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return counter;
    }
    private Integer groupResults(Integer number1, Integer number2) {
        Integer result;
        result = number1 + number2;
        return result;
    }
}

Main函数,通过默认构造器创建了ForkJoinPool对象,然后执行DocumentTask类,来出来一共100行,每行1000字的文档,这个任务将问题拆分成DocumentTask对象和LineTask对象,然后当所有的任务执行完成后,使用原始的任务来获取整个文档中所要查找的词出现的次数,由于任务继承了RecursiveTask类,因此能够返回结果。

public class Main {
    public static void main(String[] args) {
        DocumentMock mock = new DocumentMock();
        String[][] document = mock.generateDocument(100, 1000, "the");
        DocumentTask task = new DocumentTask(document, 0, 100, "the");
        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(task);
        do {
            System.out.println("***********************************");
            System.out.printf("Main Parallelism : %d\n", pool.getActiveThreadCount());
            System.out.printf("Main Task Count : %d\n", pool.getQueuedTaskCount());
            System.out.printf("Main Steal Count : %d\n", pool.getStealCount());
            System.out.println("***********************************");

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (!task.isDone());

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            if (task != null) {
                System.out.printf("Main: The word appears %d in the document", task.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

 执行结果:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java技术交流群809340374

史上最全Java面试266题:算法+缓存+TCP+JVM+搜索+分布式+数据库

230
来自专栏Jackson0714

C#多线程之旅(3)——线程池

3216
来自专栏草根专栏

使用C# (.NET Core) 实现状态设计模式 (State Pattern)

3055
来自专栏斑斓

编程修炼 | Scala中Stream的应用场景及其实现原理

假设一个场景需要在50个随机数中找到前两个可以被3整除的数字。听起来很简单,我们可以这样来写: def randomList = (1 to 50).map(_...

2775
来自专栏与神兽党一起成长

解析XML和JSON内容的一点技巧

在没有统一标准的情况下,一个系统对接多个外部系统往往会遇到请求接口响应数据异构的情况,有可能返回的是XML,也有可能返回 JSON。除了返回类型不同,内容结构也...

202
来自专栏张善友的专栏

内容协商 (Content Negotiation)

大多数响应包含一个实体,此实体包含人类用户能理解的信息。通常,希望提供给用户相应于请求最容易得到的实体。对服务器和缓存来说,不幸的是,并不是所有的用户都对这个最...

1769
来自专栏逸鹏说道

Python3 与 C# 扩展之~基础衍生

在线编程: https://mybinder.org/v2/gh/lotapp/BaseCode/master

853
来自专栏iKcamp

iKcamp|基于Koa2搭建Node.js实战(含视频)☞ 解析JSON

视频地址:https://www.cctalk.com/v/15114923886141 JSON 数据 我颠倒了整个世界,只为摆正你的倒影。 前面的文章中,...

3629
来自专栏java达人

最有价值的50道java面试题(一)

来自骆昊的技术专栏 1、面向对象的特征有哪些方面? 答:面向对象的特征主要有以下几个方面: 1)抽象:抽象是将一类对象的共同特征总结出来构造类的过程,...

22310
来自专栏企鹅号快讯

浅谈 Java 并发编程中的若干核心技术

作者:一字马胡 原文:http://www.jianshu.com/p/5f499f8212e7 索引 Java线程 线程模型 Java线程池 Future(各...

1786

扫码关注云+社区