前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Fork/Join 框架及其使用

Fork/Join 框架及其使用

作者头像
CodingDiray
发布2019-09-25 16:18:55
6710
发布2019-09-25 16:18:55
举报
文章被收录于专栏:Coding Diary

fork/join框架是ExecutorService接口的一种具体实现,会将任务分发给线程池中的工作线程,更好地利用多处理器带来的好处,提供程序性能。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。

fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。

fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

使用fork/join框架的第一步是编写执行一部分工作的代码,类似的伪代码如下:

代码语言:javascript
复制
if (当前这个任务工作量足够小)直接完成这个任务else把当前任务分解成两个部分
调用这两个部分并等待结果

此代包装在ForkJoinTask的子类中。不过,通常是RecursiveTask(会返回一个结果),或RecursiveAction。

fork分解出新任务,join汇集任务结果,其大致过程如下:

Fork/Join实际用例

fork/join的核心思想就是分而治之,将一个大任务拆分成一个一个的小任务。fork/join的使用需要定义一个任务类去实现RecursiveTask或RecursiveAction,重写compute()方法,在compute()方法中定义任务拆分的逻辑,然后借助ForkJoinPool提交任务去执行,fork/join框架会根据compute()方法中定义的拆分逻辑对任务进行具体的拆分,如果有返回值,可以借助ForkJoinTask获取返回值。假设现在有很多网络请求需要并发的去执行,然后汇总结果,使用fork/join的代码实现如下:

代码语言:javascript
复制
public class ForkJoinTest {// 测试数据
static ArrayList<String> urls =
new ArrayList<String>() {
{
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
}
};// 核心ForkJoinPool
static ForkJoinPool firkJoinPool =
new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
public static void main(String args[]) throws ExecutionException, InterruptedException {
Job job = new Job(urls, 0, urls.size());
ForkJoinTask<String> forkJoinTask = firkJoinPool.submit(job);
String result = forkJoinTask.get();
System.out.println(result);
}
public static String doRequest(String url) {
// 模拟网络请求
return "Kody ... test ... " + url + "\n";
}// 自定义任务
static class Job extends RecursiveTask<String> {List<String> urls;
int start;int end;public Job(List<String> urls, int start, int end) {
this.urls = urls;
this.start = start;
this.end = end;}/** 核心任务的计算,任务拆分逻辑实现 */
@Override
protected String compute() {// 计算任务的大小
int count = end - start;// 如果任务拆分的足够小,则执行任务
if (count <= 5) {
// 直接执行
String result = "";
for (int i = start; i < end; i++) {
String response = doRequest(urls.get(i));
result += response;
}
return result;} else { // 如果任务较大,继续拆分任务int x = (start + end) / 2;
Job job1 = new Job(urls, start, x);
job1.fork();
Job job2 = new Job(urls, x, end);
job2.fork();// 汇总fork出的子任务结果
return String.join("", job1.join(), job2.join());}
}
}
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coding Diary 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Fork/Join实际用例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档