前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详解Java线程池监控

详解Java线程池监控

作者头像
857技术社区
发布2022-05-17 15:58:36
2.8K0
发布2022-05-17 15:58:36
举报
文章被收录于专栏:857-Bigdata

背景

业务使用线程池的时候,出现了问题,影响线上业务,由于没有线程池监控,导致问题难以发现和排查。于是需要这么一个线程池监控组件,用来监控线程池执行状态,任务执行状态等。

实现方式

ThreadPoolExecutor 提供了以下几个方法可以监控线程池的使用情况:

方法

含义

getActiveCount()

线程池中正在执行任务的线程数量

getCompletedTaskCount()

线程池已完成的任务数量,该值小于等于 taskCount

getCorePoolSize()

线程池的核心线程数量

getLargestPoolSize()

线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize

getMaximumPoolSize()

线程池的最大线程数量

getPoolSize()

线程池当前的线程数量

getTaskCount()

线程池已经执行的和未执行的任务总数

通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法, afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。极简示例如下

代码语言:javascript
复制
@Slf4j
public class ThreadPoolMonitor extends ThreadPoolExecutor {
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        log.info("init");
    }
    @Override
    public void shutdown() {
        log.info("shutdown");
        super.shutdown();
    }
    @Override
    public List<Runnable> shutdownNow() {
        log.info("shutdownNow");
        return super.shutdownNow();
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        log.info("beforeExecute");
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        log.info("afterExecute");
    }
}

实战应用

上面是已经说明该组件的实现方式,但是在生产环境中,面对业务的复杂度高、变数大,我们应该如何实现一个高可拓展的线程池监控组件呢?这是这一小节的内容主题。

使用示例

先来个 hello world 演示效果

代码语言:javascript
复制
@Test
public void helloWorld() throws InterruptedException {
    //创建线程池对象
    MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor("被监控的线程池1", 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new MonitorConfig().setQueueSlowTime(100).setTaskSlowTimeThreshold(100));
    monitoredThreadPoolExecutor.execute(() -> {
        log.info("任务1开始……");
        try {
            Thread.sleep(RandomUtils.nextInt(1000, 2000));
        } catch (InterruptedException ignore) {
        }
        log.info("任务1完成……");
    });
    monitoredThreadPoolExecutor.execute(() -> {
        log.info("任务2开始……");
        try {
            Thread.sleep(RandomUtils.nextInt(0, 100));
        } catch (InterruptedException ignore) {
        }
        log.info("任务2完成……");
    });
    Thread.currentThread().join(5 * 1000);
}

hello world 程序运行日志如下:

代码语言:javascript
复制
Connected to the target VM, address: '127.0.0.1:62724', transport: 'socket'
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 提交任务数+1
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 提交任务数+1
[被监控的线程池1_1] INFO MonitoredThreadPoolExecutorTest - 任务2开始……
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutorTest - 任务1开始……
[被监控的线程池1_1] INFO MonitoredThreadPoolExecutorTest - 任务2完成……
[被监控的线程池1_1] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 任务排队时间 = 1, 任务执行时间 = 86
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 1, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 2
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutorTest - 任务1完成……
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 任务排队时间 = 2, 任务执行时间 = 1452
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 执行慢任务数+1
[ThreadPoolMonitor_1] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
[ThreadPoolMonitor_2] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
from the target VM, address: '127.0.0.1:62724', transport: 'socket'

详细使用示例

「监控方式」

线程池的监控分为 2 种类型,一种是在执行任务前后全量统计任务排队时间和执行时间,另外一种是通过定时任务,定时获取活跃线程数,队列中的任务数,核心线程数,最大线程数等数据。

MonitoredThreadPoolExecutor 会同时统计这两种类型的数据。如果您不想统计全量任务执行和排队的监控数据,可以使用 ThreadPoolMonitor.monitor(String name, ThreadPoolExecutor threadPoolExecutor) 方法,该方法只使用定时任务来监控线程数据。其中,name 需要唯一,threadPoolExecutor 不能是 MonitoredThreadPoolExecutor 类型,否则会抛出异常。

「监控参数」

  • poolName :线程池名称。必须为每个线程池创建不同的名称,否则会抛出异常。可以将其作为监控平台的 id,通过名称找到对应的监控数据。
  • monitorConfig :监控配置参数。其中可以设置两个参数,taskSlowTimeThreshold 和 queueSlowTimeThreshold。如果 taskSlowTime 指定为 100,则表示任务执行时间大于 100ms 的任务会统计为慢任务,在监控中可以看到慢任务的数量。同样的,queueSlowTime 指定为 100,表示排队时间大于 100ms 的任务统计为排队慢任务,可以在监控中看到排队慢任务的数量。

其他参数和 JDK 中线程池参数意义相同。

示例代码

代码语言:javascript
复制
@Slf4j
public class MonitoredThreadPoolExecutorTest {

