我使用的是executorservice,每次webservice调用都会产生大约9-10个可调用的任务,并提交给executorService线程池。该应用程序有一个线程池大小为100的单个executorService。当我提交callables时,我有一个2 For循环。外部循环要么运行到指定的超时到期,要么运行到已完成的哈希集大小和提交的任务大小;内循环遍历可调用对象,如果isDone()==为true,则这些可调用对象将被收集到“==”哈希集中。当外部循环条件失败时,我循环遍历“完成”哈希集中的可调用对象,并聚合结果。我确信有一个比使用两个循环更优雅的解决方案。
如果所有任务都已完成或超时到期,通知我的最佳方式是什么?是否有框架、库等或设计模式?
发布于 2016-07-01 12:39:09
基本上有两个选项,pull或push。
Pull就是您已经尝试过的--发送所有异步任务,保留对它们的引用,并调用isDone(),直到它们全部完成。
另一方面,Push分离了调用和通知任务。您将调用异步任务,然后该方法将立即返回。通知将由任务本身处理,他们需要在工作完成时通知。
如果您使用的是Java,则可以通过Observer Pattern或CDI Events轻松地实现此通知。
我个人更喜欢push方法,因为它清理了代码,并分离了调用任务和处理结果的关注点。不过,这两种方式都很好。
发布于 2016-07-01 14:05:12
您可以使用CompletableFuture来实现这一点。
在HashSet.
thenCombine()
CompletableFuture.supplyAsync() timeout将所有这些收集在一个CompletableFuture中,如果出现超时,请使用默认值完成所有计算。然后,聚合将返回部分结果immediately.这也是一种推式方法,但所有通知逻辑都是由CompletableFuture类为您完成的。
示例(使用整型求和作为聚合)
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Set<CompletableFuture<Integer>> calculations = new HashSet<>();
CompletableFuture<Integer> sum = CompletableFuture.completedFuture(0);
for (int i = 0; i < 100; i++) {
// submit to thread pool
CompletableFuture<Integer> calculation =
CompletableFuture.supplyAsync(CompleteableFutureGather::longCalculation, executorService);
calculations.add(calculation);
sum = sum.thenCombine(calculation, Integer::sum); // set up future aggregation
}
int total = 0;
try {
total = sum.get(5, TimeUnit.SECONDS); // set timeout
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
} catch (TimeoutException e) {
// preemptively complete calculations with default value, those already completed will be unaffected
calculations.forEach(integerCompletableFuture -> integerCompletableFuture.complete(0));
total = sum.getNow(0); // everything is complete so partial aggregation will be returned immediately
}
System.out.println(total);
}
private static Integer longCalculation() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return 1;
}https://stackoverflow.com/questions/38136957
复制相似问题