首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Java Executor部分关闭

Java Executor部分关闭
EN

Stack Overflow用户
提问于 2011-11-03 18:35:14
回答 3查看 661关注 0票数 1

让我们在应用程序中使用一个经典的Executor。应用程序的许多部分使用这个Executor进行一些计算,每次计算都可以取消,为此我可以在executor上调用shutdown()shutdownNow()

但我只想关闭Executor中的部分任务。遗憾的是,我无法访问Future对象,它们是计算实现的私有部分(实际上计算是由actor框架jetlang支持的)

我想要像Executor wrapper这样的东西,我可以把它传递给计算,它应该由真正的Executor支持。如下所示:

代码语言:javascript
运行
复制
// main application executor
Executor applicationExecutor = Executors.newCachedThreadPool();

// starting computation
Executor computationExecutor = new ExecutorWrapper(applicationExecutor);
Computation computation = new Computation(computationExecutor);
computation.start();

// cancelling computation
computation.cancel();
// shutting down only computation tasks
computationExecutor.shutdown();

// applicationExecutor remains running and happy

或者其他的想法?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2011-11-05 01:07:57

对于那些想要好结果的人来说:有一个最终的解决方案,部分基于伊万·索波夫的答案。幸运的是,jetlang只使用Executor接口(而不是ExecutorService)来运行它的任务,所以我制作了包装器类,它支持停止仅由该包装器创建的任务。

代码语言:javascript
运行
复制
static class StoppableExecutor implements Executor {
    final ExecutorService executor;
    final List<Future<?>> futures = Lists.newArrayList();
    boolean stopped;

    public StoppableExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    void stop() {
        this.stopped = true;
        synchronized (futures) {
            for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) {
                Future<?> future = iterator.next();
                if (!future.isDone() && !future.isCancelled()) {
                    System.out.println(future.cancel(true));
                }
            }
            futures.clear();
        }
    }

    @Override
    public void execute(Runnable command) {
        if (!stopped) {
            synchronized (futures) {
                Future<?> newFuture = executor.submit(command);
                for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) {
                    Future<?> future = iterator.next();
                    if (future.isDone() || future.isCancelled())
                        iterator.remove();
                }
                futures.add(newFuture);
            }
        }
    }
}

使用它非常简单:

代码语言:javascript
运行
复制
ExecutorService service = Executors.newFixedThreadPool(5);
StoppableExecutor executor = new StoppableExecutor(service);

// doing some actor stuff with executor instance
PoolFiberFactory factory = new PoolFiberFactory(executor);

// stopping tasks only created on executor instance
// executor service is happily running other tasks
executor.stop();

就这样。效果很好。

票数 2
EN

Stack Overflow用户

发布于 2011-11-03 18:45:36

如何让您的Computation成为Runnable (并使用提供的Executor运行),直到设置了布尔标志?大致是这样的:

代码语言:javascript
运行
复制
public class Computation
{
  boolean volatile stopped;

  public void run(){
    while(!stopped){
    //do magic
  }

  public void cancel)(){stopped=true;}
}

您所做的实际上是停止线程。但是,它不会被垃圾回收,而是会被重用,因为它是由Executor管理的。查找“停止线程的正确方式是什么?”。

编辑:请注意,上面的代码非常原始,因为它假定while循环的主体只需要很短的时间。如果不是这样,检查将不会频繁执行,并且您将注意到在取消任务和实际停止任务之间存在延迟。

票数 0
EN

Stack Overflow用户

发布于 2011-11-03 19:27:10

像这样的东西?您可以执行部分关闭:

代码语言:javascript
运行
复制
for (Future<?> future : %ExecutorServiceWrapperInstance%.getFutures()) {
    if (%CONDITION%) {
        future.cancel(true);
    }
}

代码如下:

代码语言:javascript
运行
复制
package com.sopovs.moradanen;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExecutorServiceWrapper implements ExecutorService {

private final ExecutorService realService;
private List<Future<?>> futures = new ArrayList<Future<?>>();

public ExecutorServiceWrapper(ExecutorService realService) {
    this.realService = realService;
}

@Override
public void execute(Runnable command) {
    realService.execute(command);
}

@Override
public void shutdown() {
    realService.shutdown();

}

@Override
public List<Runnable> shutdownNow() {
    return realService.shutdownNow();
}

@Override
public boolean isShutdown() {
    return realService.isShutdown();
}

@Override
public boolean isTerminated() {
    return realService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return realService.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
    Future<T> future = realService.submit(task);
    synchronized (this) {
        futures.add(future);
    }
    return future;
}

public synchronized List<Future<?>> getFutures() {
    return Collections.unmodifiableList(futures);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
    Future<T> future = realService.submit(task, result);
    synchronized (this) {
        futures.add(future);
    }
    return future;
}

@Override
public Future<?> submit(Runnable task) {
    Future<?> future = realService.submit(task);
    synchronized (this) {
        futures.add(future);
    }
    return future;
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    List<Future<T>> future = realService.invokeAll(tasks);
    synchronized (this) {
        futures.addAll(future);
    }
    return future;
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException {
    List<Future<T>> future = realService.invokeAll(tasks, timeout, unit);
    synchronized (this) {
        futures.addAll(future);
    }
    return future;
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    //don't know what to do here. Maybe this method is not needed by the framework
    //than just throw new NotImplementedException();
    return realService.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    //don't know what to do here. Maybe this method is not needed by the framework
    //than just throw new NotImplementedException();
    return realService.invokeAny(tasks, timeout, unit);
}
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/7993422

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档