我知道CompletableFuture
设计不能通过中断来控制它的执行,但是我想你们中的一些人可能有这个问题。CompletableFuture
是组成异步执行的非常好的方式,但是当你想要在取消未来时中断或停止底层执行时,我们该怎么做呢?或者我们必须接受任何取消的或手动完成的CompletableFuture
不会影响正在完成它的线程?
在我看来,这显然是一项无用的工作,需要执行者工作者的时间。我想知道在这种情况下,什么方法或设计可能会有所帮助?
更新
下面是一个简单的测试
public class SimpleTest {
@Test
public void testCompletableFuture() throws Exception {
CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());
bearSleep(1);
//cf.cancel(true);
cf.complete(null);
System.out.println("it should die now already");
bearSleep(7);
}
public static void longOperation(){
System.out.println("started");
bearSleep(5);
System.out.println("completed");
}
private static void bearSleep(long seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
System.out.println("OMG!!! Interrupt!!!");
}
}
}
发布于 2015-03-12 23:41:54
CompletableFuture
与最终可能完成它的异步操作无关。
因为(与
FutureTask
不同)这个类不能直接控制导致它完成的计算,所以causes被视为异常完成的另一种形式。方法cancel
与completeExceptionally(new CancellationException())
具有相同的效果。
甚至可能没有单独的线程在完成它(甚至可能有许多线程在它上面工作)。即使有,也没有从CompletableFuture
到任何引用它的线程的链接。
因此,您无法通过CompletableFuture
中断任何线程,这些线程可能正在运行一些任务来完成它。您必须编写自己的逻辑来跟踪任何Thread
实例,这些实例获取对CompletableFuture
的引用并打算完成它。
这是一个我认为你可以逃脱惩罚的执行类型的例子。
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(1);
CompletableFuture<String> completable = new CompletableFuture<>();
Future<?> future = service.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
if (Thread.interrupted()) {
return; // remains uncompleted
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return; // remains uncompleted
}
}
completable.complete("done");
}
});
Thread.sleep(2000);
// not atomic across the two
boolean cancelled = future.cancel(true);
if (cancelled)
completable.cancel(true); // may not have been cancelled if execution has already completed
if (completable.isCancelled()) {
System.out.println("cancelled");
} else if (completable.isCompletedExceptionally()) {
System.out.println("exception");
} else {
System.out.println("success");
}
service.shutdown();
}
这假设正在执行的任务被设置为正确地处理中断。
发布于 2015-03-14 04:35:25
那这个呢?
public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {
final ExecutorService executorService = Executors.newFixedThreadPool(1);
final CompletableFuture<T> cf = new CompletableFuture<T>() {
@Override
public boolean complete(T value) {
if (isDone()) {
return false;
}
executorService.shutdownNow();
return super.complete(value);
}
@Override
public boolean completeExceptionally(Throwable ex) {
if (isDone()) {
return false;
}
executorService.shutdownNow();
return super.completeExceptionally(ex);
}
};
// submit task
executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});
return cf;
}
简单测试:
CompletableFuture<String> cf = supplyAsync(() -> {
try {
Thread.sleep(1000L);
} catch (Exception e) {
System.out.println("got interrupted");
return "got interrupted";
}
System.out.println("normal complete");
return "normal complete";
});
cf.complete("manual complete");
System.out.println(cf.get());
我不喜欢每次都要创建Executor服务的想法,但也许您可以找到一种重用ForkJoinPool的方法。
发布于 2019-03-19 21:00:43
如果您使用
cf.get();
而不是
cf.join();
等待完成的线程可以被中断。这让我感到头疼,所以我只是把它放在那里。然后,您需要进一步传播此中断/使用cf.cancel(...)才能真正完成执行。
https://stackoverflow.com/questions/29013831
复制相似问题