前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何解决Java线程池队列过饱问题

如何解决Java线程池队列过饱问题

作者头像
老钱
发布2018-08-15 16:25:03
1.4K0
发布2018-08-15 16:25:03
举报
文章被收录于专栏:码洞码洞
代码语言:javascript
复制
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

Java的Executors框架提供的定长线程池内部默认使用LinkedBlockingQueue作为任务的容器,这个队列是没有限定大小的,可以无限向里面submit任务。当线程池处理的太慢的时候,队列里的内容会积累,积累到一定程度就会内存溢出。即使没有内存溢出,队列的延迟势必会变大,而且如果进程突然遇到退出信号,队列里的消息还没有被处理就被丢弃了,那必然会对系统的消息可靠性造成重大影响。

那如何解决线程池的过饱问题呢?从队列入手,无外乎两种方法

  1. 增加消费者,增加消费者处理效率
  2. 限制生产者生产速度

增加消费者就是增加线程池大小,增加消费者处理效率就是优化逻辑处理。但是如果遇到了IO瓶颈,消费者处理的效率完全取决于IO效率,在消费能力上已经优化到了极限还是处理不过来怎么办?或者系统突然遇到用户高峰,我们所配置的线程池大小不够用怎么办?

这时候我们就只能从生产者入手,限制生产者的生产速度。那如何限制呢?

代码语言:javascript
复制
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
 }

LinkedBlockingQueue提供了capacity参数可以限制队列的大小,当队列元素达到上线的时候,生产者线程会阻塞住,直到队列被消费者消费到有空槽的时候才会继续下去。这里似乎只要给队列设置一个大小就ok了。

但是实际情况并不是我们所想的那样。

代码语言:javascript
复制
public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {  # here
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command); # here
    }

翻开源码可以发现生产者向队列里塞任务用的方法是workQueue.offer(),这个方法在遇到队列满时是不会阻塞的,而是直接返回一个false,表示抛弃了这个任务。然后生产者调用reject方法,进入拒绝处理逻辑。

接下来我们看看这个reject方法到底干了什么

代码语言:javascript
复制
final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
}

我们看到JDK默认提供了4中拒绝策略的实现。

  1. AbortPolicy 默认策略,抛出RejectedExecutionException异常
  2. CallerRunsPolicy 让任务在生产者线程里执行,这样可以降低生产者的生产速度也不会将生产者的线程堵住
  3. DiscardPolicy 直接抛弃任务,不抛异常
  4. DiscardOldestPolicy 直接抛弃旧任务,不抛异常

一般比较常用的是CallerRunPolicy,比较优雅的解决了过饱问题。如果你觉得这种方式不那么优雅的话,还可以使用下面的几种方式。这几种方式都是通过处理RejectExecution来实现生产者的阻塞的目的。

代码语言:javascript
复制
public class BlockWhenQueueFullHandler implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        pool.getQueue().put(new FutureTask(r));
    }

}

这种方案是使用put方法会阻塞生产者线程的原理达到了目的。

代码语言:javascript
复制
public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            long waitMs = 10;
            Thread.sleep(waitMs);
        } catch (InterruptedException e) {}
        executor.execute(r);
    }

}

这种方案显而易见,用sleep达到了阻塞的目的。

代码语言:javascript
复制
public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }}

这种方案是通过信号量的大小都限制队列的大小,也不需要特别限定executor队列大小了

同样的原理还可以使用wait/notifyAll机制来达到一样的目的。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-12-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码洞 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档