    @Test
    public void helloWorld() throws InterruptedException {
        //创建线程池对象
        MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor("被监控的线程池1", 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new MonitorConfig().setQueueSlowTimeThreshold(100).setTaskSlowTimeThreshold(100));
        monitoredThreadPoolExecutor.execute(() -> {
            log.info("任务1开始……");
            try {
                Thread.sleep(RandomUtils.nextInt(1000, 2000));
            } catch (InterruptedException ignore) {
            }
            log.info("任务1完成……");
        });
        monitoredThreadPoolExecutor.execute(() -> {
            log.info("任务2开始……");
            try {
                Thread.sleep(RandomUtils.nextInt(0, 100));
            } catch (InterruptedException ignore) {
            }
            log.info("任务2完成……");
        });
        Thread.currentThread().join(5 * 1000);
    }

    @Test
    public void ThreadPoolExecutorTest() throws InterruptedException {
        //线程池需要指定唯一的线程池名称,否则会抛出异常
        String uniqPoolName = "被监控的线程池2";
        MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor(uniqPoolName, 1, 4, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(256), new MonitorConfig().setQueueSlowTimeThreshold(100).setTaskSlowTimeThreshold(100)) {
            //如果想在任务执行开始或者执行结束时,执行一些操作,覆盖afterExecute0(Runnable r, Throwable t)和beforeExecute0(Thread t, Runnable r),注意方法名称后面有0
            @Override
            public void afterExecute0(Runnable r, Throwable t) {
                log.info("增强afterExecute0");
            }

            @Override
            public void beforeExecute0(Thread t, Runnable r) {
                log.info("增强beforeExecute0");
            }
        };

        //使用方式和ThreadPoolExecutor完全相同
        for (int i = 0; i < 3; i++) {
            int taskId = i;
            monitoredThreadPoolExecutor.submit(() -> log.info("任务{}", taskId));
        }

        for (int i = 0; i < 3; i++) {
            monitoredThreadPoolExecutor.submit(() -> {
                int id = RandomUtils.nextInt();
                log.info("生成id = {}", id);
                return id;
            });
        }
        //使用结束后,如果需要再创建相同名称的线程池,则需要调用remove方法移除定时任务。
        ThreadPoolMonitor.remove(uniqPoolName);
        //关闭线程池,在一段时间内会关闭所有监控的定时任务
        monitoredThreadPoolExecutor.shutdown();
        Thread.currentThread().join(2 * 1000);
    }

}

运行日志

代码语言:javascript
复制
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池2, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 1, 最大线程数 = 4, 执行的任务总数 = 0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务1
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务2
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 488282372
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 3
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 1176017668
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 4, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 463520743
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 4, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

主要实现

  • ThreadPoolMonitor 负责线程池与监控方法的管理;
代码语言:javascript
复制
public class ThreadPoolMonitor {

    private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();
    private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));

    private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;

    public ThreadPoolMonitor() {
    }

    public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor instanceof MonitoredThreadPoolExecutor) {
            throw new IllegalArgumentException("MonitoredThreadPoolExecutor is already monitored.");
        } else {
            monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
        }
    }

    public static void remove(String name) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);
        if (futureWrapper != null) {
            futureWrapper.future.cancel(false);
        }

    }

    public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);
        if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {
            POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);
            futureWrapper.future.cancel(false);
        }

    }

    static void monitor(MonitoredThreadPoolExecutor threadPoolExecutor) {
        monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
    }

    private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {
        PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);
        POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {
            if (v == null) {
                return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);
            } else {
                throw new IllegalStateException("duplicate pool name: " + name);
            }
        });
    }

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));
    }

    static class FutureWrapper {
        private final Future<?> future;
        private final ThreadPoolExecutor threadPoolExecutor;

        public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) {
            this.future = future;
            this.threadPoolExecutor = threadPoolExecutor;
        }
    }
}
  • 给异步任务 Runnable 套一个壳,让他该任务可监控;
代码语言:javascript
复制
public class MonitoredRunnable implements Runnable, Monitored {

    private final Runnable runnable;
    private final long inQueueNanoTime;

    public MonitoredRunnable(Runnable runnable) {
        this.runnable = runnable;
        this.inQueueNanoTime = System.nanoTime();
    }

    @Override
    public long inQueueNanoTime() {
        return this.inQueueNanoTime;
    }

    @Override
    public void run() {
        this.runnable.run();
    }
}
  • MonitoredThreadPoolExecutor 继承 ThreadPoolExecutor 覆盖其方法做监控统计增强;
