首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何中断CompletableFuture的底层执行

如何中断CompletableFuture的底层执行
EN

Stack Overflow用户
提问于 2015-03-12 23:27:32
回答 6查看 17K关注 0票数 27

我知道CompletableFuture设计不能通过中断来控制它的执行,但是我想你们中的一些人可能有这个问题。CompletableFuture是组成异步执行的非常好的方式,但是当你想要在取消未来时中断或停止底层执行时,我们该怎么做呢?或者我们必须接受任何取消的或手动完成的CompletableFuture不会影响正在完成它的线程?

在我看来,这显然是一项无用的工作,需要执行者工作者的时间。我想知道在这种情况下,什么方法或设计可能会有所帮助?

更新

下面是一个简单的测试

代码语言:javascript
运行
复制
public class SimpleTest {

  @Test
  public void testCompletableFuture() throws Exception {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());

    bearSleep(1);

    //cf.cancel(true);
    cf.complete(null);

    System.out.println("it should die now already");
    bearSleep(7);
  }

  public static void longOperation(){
    System.out.println("started");
    bearSleep(5);
    System.out.println("completed");
  }

  private static void bearSleep(long seconds){
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      System.out.println("OMG!!! Interrupt!!!");
    }
  }
}
EN

回答 6

Stack Overflow用户

回答已采纳

发布于 2015-03-12 23:41:54

CompletableFuture与最终可能完成它的异步操作无关。

因为(与FutureTask不同)这个类不能直接控制导致它完成的计算,所以causes被视为异常完成的另一种形式。方法cancelcompleteExceptionally(new CancellationException())具有相同的效果。

甚至可能没有单独的线程在完成它(甚至可能有许多线程在它上面工作)。即使有,也没有从CompletableFuture到任何引用它的线程的链接。

因此,您无法通过CompletableFuture中断任何线程,这些线程可能正在运行一些任务来完成它。您必须编写自己的逻辑来跟踪任何Thread实例,这些实例获取对CompletableFuture的引用并打算完成它。

这是一个我认为你可以逃脱惩罚的执行类型的例子。

代码语言:javascript
运行
复制
public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(1);
    CompletableFuture<String> completable = new CompletableFuture<>();
    Future<?> future = service.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                if (Thread.interrupted()) {
                    return; // remains uncompleted
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return; // remains uncompleted
                }
            }
            completable.complete("done");
        }
    });

    Thread.sleep(2000);

    // not atomic across the two
    boolean cancelled = future.cancel(true);
    if (cancelled)
        completable.cancel(true); // may not have been cancelled if execution has already completed
    if (completable.isCancelled()) {
        System.out.println("cancelled");
    } else if (completable.isCompletedExceptionally()) {
        System.out.println("exception");
    } else {
        System.out.println("success");
    }
    service.shutdown();
}

这假设正在执行的任务被设置为正确地处理中断。

票数 17
EN

Stack Overflow用户

发布于 2015-03-14 04:35:25

那这个呢?

代码语言:javascript
运行
复制
public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {

    final ExecutorService executorService = Executors.newFixedThreadPool(1);

    final CompletableFuture<T> cf = new CompletableFuture<T>() {
        @Override
        public boolean complete(T value) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.completeExceptionally(ex);
        }
    };

    // submit task
    executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    return cf;
}

简单测试:

代码语言:javascript
运行
复制
    CompletableFuture<String> cf = supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (Exception e) {
            System.out.println("got interrupted");
            return "got interrupted";
        }
        System.out.println("normal complete");
        return "normal complete";
    });

    cf.complete("manual complete");
    System.out.println(cf.get());

我不喜欢每次都要创建Executor服务的想法,但也许您可以找到一种重用ForkJoinPool的方法。

票数 3
EN

Stack Overflow用户

发布于 2019-03-19 21:00:43

如果您使用

代码语言:javascript
运行
复制
cf.get();

而不是

代码语言:javascript
运行
复制
cf.join();

等待完成的线程可以被中断。这让我感到头疼,所以我只是把它放在那里。然后,您需要进一步传播此中断/使用cf.cancel(...)才能真正完成执行。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29013831

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档