我使用包装在双线程FixedThreadPool ExecutorService上的CompletionService提交一些将来的任务,我设置然后设置一个等于提交的任务数量的循环,并使用completionservice.take()等待它们全部完成或失败。问题是有时它永远不会结束(但我不知道为什么),所以我将take()方法更改为poll(300,Timeout.SECONDS),其想法是如果一个任务完成的时间超过5分钟,该轮询将失败,然后最终将退出循环,我可以遍历所有的期货并调用future.cancel(true)来强制取消有问题的任务。
但是当我运行代码时,它挂起了,我看到轮询每5分钟持续失败一次,并且没有更多的任务运行,所以我假设这两个工作进程以某种方式死锁,永远不会完成,并且永远不允许启动额外的任务。由于超时时间为5分钟,还有1000个任务需要运行,中断循环所需的时间太长,因此取消了作业。
所以我想做的是中断/强制取消当前任务,如果在5分钟内没有完成,但我看不到任何方法。
此代码示例显示了我正在讨论的内容的简化版本
import com.jthink.jaikoz.exception.JaikozException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceTest
{
public static void main(final String[] args)
{
CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
Collection<Worker> tasks = new ArrayList<Worker>(10);
tasks.add(new Worker(1));
tasks.add(new Worker(2));
tasks.add(new Worker(3));
tasks.add(new Worker(4));
tasks.add(new Worker(5));
tasks.add(new Worker(6));
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
try
{
for (Callable task : tasks)
{
futures.add(cs.submit(task));
}
for (int t = 0; t < futures.size(); t++)
{
Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
if(result==null)
{
System.out.println("Worker TimedOut:");
continue;
}
else
{
try
{
if(result.isDone() && result.get())
{
System.out.println("Worker Completed:");
}
else
{
System.out.println("Worker Failed");
}
}
catch (ExecutionException ee)
{
ee.printStackTrace();
}
}
}
}
catch (InterruptedException ie)
{
}
finally
{
//Cancel by interrupting any existing tasks currently running in Executor Service
for (Future<Boolean> f : futures)
{
f.cancel(true);
}
}
System.out.println("Done");
}
}
class Worker implements Callable<Boolean>
{
private int number;
public Worker(int number)
{
this.number=number;
}
public Boolean call()
{
if(number==3)
{
try
{
Thread.sleep(50000);
}
catch(InterruptedException tie)
{
}
}
return true;
}
}
输出
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
发布于 2011-03-11 04:55:41
我想我已经解决了这个问题,基本上如果发生超时,我就会遍历我的未来对象列表,找到第一个没有完成的对象,并强制取消。看起来不是很优雅,但似乎很管用。
我更改了池的大小,只是为了显示更好地演示解决方案的输出,但也适用于双线程池。
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceTest
{
public static void main(final String[] args)
{
CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1));
Collection<Worker> tasks = new ArrayList<Worker>(10);
tasks.add(new Worker(1));
tasks.add(new Worker(2));
tasks.add(new Worker(3));
tasks.add(new Worker(4));
tasks.add(new Worker(5));
tasks.add(new Worker(6));
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
try
{
for (Callable task : tasks)
{
futures.add(cs.submit(task));
}
for (int t = 0; t < futures.size(); t++)
{
System.out.println("Invocation:"+t);
Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
if(result==null)
{
System.out.println(new Date()+":Worker Timedout:");
//So lets cancel the first futures we find that havent completed
for(Future future:futures)
{
System.out.println("Checking future");
if(future.isDone())
{
continue;
}
else
{
future.cancel(true);
System.out.println("Cancelled");
break;
}
}
continue;
}
else
{
try
{
if(result.isDone() && !result.isCancelled() && result.get())
{
System.out.println(new Date()+":Worker Completed:");
}
else if(result.isDone() && !result.isCancelled() && !result.get())
{
System.out.println(new Date()+":Worker Failed");
}
}
catch (ExecutionException ee)
{
ee.printStackTrace(System.out);
}
}
}
}
catch (InterruptedException ie)
{
}
finally
{
//Cancel by interrupting any existing tasks currently running in Executor Service
for (Future<Boolean> f : futures)
{
f.cancel(true);
}
}
System.out.println(new Date()+":Done");
}
}
class Worker implements Callable<Boolean>
{
private int number;
public Worker(int number)
{
this.number=number;
}
public Boolean call()
throws InterruptedException
{
try
{
if(number==3)
{
Thread.sleep(50000);
}
}
catch(InterruptedException ie)
{
System.out.println("Worker Interuppted");
throw ie;
}
return true;
}
}
输出为
Invocation:0
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:1
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:2
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout:
Checking future
Checking future
Checking future
Cancelled
Invocation:3
Worker Interuppted
Invocation:4
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Invocation:5
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Thu Mar 10 20:51:49 GMT 2011:Done
发布于 2011-03-11 01:15:30
在worker示例中,Callable阻塞了一个支持中断的调用。如果您的实际代码在内部锁( synchronized
块)上发生死锁,您将无法通过中断来取消它。相反,您可以使用显式锁(java.util.concurrent.Lock
),它允许您指定希望等待获得锁的时间。如果线程等待锁超时,可能是因为它遇到了死锁情况,它可能会中止,并显示一条错误消息。
顺便说一下,在您的示例中,您的Callable
不应该吞噬InterruptedException
。您应该向上传递它(重新抛出,或者在方法声明的throws行中添加InterruptedException
),或者在catch块中(使用Thread.currentThread().interrupt()
)重置线程的中断状态。
发布于 2012-10-02 06:34:30
您可以随时调用future.get(timeout...)
如果它还没有完成,它将返回超时异常...然后就可以调用future.cancel()
了。
https://stackoverflow.com/questions/5260661
复制相似问题