代码语言:javascript
复制
@Slf4j
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadLocal<Long> executeStartTimeThreadLocal;
    protected String poolName;
    private final int slowTaskThreshold;
    private final int queueTimeThreshold;
    private static final int DEFAULT_SLOW_TASK_TIME = 5000;
    private static final int DEFAULT_QUEUE_TIME = 100;


    @Override
    public void execute(Runnable command) {
        log.info("线程池名称 = {}, 提交任务数+1", this.poolName());
        super.execute(new MonitoredRunnable(command));
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), new AbortPolicy(), new MonitorConfig());
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, new MonitorConfig());
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, MonitorConfig monitorConfig) {
        this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), new AbortPolicy(), monitorConfig);
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, MonitorConfig monitorConfig) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new MonitorRejectedExecutionHandler(handler, poolName));
        this.executeStartTimeThreadLocal = new ThreadLocal<>();
        this.poolName = poolName;
        this.slowTaskThreshold = monitorConfig.getTaskSlowTimeThreshold() > 0 ? monitorConfig.getTaskSlowTimeThreshold() : DEFAULT_SLOW_TASK_TIME;
        this.queueTimeThreshold = monitorConfig.getQueueSlowTimeThreshold() > 0 ? monitorConfig.getQueueSlowTimeThreshold() : DEFAULT_QUEUE_TIME;
        ThreadPoolMonitor.monitor(this);
    }

    @Override
    protected final void beforeExecute(Thread t, Runnable r) {
        try {
            this.beforeExecute0(t, r);
        } finally {
            this.executeStartTimeThreadLocal.set(System.nanoTime());
        }
    }

    @Override
    protected final void afterExecute(Runnable r, Throwable t) {
        this.afterExecuteMonitor(r, t);
        this.afterExecute0(r, t);
    }

    private void afterExecuteMonitor(Runnable r, Throwable t) {
        try {
            long executeEndNano = System.nanoTime();
            Long executeStartTime = this.executeStartTimeThreadLocal.get();
            Monitored monitored = (Monitored) r;
            long queueNanoTime = monitored.inQueueNanoTime();
            int queueTime = (int) ((executeStartTime - queueNanoTime) / 1000000L);
            int executeTime = (int) ((executeEndNano - executeStartTime) / 1000000L);
            log.info("线程池名称 = {}, 任务排队时间 = {}, 任务执行时间 = {}",
                    this.poolName(), queueTime, executeTime);
            if (executeTime > this.slowTaskThreshold) {
                log.info("线程池名称 = {}, 执行慢任务数+1", this.poolName());
            }
            if (queueTime > this.queueTimeThreshold) {
                log.info("线程池名称 = {}, 排队慢任务数+1", this.poolName());
            }

            if (t != null) {
                log.info("线程池名称 = {}, 执行异常的任务数+1", this.poolName());
            }
        } catch (Exception ignore) {
        } finally {
            executeStartTimeThreadLocal.remove();
        }
    }

    protected void beforeExecute0(Thread t, Runnable r) {
    }

    protected void afterExecute0(Runnable r, Throwable t) {
    }

    @Override
    protected final void terminated() {
        ThreadPoolMonitor.remove(this.poolName(), this);
    }

    public String poolName() {
        return this.poolName;
    }
}
  • PoolMonitorTask 定时收集线程池监控项的任务实现;
代码语言:javascript
复制
@Slf4j
@Getter
public class PoolMonitorTask implements Runnable {

    private final ThreadPoolExecutor monitoredThreadPool;
    private final String poolName;
    private volatile long lastTaskCount = 0L;

    public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) {
        this.monitoredThreadPool = monitoredThreadPool;
        this.poolName = poolName;
    }

    @Override
    public void run() {
        int activeCount = this.monitoredThreadPool.getActiveCount();
        int corePoolSize = this.monitoredThreadPool.getCorePoolSize();
        int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize();
        int queueTaskSize = this.monitoredThreadPool.getQueue().size();
        long taskCount = this.monitoredThreadPool.getTaskCount();
        int executedTask = (int) (taskCount - this.lastTaskCount);
        log.info("线程池名称 = {}, 活跃线程数峰值 = {}, 队列任务数峰值 = {}, 核心线程数 = {}, 最大线程数 = {}, 执行的任务总数 = {}",
                this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask);
        this.lastTaskCount = taskCount;
        if (this.monitoredThreadPool.isTerminated()) {
            ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool);
        }
    }

}
  • ThreadPoolMonitor 线程池监控者,负责线程池与监控方法的管理,定时采集任务的执行者;
代码语言:javascript
复制
public class ThreadPoolMonitor {

    private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();
    private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));

    private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;

    public ThreadPoolMonitor() {
    }

    public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor instanceof MonitoredThreadPoolExecutor) {
            throw new IllegalArgumentException("MonitoredThreadPoolExecutor is already monitored.");
        } else {
            monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
        }
    }

    public static void remove(String name) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);
        if (futureWrapper != null) {
            futureWrapper.future.cancel(false);
        }

    }

    public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);
        if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {
            POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);
            futureWrapper.future.cancel(false);
        }

    }

    static void monitor(MonitoredThreadPoolExecutor threadPoolExecutor) {
        monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
    }

    private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {
        PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);
        POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {
            if (v == null) {
                return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);
            } else {
                throw new IllegalStateException("duplicate pool name: " + name);
            }
        });
    }

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));
    }

    static class FutureWrapper {
        private final Future<?> future;
        private final ThreadPoolExecutor threadPoolExecutor;

        public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) {
            this.future = future;
            this.threadPoolExecutor = threadPoolExecutor;
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-12-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 857Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 实现方式
  • 实战应用
    • 使用示例
      • 详细使用示例
        • 主要实现
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档