专栏首页Vegout浅谈java线程池(基于jdk1.8)

浅谈java线程池(基于jdk1.8)

多线程让程序世界丰富多彩,也让其错综复杂。对于线程的创建和销毁成了一笔不小的开销,为了减少这些开销,出现了线程池。线程池对线程进行管理,对于需要使用多线程的我们来说,只需要把任务丢给线程池就可以了。但当我们把任务丢给线程池的时候,它是如何处理的呢?我们去源码中寻找踪迹。

ThreadPoolExecutor

线程池在JDK中的主要实现类就是这个ThreadPoolExecutor。我们首先看一下他的构造函数

    public ThreadPoolExecutor(int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//存活时间
                              TimeUnit unit,//存活时间的单位(秒、毫秒等)
                              BlockingQueue<Runnable> workQueue,//阻塞队列
                              RejectedExecutionHandler handler) {//拒绝策略
        ...
    }

构造函数中出现的这几个参数都是线程池的重要指标,我们用几句话把它们串起来,顺便说明他们是如何发挥作用的: 线程池中有两种重要的元素,一是线程,二是阻塞队列。 1、当线程池刚初始化时,线程为0,阻塞队列为空。 2、第一个任务来临时,线程池为它新建一个线程来执行,第二个任务来临时,线程池再为它新建一个线程来执行,直到新建的线程数达到了核心线程数,线程池暂时就不会再新建线程了。 3、新来的任务将会被放到阻塞队列中,随着新任务的不断到来,如果阻塞队列已,那么线程池将会继续为新来的任务新建线程,直到线程数达到了最大线程数。 4、这时,对与新来的任务,线程池将不会直接接受,而是执行拒绝策略。 5、拒绝策略有很多种,默认的是直接抛出异常。还有其他三种①丢弃当前被拒绝的任务。②丢弃最老的任务,重新尝试接受新任务。③在调用者线程中执行这个任务

下面我们去源码中寻找以上叙述的踪迹

Execute线程池的执行入口

execute方法是执行的入口

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //如果此时线程数小于核心线程数,则增加线程处理任务
            if (addWorker(command, true))//A
                return;//增加成功,结束
            c = ctl.get();
        }//线程数已经不小于核心线程数,进行入队
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//如果线程池已不再运行,取出任务,执行拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)//此刻没有线程,就增加一个线程
                addWorker(null, false);//C
        }//如果我们没能成功入队,那么久增加一个线程,如果失败;说明线程池已关闭或已饱和,执行拒绝策略
        else if (!addWorker(command, false))//B
            reject(command);
    }

以上代码清晰可见,基本还原了上述文字叙述的流程。但我们发现当线程数不小于核心线程数时,入队之后,还进行了一些检测操作,就是看当前线程池是否还在运行,如果已经停止运行,那么取出入队的任务,执行拒绝策略。所以拒绝策略不只是线程池饱和之后执行,停止运行也会执行,当然这也是情理之中的事情。addWorker就是增加线程来处理任务,但我们发现这个方法的参数除了Runnable还有一个,是一个boobean值,并且在上面代码中的A处和B处分别调用了true和false,这里有何奥妙?我们来剖开addWorker

增加工作者

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&//监测线程池状态
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //如果此时有效线程数已经超过bound(核心线程数或最大线程数),返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    //CAS增加worker数量记录,成功则跳出循环
                    break retry;
                c = ctl.get();  // Re-read ctl准备重试
                if (runStateOf(c) != rs)//如果线程池状态发送变化,从外循环重新开始(进行状态监测)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

前期判断

其实我们在上一段代码中就看到了c = ctl.get()这一句代码。这里取出的c是一个int值,高3位表示的是此刻线程池的状态,低29位表示的是此刻线程数。因为这个数会有多个线程对它进行操作,所以将它用AtomicInteger进行了包装,并且提供从这个Int值中取出线程数和线程池状态的方法(一些位操作),源码如下:

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

从上也可以看到线程池有五种状态。当调用shutdown()、shutdownnow()等方法时,线程池的状态会发生变化,从而影响线程池对与新来任务的策略,这个也在addWorker中有所体现。进入addWorker这个方法首先就是进行线程池状态的检测,如果处于非运行状态,就会返回false。但也有个特殊情况如果 (rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty())这三个条件同时为真的话,将不会返回false。这个条件满足时是什么状态呢?线程池处于SHUTDOWN状态,队列不为空,且调用的是addWorker(null,true/false)。此时基本就是这么个状态:线程池准备关闭了,需要新建一些线程来把队列中的任务处理掉。 看完了这个检测过程,就进入了内for循环,这个for循环中首先判断线程数是否超过了某个值,如果超过,返回false,不再新建线程。可以发现这个值是由addWorker的第二个参数控制的,如果为true,这个值就是corePollSize,如果为false,这个值就是maxinumPoolSize。分别对应了核心线程的新建,和超过核心线程数其他线程的新建。然后CAS改变线程数量记录,如果成功,跳出循环,进行线程的新建。如果不成功,则重试,并且如果线程池状态发生了变化,还需要继续外层循环,重新进行状态检测。两个for循环之后的代码就是进行线程的新建,并且启动这个线程。新建线程的工作是在Worker的构造函数中进行的

