Fork/Join
模型是ExecutorService
的接口实现,可以帮助你利用多个处理器。它被设计用可以递归地分解成更小的任务,目的是所有可用的处理能力来提高应用程序性能,与分而治之思路类似。
与任何一个ExecutorService
实现一样,Fork/Join
模型将任务分配到线程池中的工作线程中。但Fork/Join
框架与其他的区别是采用了工作窃取算法,工作线程任务完成后可能会从仍然忙碌的其他线程窃取任务。
Fork/Join
模型的核心是ForkJoinPool
,该类的扩展AbstractExecutorService
。ForkJoinPool
实现核心工作窃取算法,可以执行ForkJoinTask
任务。
使用Fork/Join模型第一步应该编写核心任务代码。大题逻辑如下:
if(我的任务足够小){
直接工作
}else{
任务划分成两份,
执行并等待结果。
}
把这段代码封装到一个ForkJoinTask
的子类中。通常做法是继承 RecursiveTask
或 RecursiveAction
。
package com.itunic.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
private long start;
private long end;
// 区分任务颗粒度
private static final int THRESHOLD = 2;
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
// 判断任务的颗粒度是否足够小
if (canCompute) {
for (long i = start; i < end; i++) {
sum += i;
}
} else {
// 将数据切分
long middle = (start + end) / 2;
CountTask task1 = new CountTask(start, middle);
CountTask task2 = new CountTask(middle, end);
// 发起两个线程任务
invokeAll(task1, task2);
// 等待线程返回结果
long result1 = task1.join();
long result2 = task2.join();
sum = result1 + result2;
}
return sum;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> future = pool.submit(new CountTask(1, 10));
System.out.printf("统计结果为:%s",future.get());
}
}
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2head3ycz2qss