首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何取消使用CompletionService耗时过长任务

如何取消使用CompletionService耗时过长任务
EN

Stack Overflow用户
提问于 2011-03-10 22:01:41
回答 3查看 6.7K关注 0票数 5

我使用包装在双线程FixedThreadPool ExecutorService上的CompletionService提交一些将来的任务,我设置然后设置一个等于提交的任务数量的循环,并使用completionservice.take()等待它们全部完成或失败。问题是有时它永远不会结束(但我不知道为什么),所以我将take()方法更改为poll(300,Timeout.SECONDS),其想法是如果一个任务完成的时间超过5分钟,该轮询将失败,然后最终将退出循环,我可以遍历所有的期货并调用future.cancel(true)来强制取消有问题的任务。

但是当我运行代码时,它挂起了,我看到轮询每5分钟持续失败一次,并且没有更多的任务运行,所以我假设这两个工作进程以某种方式死锁,永远不会完成,并且永远不允许启动额外的任务。由于超时时间为5分钟,还有1000个任务需要运行,中断循环所需的时间太长,因此取消了作业。

所以我想做的是中断/强制取消当前任务,如果在5分钟内没有完成,但我看不到任何方法。

此代码示例显示了我正在讨论的内容的简化版本

代码语言:javascript
运行
复制
import com.jthink.jaikoz.exception.JaikozException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println("Worker TimedOut:");
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && result.get())
                        {
                            System.out.println("Worker Completed:");
                        }
                        else
                        {
                            System.out.println("Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace();
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println("Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
    {
        if(number==3)
        {
            try
            {
                Thread.sleep(50000);
            }
            catch(InterruptedException tie)
            {

            }
        }
        return true;
    }
}

输出

代码语言:javascript
运行
复制
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2011-03-11 04:55:41

我想我已经解决了这个问题,基本上如果发生超时,我就会遍历我的未来对象列表,找到第一个没有完成的对象,并强制取消。看起来不是很优雅,但似乎很管用。

我更改了池的大小,只是为了显示更好地演示解决方案的输出,但也适用于双线程池。

代码语言:javascript
运行
复制
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                System.out.println("Invocation:"+t);
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println(new Date()+":Worker Timedout:");
                    //So lets cancel the first futures we find that havent completed
                    for(Future future:futures)
                    {
                        System.out.println("Checking future");
                        if(future.isDone())
                        {
                            continue;
                        }
                        else
                        {
                            future.cancel(true);
                            System.out.println("Cancelled");
                            break;
                        }
                    }
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && !result.isCancelled() && result.get())
                        {
                            System.out.println(new Date()+":Worker Completed:");
                        }
                        else if(result.isDone() && !result.isCancelled() && !result.get())
                        {
                            System.out.println(new Date()+":Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace(System.out);
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println(new Date()+":Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
        throws InterruptedException
    {
        try
        {
            if(number==3)
            {
                Thread.sleep(50000);
            }
        }
        catch(InterruptedException ie)
        {
            System.out.println("Worker Interuppted");
            throw ie;
        }
        return true;
    }
}

输出为

代码语言:javascript
运行
复制
Invocation:0
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:1
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:2
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout:
Checking future
Checking future
Checking future
Cancelled
Invocation:3
Worker Interuppted
Invocation:4
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Invocation:5
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Thu Mar 10 20:51:49 GMT 2011:Done
票数 5
EN

Stack Overflow用户

发布于 2011-03-11 01:15:30

在worker示例中,Callable阻塞了一个支持中断的调用。如果您的实际代码在内部锁( synchronized块)上发生死锁,您将无法通过中断来取消它。相反,您可以使用显式锁(java.util.concurrent.Lock),它允许您指定希望等待获得锁的时间。如果线程等待锁超时,可能是因为它遇到了死锁情况,它可能会中止,并显示一条错误消息。

顺便说一下,在您的示例中,您的Callable不应该吞噬InterruptedException。您应该向上传递它(重新抛出,或者在方法声明的throws行中添加InterruptedException ),或者在catch块中(使用Thread.currentThread().interrupt())重置线程的中断状态。

票数 2
EN

Stack Overflow用户

发布于 2012-10-02 06:34:30

您可以随时调用future.get(timeout...)

如果它还没有完成,它将返回超时异常...然后就可以调用future.cancel()了。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/5260661

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档