在计算机科学中,分治法是解决多项式分支递归的重要范式;也就是“分而治之”,将复杂问题分成两个或更多相似的子问题,然后将简单的子问题求解,再将子问题的解合并。有很多经典的算法就是采用了“分而治之”的思想,如:归并排序、快速排序、矩阵乘法等。
Fork/Join并行算法是分治算法的并行版本,是Java7提供的一个用于并行执行任务的框架,把一个大任务分割成若干个小任务,最终汇总每个小任务结果得到大任务结果的框架。
Fork/Join框架中,包含fork和join两个主要步骤:fork操作就是把一个大任务分为若干个子任务进行并行的执行;join操作就是合并这些子任务的执行结果。如下图:
Fork/Join框架的核心
1、ForkJoinPool:实现ExecutorService接口和work-stealing算法。
ForkJoinPool实现了线程执行器ExecutorService接口,用于执行子任务时分配线程;
ForkJoinPool内置了WorkQueue队列,是一个双端队列,实现了工作窃取算法,每个线程都有自己的WorkQueue,WorkQueue提供push、pop、poll方法,自己线程只允许push和pop,其他线程可以poll,所以当线程自己的WorkQueue执行完成时,将随机从其他线程的WorkQueue中,poll任务执行。
2、ForkJoinTask:是在ForkJoinPool中执行的任务的基类。提供任务中的fork()和join()操作,使用两个方法控制任务的状态。
通常使用ForkJoinTask的两个抽象子类RecursiveAction与RecursiveTask;
RecursiveAction是无返回值的任务;
RecursiveTask是有返回值的任务;
使用场景
某商城统计一天内所有订单的总金额,取出一天中所有订单,然后使用Fork/Join框架执行:
1、创建订单对象
public class Order {
// 订单号
private int orderId;
// 订单金额
private double amount;
// 忽略 get set
}
2、构建订单执行任务
public class OrderTask extends RecursiveTask<Double> {
// 子任务中的订单列表
private List<Order> list;
OrderTask(List<Order> list) {
this.list = list;
}
@Override
protected Double compute() {
if (list.size() < 10) {
return getAmout(list);
} else {
// 假装平均分配
int mid = list.size() / 2;
OrderTask leftTask = new OrderTask(list.subList(0, mid));
OrderTask rightTask = new OrderTask(list.subList(mid, list.size()));
invokeAll(leftTask, rightTask);
double result = 0.0;
try {
result = leftTask.get() + rightTask.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println("异常");
}
return result;
}
}
// 计算最小子任务的订单列表的金额之和
private Double getAmout(List<Order> list) {
double amout = 0.0;
for (Order order : list) {
amout += order.getAmount();
}
return amout;
}
}
3、模拟业务场景获取当日所有订单
public class OrderMock {
public static List<Order> gen(int num) {
List<Order> orders = new ArrayList<>();
for (int i = 0; i < num; i++) {
Order order = new Order();
order.setOrderId(i);
order.setAmount(10.0); // 固定每个订单都是10,方便验证结果
orders.add(order);
}
return orders;
}
}
4、创建ForkJoinPool执行,订单金额计算任务
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Order> orders = OrderMock.gen(10000);
OrderTask orderTask = new OrderTask(orders);
ForkJoinPool pool = new ForkJoinPool(4);
pool.execute(orderTask);
// 确保任务执行结束
do {
TimeUnit.SECONDS.sleep(1);
} while (!orderTask.isDone());
// 关闭执行池
pool.shutdown();
System.out.println(orderTask.get());
}
创建了10000个订单,每个金额都是10,执行之后返回结果为100000.0!
1、《Java并发编程的艺术》
2、https://zh.wikipedia.org/wiki/%E5%88%86%E6%B2%BB%E6%B3%95
3、http://ifeve.com/doug-lea/
4、http://ifeve.com/java-fork-join-framework/
5、https://www.ibm.com/developerworks/cn/java/j-jtp11137.html