前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Fork/Join 框架

Fork/Join 框架

作者头像
MelonTeam
发布2018-01-04 15:43:29
5410
发布2018-01-04 15:43:29
举报
文章被收录于专栏:MelonTeam专栏MelonTeam专栏

1. 简介

1.1 什么是Fork/Join框架

Java 5 引入了 Executor 和 ExecutorService 接口,使得 Java在并发支持上得到了进一步的提升。 Java 7 更进了一步,Fork/Join框架是 ExecutorService接口的一个实现,用来解决可以通过分治法将问题拆分成小任务的问题。在一个任务中,先检查将要解决的问题大小,如果大于一个设定的大小,那就将问题拆分成可以通过框架来执行的小任务,否则直接在任务里解决这个问题,然后根据需要返回任务的结果。下面的图形总结了这个原理:

1.2 工作窃取算法

Fork/Join框架和 Executor Framework主要的区别在于工作窃取算法(Work-Stealing Algorithm)。假设我们有一个大任务,把任务分成互不依赖的子任务,为了减少线程间的竞争,就把这些子任务放到不同队列中,并为每个队列创建一个单独的线程来执行队列里的任务。但是有的线程干活干得快,与其等着不如去帮其他线程完成任务,通过这种方式,这些线程在运行时拥有所有的优点,进而提升应用程序的性能。 为了达到这个目标,通过Fork/Join框架执行的任务有以下限制

  • 任务只能使用fork()和join()操作当作同步机制。如果使用其他同步机制,负责收集子任务结果的工作者线程就不能执行其他任务。
  • 任务不能执行I/O操作
  • 任务不能抛出非运行时异常,必须在代码中处理掉这些异常。

1.3 Fork/Join框架的核心类

ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法,它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。

ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。框架中提供了两个子类:

  • RecursiveAction:用于没有返回结果的任务
  • RecursiveTask:用于有返回结果的任务

2.实例

在文档中查找一个词,我们将实现以下两种任务:

  • 一个文档任务,它将遍历文档中的每一行来查找这个词
  • 一个行任务,它将在文档的一部分当中查找这个词

所有这些任务将返回文档或行中所出现这个词的次数。

代码语言:javascript
复制
public class DocumentMock {

    private String words[]={"the","hello","goodbye","packet","java","thread","pool","random","class","main"};

    public String[][] generateDocument(int numLines, int numWords, String word) {
        int counter = 0;
        String document[][] = new String[numLines][numWords];
        Random random = new Random();
        for (int i = 0; i < numLines; i++) {
            for (int j = 0; j < numWords; j++) {
                int index = random.nextInt(words.length);
                document[i][j] = words[index];
                if (document[i][j].equals(word)) {
                    counter++;
                }
            }
        }
        System.out.println("DocumentMock: the word appears " + counter + "times in the document");
        return document;
    }
}

DocumentTask类:这个类的任务需要处理由start和end属性决定的文档行,如果行数小于10,那么就每行创建一个LineTask对象,然后在任务执行后结束,合计返回的结果,并返回总数。如果任务要处理的行数大于10,那么将任务拆分成两组,并创建DocumentTask来处理这两组对象。当这些任务执行结束后,合计返回结果。

代码语言:javascript
复制
public class DocumentTask extends RecursiveTask<Integer> {

    private String document[][];
    private int start, end;
    private String word;

    public DocumentTask(String[][] document, int start, int end, String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    @Override
    protected Integer compute() {
        int result = -1;
        if (end - start < 10) {
            result = processLines(document, start, end, word);
        } else {
            int mid = (start + end) / 2;
            DocumentTask task1 = new DocumentTask(document, start, mid, word);
            DocumentTask task2 = new DocumentTask(document, mid, end, word);
            invokeAll(task1, task2);
            try {
                result = groupResults(task1.get(), task2.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    private Integer processLines(String[][] document, int start, int end, String word) {
        List tasks = new ArrayList();
        for (int i = start; i < end; i++) {
            LineTask task = new LineTask(document[i], 0, document[i].length, word);
            tasks.add(task);
        }
        invokeAll(tasks);
        int result = 0;
        for (int i = 0; i < tasks.size(); i++) {
            LineTask task = tasks.get(i);
            try {
                result = result + task.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return new Integer(result);
    }
    private Integer groupResults(Integer number1, Integer number2) {
        Integer result;
        result = number1 + number2;
        return result;
    }
}

LineTask这个类需要处理文档中一行的某一组词。如果一组词的个数小于100,那么任务将直接在这一组词里搜索特定词,然后返回查找词在这一组词中出现的次数。否则,将任务拆分成两组,并创建两个LineTask对象来处理。当结果执行结束后,返回合并结果。

代码语言:javascript
复制
public class LineTask extends RecursiveTask<Integer> {
    private String line[];
    private int start, end;
    private String word;

    public LineTask(String[] line, int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }
    @Override
    protected Integer compute() {
        Integer result = null;
        if (end - start < 100) {
            result = count(line, start, end, word);
        } else {
            int mid = (start + end) / 2;
            LineTask task1 = new LineTask(line, start, mid, word);
            LineTask task2 = new LineTask(line, mid, end, word);
            invokeAll(task1, task2);
            try {
                result = groupResults(task1.get(), task2.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }
    private Integer count(String[] line, int start, int end, String word) {
        int counter = 0;
        for (int i = start; i < end; i++) {
            if (line[i].equals(word)) {
                counter++;
            }
        }
        //为了延缓执行,休眠10ms
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return counter;
    }
    private Integer groupResults(Integer number1, Integer number2) {
        Integer result;
        result = number1 + number2;
        return result;
    }
}

Main函数,通过默认构造器创建了ForkJoinPool对象,然后执行DocumentTask类,来出来一共100行,每行1000字的文档,这个任务将问题拆分成DocumentTask对象和LineTask对象,然后当所有的任务执行完成后,使用原始的任务来获取整个文档中所要查找的词出现的次数,由于任务继承了RecursiveTask类,因此能够返回结果。

代码语言:javascript
复制
public class Main {
    public static void main(String[] args) {
        DocumentMock mock = new DocumentMock();
        String[][] document = mock.generateDocument(100, 1000, "the");
        DocumentTask task = new DocumentTask(document, 0, 100, "the");
        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(task);
        do {
            System.out.println("***********************************");
            System.out.printf("Main Parallelism : %d\n", pool.getActiveThreadCount());
            System.out.printf("Main Task Count : %d\n", pool.getQueuedTaskCount());
            System.out.printf("Main Steal Count : %d\n", pool.getStealCount());
            System.out.println("***********************************");

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (!task.isDone());

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            if (task != null) {
                System.out.printf("Main: The word appears %d in the document", task.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

 执行结果:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-06-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档