专栏首页阿杜的世界Java线程池监控小结

Java线程池监控小结

最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。当异步线程的执行效率降低时,阻塞队列满了,触发了拒绝策略,进而导致主线程hang死。

从这个问题中,我们学到了两点:

  • 线程池的使用,需要充分分析业务场景后作出选择,必要的情况下需要自定义线程池;
  • 线程池的运行状况,也需要监控

关于线程池的监控,我参考了《Java编程的艺术》中提供的思路实现的,分享下我的代码片段,如下:

public class AsyncThreadExecutor implements AutoCloseable {

    private static final int DEFAULT_QUEUE_SIZE = 1000;

    private static final int DEFAULT_POOL_SIZE = 10;

    @Setter
    private int queueSize = DEFAULT_QUEUE_SIZE;

    @Setter
    private int poolSize = DEFAULT_POOL_SIZE;

    /**
     * 用于周期性监控线程池的运行状态
     */
    private final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());

    /**
     * 自定义异步线程池
     * (1)任务队列使用有界队列
     * (2)自定义拒绝策略
     */
    private final ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize),
                               new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),
                               (r, executor) -> log.error("the async executor pool is full!!"));
    private final ExecutorService executorService = threadPoolExecutor;

    @PostConstruct
    public void init() {
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            /**
             * 线程池需要执行的任务数
             */
            long taskCount = threadPoolExecutor.getTaskCount();
            /**
             * 线程池在运行过程中已完成的任务数
             */
            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
            /**
             * 曾经创建过的最大线程数
             */
            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
            /**
             * 线程池里的线程数量
             */
            long poolSize = threadPoolExecutor.getPoolSize();
            /**
             * 线程池里活跃的线程数量
             */
            long activeCount = threadPoolExecutor.getActiveCount();

            log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",
                     taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);
        }, 0, 10, TimeUnit.MINUTES);
    }

    public void execute(Runnable task) {
        executorService.execute(task);
    }

    @Override
    public void close() throws Exception {
        executorService.shutdown();
    }
}

这里的主要思路是:(1)使用有界队列的固定数量线程池;(2)拒绝策略是将任务丢弃,但是需要记录错误日志;(3)使用一个调度线程池对业务线程池进行监控。

在查看监控日志的时候,看到下图所示的监控日志:

屏幕快照 2018-03-28 21.55.19.png

这里我对largestPooSize的含义比较困惑,按字面理解是“最大的线程池数量”,但是按照线程池的定义,maximumPoolSize和coreSize相同的时候(在这里,都是10),一个线程池里的最大线程数是10,那么为什么largestPooSize可以是39呢?我去翻这块的源码:

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

注释的翻译是:返回在这个线程池里曾经同时存在过的线程数。再看这个变量largestPoolSize在ThreadExecutor中的赋值的地方,代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//这里这里!
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

发现两点:

  • largestPoolSize是worker集合的历史最大值,只增不减。largestPoolSize的大小是线程池曾创建的线程个数,跟线程池的容量无关;
  • largestPoolSize<=maximumPoolSize。

PS:杨青同学是这篇文章的灵感来源,他做了很多压测。给了我很多思路,并跟我一起分析了一些代码。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java线程池容量设置

    Java中可以通过Executors和ThreadPoolExecutor的方式创建线程池,通过Executors可以快速创建四种常见的线程池,但这种方式在实际...

    阿杜
  • 《七周七并发模型》阅读笔记(一)一、线程与锁——第一天二、线程与锁——第二天三、线程与锁——第三天

    线程与锁模型其实是对底层硬件运行过程的形式化,这种形式化既是该模型最大的优点,也是它最大的缺点。我们借助Java语言来学习线程与锁模型,不过内容也适用于其他语言...

    阿杜
  • ThreadLocal的使用场景

    最近项目中遇到如下的场景:在执行数据迁移时,需要按照用户粒度加锁,因此考虑使用排他锁,迁移工具和业务服务属于两个服务,因此需要使用分布式锁。

    阿杜
  • 图解 .NET async/await

    我非常认同一图胜千言的说法,因此我也喜欢用图来解释各种技术概念。本文通过图解来说明 .NET 中 async/await 如何工作。

    zls365
  • java多线程关键字volatile、lock、synchronized

    总结:volatile关键字的作用是:使变量在多个线程间可见(具有可见性),但是仅靠volatile是不能保证线程的安全性,volatile关键字不具备sync...

    刘文正
  • [051]Binder线程优先级继承

    Binder通信需要两个线程,这两个线程的优先级是不同,也就意味着,他们能获取到的cpu的优先级不同。

    王小二
  • JAVA多线程之线程间的通信方式

    本总结我对于JAVA多线程中线程之间的通信方式的理解,主要以代码结合文字的方式来讨论线程间的通信,故摘抄了书中的一些示例代码。

    哲洛不闹
  • 【纯干货】Java 并发基础常见面试题总结

    进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。

    乔戈里
  • java高并发系列-第2天:并发级别

    这是java高并发系列第2篇文章,一个月,咱们一起啃下java高并发,欢迎留言打卡,一起坚持一个月,拿下java高并发。

    路人甲Java
  • 从构建分布式秒杀系统聊聊Threadpool线程池

    从0到1构建分布式秒杀系统案例的代码已经全部上传至码云,文章也被分发到各个平台。其中也收到了不少小伙伴喜欢和反馈,有网友如是说:

    小柒2012

扫码关注云+社区

领取腾讯云代金券