线程的新建

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

可以看到构造函数将自己传给了newThread来新建线程,也就是说Worker类有一个thread成员变量,这个thread又是通过Worker来构造的。而我们启动这个线程的时候调用的就是这个线程的start方法,下面看一下这个线程的执行逻辑,也就是worker的run方法

任务的执行

public void run() {
            runWorker(this);
        }

run方法就是调用了runWorker方法,我们再进入runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这个方法主体是一个while循环,首先处理firstTask,处理完之后就去队列里getTask()。处理的过程很简单,就是调用task的run方法(此刻的run方法调用才是对任务的处理)。getTask我们可以稍微看一下

任务的获取

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?//判断线程是否需要回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

如果线程不需要回收的话,他会去take任务,take方法如果取不到任务就会一直阻塞,取到就执行,因此这个线程就不会终结。但如果线程需要回收,那么线程会去poll任务,阻塞时间一旦超过了keepAliveTime,poll就会返回null,从而线程也就不会继续这个“取任务并执行任务”的循环,实现线程的回收。

各种线程池

JDK提供了一个工具类Executors来让我们方便的创建各种线程池。 1、newFixedThreadPool 这个线程池中线程的数量是一定的,队列无限长,不能及时处理的任务在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2、newWorkStealingPool 这个线程池是一个支持并行任务处理的线程池,传入的参数就是我们目标的并行度,为了减少争用,内部可能出现多个队列,实际的线程数也会动态的增加和减少,任务的先后执行顺序并不是一定的。

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

3、newSingleThreadExecutor 这个线程池中只会有一个线程和一个无界队列。可以保证任务的执行顺序,并且任何一个时刻只有一个任务在执行。

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4、newCachedThreadPool 这个线程池只会在需要的时候创建线程,每个线程如果空闲时间超过60秒就会被回收。对线程的数量没有限制,有内存溢出的风险。但长时间不适用的话它将是耗费资源最小的线程池,因为所有的线程都会被回收。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

5、newScheduledThreadPool 这个线程池可以用来周期性的执行一些任务

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

本文分享自微信公众号 - Vegout(t10244201)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-10-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 43.QT-访问远程SQLite数据库

    将要共享的share文件夹设置为共享(远程访问默认只能读),如果要想远程访问能够写的话,则点击权限进行修改,然后应用.

    张诺谦
  • Mybatis源码分析之Mapper注册与绑定

    Mybatis 是一个「面向 sql」的持久层框架,它可实现动态拼装 sql,极其灵活,同时避免了几乎所有的 JDBC 代码和手动设置参数以及获取结果集,其插件...

    张乘辉
  • 64位内核第七讲.内核中字符串编程注意事项

    在内核中.我们的字符有 char类型的.也有wchar_t类型的.分别是宽字符 跟窄字符.但是这种都不建议使用了.而内核提供了两个新的结构体让我们使用 分别...

    IBinary
  • Python 编码的这些坑,你还在踩吗!?

    utf-8: 可变长编码,常用的英文字母被编码成1个字节,汉字通常是3个字节,只有很生僻的字符才会被编码成4-6个字节。如果你要传输的文本包含大量英文字符,用U...

    昱良
  • 你必须收藏的Github技巧

    GitHub Pages大家可能都知道,常用的做法,是建立一个gh-pages的分支,通过setting里的设置的GitHub Pages模块可以自动创建该项目...

    用户5224393
  • 你都理解创建线程池的参数吗?

    多线程可以说是面试官最喜欢拿来问的题目之一了,可谓是老生之常谈,不管你是新手还是老司机,我相信你一定会在面试过程中遇到过有关多线程的一些问题。那我现在就充当一次...

    张乘辉
  • 64位内核开发第九讲,注册表编程.

    如下: 内核: \Registry\Machine\SYSTEM\CurrentControlSet\Control\Session Manager\pen...

    IBinary
  • 64位内核开发第十二讲,进程监视,ring3跟ring0事件同步.

    事件状态: 有信号 Signaled 无信号 Non-signaled 事件类别 自动恢复 Synchronization 自动设置 不自动恢复. N...

    IBinary
  • 自学 Python 只需要这3步

    大家好,我是大鹏,城市数据团联合发起人,致力于Python数据分析、数据可视化的应用与教学。

    昱良
  • 链表反转,此生相伴

    你是否有过这种体会:看别人的代码,当时看得很明白了,但是,过段时间,自己却怎么都写不出来?这是怎么回事,可能我们也清楚。别人的思维你是无法拷贝的,形成之前不具备...

    double

扫码关注云+社区

领取腾讯云代金券