分治算法与Fork/Join框架

在计算机科学中,分治法是解决多项式分支递归的重要范式;也就是“分而治之”,将复杂问题分成两个或更多相似的子问题,然后将简单的子问题求解,再将子问题的解合并。有很多经典的算法就是采用了“分而治之”的思想,如:归并排序、快速排序、矩阵乘法等。

Fork/Join并行算法是分治算法的并行版本,是Java7提供的一个用于并行执行任务的框架,把一个大任务分割成若干个小任务,最终汇总每个小任务结果得到大任务结果的框架。

Fork/Join框架中,包含fork和join两个主要步骤:fork操作就是把一个大任务分为若干个子任务进行并行的执行;join操作就是合并这些子任务的执行结果。如下图:

Fork/Join框架的核心

1、ForkJoinPool:实现ExecutorService接口和work-stealing算法。

ForkJoinPool实现了线程执行器ExecutorService接口,用于执行子任务时分配线程;

ForkJoinPool内置了WorkQueue队列,是一个双端队列,实现了工作窃取算法,每个线程都有自己的WorkQueue,WorkQueue提供push、pop、poll方法,自己线程只允许push和pop,其他线程可以poll,所以当线程自己的WorkQueue执行完成时,将随机从其他线程的WorkQueue中,poll任务执行。

2、ForkJoinTask:是在ForkJoinPool中执行的任务的基类。提供任务中的fork()和join()操作,使用两个方法控制任务的状态。

通常使用ForkJoinTask的两个抽象子类RecursiveAction与RecursiveTask;

RecursiveAction是无返回值的任务;

RecursiveTask是有返回值的任务;

使用场景

某商城统计一天内所有订单的总金额,取出一天中所有订单,然后使用Fork/Join框架执行:

1、创建订单对象

public class Order {
    // 订单号
    private int orderId;
    // 订单金额
    private double amount;
    // 忽略 get set
}

2、构建订单执行任务

public class OrderTask extends RecursiveTask<Double> {
    // 子任务中的订单列表
    private List<Order> list;
    OrderTask(List<Order> list) {
        this.list = list;
    }
    @Override
    protected Double compute() {
        if (list.size() < 10) {
            return getAmout(list);
        } else {
            // 假装平均分配
            int mid = list.size() / 2;
            OrderTask leftTask = new OrderTask(list.subList(0, mid));
            OrderTask rightTask = new OrderTask(list.subList(mid, list.size()));
            invokeAll(leftTask, rightTask);
            double result = 0.0;
            try {
                result = leftTask.get() + rightTask.get();
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("异常");
            }
            return result;
        }
    }
    // 计算最小子任务的订单列表的金额之和
    private Double getAmout(List<Order> list) {
        double amout = 0.0;
        for (Order order : list) {
            amout += order.getAmount();
        }
        return amout;
    }
}

3、模拟业务场景获取当日所有订单

public class OrderMock {
    public static List<Order> gen(int num) {
        List<Order> orders = new ArrayList<>();
        for (int i = 0; i < num; i++) {
            Order order = new Order();
            order.setOrderId(i);
            order.setAmount(10.0); // 固定每个订单都是10,方便验证结果
            orders.add(order);
        }
        return orders;
    }
}

4、创建ForkJoinPool执行,订单金额计算任务

public static void main(String[] args) throws InterruptedException, ExecutionException {
    List<Order> orders = OrderMock.gen(10000);
    OrderTask orderTask = new OrderTask(orders);
    ForkJoinPool pool = new ForkJoinPool(4);
    pool.execute(orderTask);
    // 确保任务执行结束
    do {
        TimeUnit.SECONDS.sleep(1);
    } while (!orderTask.isDone());
    // 关闭执行池
    pool.shutdown();
    System.out.println(orderTask.get());
}

创建了10000个订单,每个金额都是10,执行之后返回结果为100000.0!


1、《Java并发编程的艺术》

2、https://zh.wikipedia.org/wiki/%E5%88%86%E6%B2%BB%E6%B3%95

3、http://ifeve.com/doug-lea/

4、http://ifeve.com/java-fork-join-framework/

5、https://www.ibm.com/developerworks/cn/java/j-jtp11137.html

原文发布于微信公众号 - BanzClub(banz-club)

原文发表时间:2019-01-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券