线程池定制初探

背景

​ 我在的公司虽然是移动支付领域的公司。但是我做的业务类似于管理系统,所以一开始写代码的时候没有考虑到数据的量的问题。开始有一个统计页面,大概要统计的数据分为十多个维度,然后每个维度需要考虑十个左右的方面。也就是统计页面轻轻地点击一个查询按钮,要进行100次左右的数据库查询。开始数据量小的时候,查询还能够使用,页面不会超时。到后面数据量越来越大,最大的一张表数据量已经超过1亿。这时候悲催的事情发生了--- 页面点击查询直接未响应.....


方案考虑

​ 其实当时的方案我想了两种:

  1. 优化业务实现逻辑,其实在查询的时候一个表的多个维度的查询可以传一个维度列表进去。查出来结果之后,后台在进行分组计算。
  2. 采用多线程,对每一个维度采用一个线程去执行。这样其实每个线程的查询次数在10次左右,总的时间差不多可以缩短10倍。

​ 当然,当时我知道最好的方案是混合使用1和2。但当时1的实现存在以下问题,最终让我决定选择方案2:

  1. 因为每个维度涉及多张表, 不同的表归属于不同的模块。如果某张表的查询条件不支持维度列表,那么需要提需求给对应模块开发...... (何年何月能完)
  2. 方案1的改动涉及代码变动较多,且不好封装每个线程任务,写出来的代码逻辑有点绕,后台存在大量的重新分维度统计的代码。简而言之就是不优雅

实现考虑

​ 既然最终选定方案2的话,那么自然考虑到选择线程池。那么选啥线程池呢?Single?Schedule肯定不用想直接PASS。Cached?其实当前来说是可行的,因为当前线上的维度也就十多个。以Cached线程池的特性,只要同时并发的线程数量不至于太大,也不至于给系统太大压力导致系统瘫痪。但是因为维度会随着业务的增长而越来越多,如果后续维度增加到20甚至30,那么对系统的压力就无法预估了。

​ 思前想后,我最终决定选择Fix线程池,将线程池固化大小为10个。但这时候我又想,其实统计页面一天查询的次数并不多。可能就每天早上点击查询一次,后面可能就不再点查询。那么这时候又出现了两种蛋疼的选择:

  1. 直接在查询的方法内部初始化线程池
  2. 在类的属性中初始化线程池

​ 第1种方案的话,每次查询都要重新初始化线程池,造成很大的资源消耗。如果连续查询多次,并不会后面比前面快,反而可能由于不停的线程池销毁创建导致越来越慢。最终我选择了第2种方案,当然我并没有选择饿汉模式直接初始化,而是选择了懒汉模式在方法中进行线程池初始化,且通过锁保证只初始化一次。


想法尝试

​ 到这里你如果觉得我的定制初探就完了,那你就too young too naive。我不追求可行,只追求完美~

这时候我就在想,其实根据用户的操作习惯,统计页面的查询按钮,要么就隔着几个小时不按,要么就可能一时心血来潮,连续查询几天或者同一天分多个维度查询多次。而大家都知道Fix线程池固化了线程池的大小,即使后面连续几个小时没有任务来,仍然会一直保持着初始大小的线程数。那么能不能实现即能够控制线程数量Fix,又可以在空闲的时候销毁核心线程呢?答案当然是有的,关键点在于:ThreadPoolExecutor的allowCoreThreadTimeOut方法

/**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting <tt>true</tt>. This method
     * should in general be called before the pool is actively used.
     * @param value <tt>true</tt> if should time out, else <tt>false</tt>
     * @throws IllegalArgumentException if value is <tt>true</tt>
     * and the current keep-alive time is not greater than zero.
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

        allowCoreThreadTimeOut = value;
    }

​ 从源码的注释来看,该方法可以支持线程池的keep-alive time的设置同时对核心线程和非核心线程生效。具体为啥,后面我分析线程池源码的时候会讲到,现在我们只需要看看用到该处的源码(在ThreadPoolExecutor的getTask方法中):

/**
     * Gets the next task for a worker thread to run.  The general
     * approach is similar to execute() in that worker threads trying
     * to get a task to run do so on the basis of prevailing state
     * accessed outside of locks.  This may cause them to choose the
     * "wrong" action, such as trying to exit because no tasks
     * appear to be available, or entering a take when the pool is in
     * the process of being shut down.  These potential problems are
     * countered by (1) rechecking pool state (in workerCanExit)
     * before giving up, and (2) interrupting other workers upon
     * shutdown, so they can recheck state. All other user-based state
     * changes (to allowCoreThreadTimeOut etc) are OK even when
     * performed asynchronously wrt getTask.
     *
     * @return the task
     */
    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

​ 关键在于workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),该方法表示尝试从等待队列中获取任务,如果超过keepAlive time,则直接返回null。如果返回null的话,work线程就会被终止。

​ 好了这些都是后话,在我看了线程池的源码之后才能够清楚地知道为啥这个参数有这个作用。那么在之前,我是怎么测试验证我的想法的呢?其实很简单:

  1. 我先参照线程池的默认的DefaultThreadFactory定义自己的线程工厂,目的是为了获取线程工厂内的ThreadGroup属性,因为ThreadGroup类有一个activeCount方法,该方法可以获取线程组内活跃的线程个数。
class MyThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        MyThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
        
        // 我所增加的方法,为了获取线程组
        public ThreadGroup getThreadGroup() {
            return this.group;
        }
    }
  1. 万事俱备,只欠东风了,我只需要构造两种不同的情况验证我的猜想即可!
MyThreadFactory myThreadFactory = new MyThreadFactory();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
                                new LinkedBlockingQueue<Runnable>(), myThreadFactory); 
// executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <= 20; i++) {
  executor.submit(new MyRunnable());
}
System.out.println(myThreadFactory.getThreadGroup().activeCount());     // 6 
Thread.sleep(20000);
System.out.println("After destroy, active thread count:" + myThreadFactory.getThreadGroup().activeCount());             // 6/1
executor.shutdown();

​ 运行的结果:

  1. 如果不执行executor.allowCoreThreadTimeOut(true);两个activeCount的结果都是6
  2. 如果执行executor.allowCoreThreadTimeOut(true);第一个activeCount的结果为6,第二个activeCount的结果为1

最终实现

​ 好了,终于到最终定制实现了。我的代码实现如下(类为Spring管理的类,最终线程池shutdown在PreDestroy的时候):

    private volatile ThreadPoolExecutor searchExecutors; 
    
    private final Object lock = new Object();
    
    /**
     * 初始化线程池,开始不进行初始化,免得浪费系统资源
     */
    private void initExecutor() {
        if (searchExecutors != null) {
            return;
        }
        
        synchronized (lock) {
            if (searchExecutors == null) {
                 // 设置一个固定大小为10,核心线程如果超过10分钟空闲也可销毁的线程池
                ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
                tempExecutor.allowCoreThreadTimeOut(true);
                this.searchExecutors = tempExecutor;
            }
        }
    }

    @PreDestroy
    public void destroy() {
        if (searchExecutors != null) {
            searchExecutors.shutdown();
        }
    }

这里再说两点

  1. 这个初始化方法采用了double-check-lock的方式,来保证多线程并发获取到的是同一个线程池实例
  2. 注意到在设置属性searchExecutors之前借助了一个tempExecutor。这样也是为了防止ThreadPoolExecutor对象已经被初始化,但是allowCoreThreadTimeOut还未被执行的问题。(对象过早逃逸导致属性与预期不符)。

总结

​ 通过这次线程池定制初探,发现其实看起来再没有技术含量的工作,如果细细想下去还是会有很多可以深入研究的东西。而做软件其实也要像做艺术品一样,多考虑不同的实现可能,尽量选择最完美的解决方案。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏程序猿DD

Netflix Zuul与Nginx的性能对比

这是一篇翻译,关于大家经常质疑的一个问题:API网关Zuul的性能。 作者:STANISLAV MIKLIK 原文:NETFLIX ZUUL VS NGINX ...

3235
来自专栏用户画像

2.3.2 集线器

集线器实质上是一个多端口的中继器,也可以工作在物理层。在Hub工作时,当一个端口接受到数据后,由于信号在从端口到Hub的传输过程中已有了衰减,所以Hub便将该信...

791
来自专栏Rainbond开源「容器云平台」

Kubernetes容器网络接口(CNI) midonet网络插件的设计与实现

1277
来自专栏CDN及云技术分享

微型分布式架构设计范例

设计该系统初衷是基于描绘业务(或机器集群)存储模型,分析代理缓存服务器磁盘存储与回源率的关系。

92028
来自专栏云计算

Kubernetes的服务网格(第6部分):简单轻松的分期微服务

在将代码暴露给生产流量之前,分期新代码是构建可靠的,低故障停机时间软件的关键部分。然而不幸的是,对于微服务来说,每个新服务的添加都提升了分期过程的复杂性,因为服...

2018
来自专栏腾讯Bugly的专栏

【MIG专项测试组】腾讯手机管家实战分析:内存突增是为神马?

应用版本升级后使用内存突增?如何跟踪?这次MIG专项测试组为大家分享内存问题跟踪实战过程! MIG专...

3174
来自专栏进击的程序猿

Wormhole:可靠的发布-订阅系统

Wormhole是Facebook内部使用的一个Pub-Sub系统,目前还没有开源。 在网上搜索论文相关内容的时候,发现几个网站,首先是该篇论文有个视频: ...

703
来自专栏云计算

异步数据存储声明

在过去的几年里,NoSQL数据存储的工作让我对应用程序的方向有了一些见解,因为NoSQL成为了主流的数据存储和检索方法,至少对网络和基于云的程序来说是这样的(企...

1979
来自专栏杨建荣的学习笔记

数据建模和数据映射的初步思考

今天和大家聊下关于数据建模和数据映射的事情,其实开始一个简单的项目的时候,我们的目标是很明确,而且所做的事情相对来说是比较简单的流程。

810
来自专栏数据和云

静默错误:Oracle 数据库是如何应对和处理的 ?

说明:关于本文提到的所有参考文档,一律上传分享,关注本公众号回复 122arch 获得。

652

扫描关注云+社区