前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试,关于线程池的那些事

面试,关于线程池的那些事

原创
作者头像
阿建dong一点
修改2022-07-02 15:20:05
3192
修改2022-07-02 15:20:05
举报
文章被收录于专栏:写点吧写点吧

背景

最近在面试找工作吧,关于线程池的问题被面试官问得还是蛮多。但是发现大多面试官也问不出啥来,大都会问有哪几个核心参数,自己拉吧拉吧的讲12345个参数,然后就没然后了(就下一个问题了)。但不排除有些面试官确实可以,会针对你的理解,问你一些稍微源码级别或者给你设计一些场景给你回答(个人还是比较喜欢这种)。

核心参数

  1. corePoolSize 核心线程数
  2. maximumPoolSize 最大线程数
  3. keepAliveTime 线程空闲时间
  4. workQueue 任务缓冲队列
  5. RejectedExecutionHandler 拒绝策略,默认抛出异常(其他包括丢弃队列中旧的任务、丢弃当前任务、使用当前线程执行任务)

值得思考的问题

  1. keepAliveTime参数有什么用,体现在哪里?
  2. corePoolSize核心线程数内创建的线程会被回收?
  3. 我们知道当工作线程(worker)数大于等于 corePoolSize且缓存队列满了之后,会根据maximumPoolSize进行判断,如果小于,则开启一个线程去执行我们的任务。那有没有办法不用丢弃任何任务,也不用当前线程去执行呢?
  4. 为什么Worker要继承AbstractQueuedSynchronizer,其作用在哪里?
  5. shutdown()和shutdownNow()区别在哪里?

想到生活中的一些事

某外包公司老板最近接到一个小项目,项目经理(称A)把需求整理好,拆解任务,跟老板汇报保守要10个开发人员。嗯,二话不说立刻招人,干活。某一天,甲方提出新需求,A掐指一算,任务有点多,目前的开发都还在忙着,安排不上了,哎,先拆解任务存入需求库把。果然是甲方爸爸,没过几天,又提出新的需求,A也忍不住骂mmp了,因为需求库满了,只能跟老板汇报,老板摸了下头,当前开发人员还在我最大能接受(上限20个开发)范围内,二话不说立刻招人,干活。此时,老板跟A墨迹了下,如果继续提新需求,招的开发人员比我最大能接受数量还要多,直接跟他们摊牌把。

其实老板也有自己得小心思,招20个开发,也不是长久之计(程序员动不动就2w起步),等项目需求都做完了,把几个核心得留下就行了,裁掉那些几天都没活干的。

以上几个加粗文字,勉强能跟线程池的核心参数对得上把(大家自行对号入座哈),然后呢,针对此故事回答下前面得几个问题

回到前面的问题

  1. keepAliveTime:主要是用来回收那些空闲的线程,当线程创建之后,执行完任务,就立刻去队列中取任务,如果在指定时间内没有任务可取(也就是就是每个开发人员做完手上的任务,就要去需求库中领取任务,一直重复此操作),该线程就会被回收掉。当然,这个有一定的条件,比如:允许核心线程数超时或者工作线程数是大于配置的核心线程数。
  2. corePoolSize:核心线程数内创建的线程和后面创建的线程是一样的,在线程池中并没有所谓固定的核心线程。每个线程执行完任务,接着从队列中拉一个任务出来继续执行,当任务没有的时候,线程领取不到就会被回收掉(也就是你那10个员工先入职,不代表你可以永远在公司就职,如果后来的员工比你更能干活积极,照样把你开掉的)。
  3. 这里主要考察拒绝策略。我们可以自定义一个拒绝策略继续将我们的任务放到线程池,比如:新建一个队列用来存储那些触发拒绝策略的任务,再开一个线程从队列中取任务出来重新丢到线程池中。
  4. 有两个地方,需要使用worker进行加锁。分别是runWorker()和shutdown()函数。个人理解是执行runWorker函数时,如果取到任务时候加锁,作者不希望因为执行shutdown(主要是给工作线程打中断标记)而对线程正在执行的任务有任何影响(虽然给线程打中断标记,不会对线程有绝对的影响,主要还是看开发者怎么处理,这一点可具体看 interrupt 相关知识)。
  5. 执行shutdown(),会将线程池状态修改为SHUTDOWN,并且给工作线程打中断标记,此时不会接收新的任务,但,如果队列已有任务,则,会继续执行的。 执行shutdown(),线程池对应的状态是STOP,并强制给所有工作线程打中断标记,此时,同样不会接收新任务,如果队列中有任务,会把任务移除掉,不会执行。 总体来说,这两个函数对正在被执行的任务是没有影响的(这里排除你对中断异常做了其他处理)。

针对以上问题,我们分别看下源码:

线程执行逻辑:

代码语言:javascript
复制
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //领取任务,领得到进入while逻辑,否则,处理后面回收的逻辑
            //从这里可看出,没有所谓的固定核心线程,全靠抢,抢到就继续执行,抢不到就销毁
            while (task != null || (task = getTask()) != null) {
                //取到任务,加锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //这里是我们具体的业务逻辑了
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //处理回收线程逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

获取队列中的任务逻辑:

代码语言:javascript
复制
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //(1) allowCoreThreadTimeOut 默认是false 
            //主要看  wc > corePoolSize;
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

           //(3)按理,如果此时没有任务加进来,这个条件是满足的,最终会走到 return null 返回,结束无限遍历
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //(2)当timed为true, 并且在指定时间keepAliveTime找不到任务会返回一 个 null
                //由于在for(;;)内,会继续遍历回到(3)处
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

shutdown逻辑,处理线程池状态为SHUTDOWN,遍历工作线程,然后获取锁,成功则打中断标记

代码语言:txt
复制
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //这里同样是尝试获取worker的锁,意味着如果线程正在执行任务,
                //那会阻塞在这里的,也就是无法打标记
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

shutdownNow逻辑,处理线程池状态为STOP。遍历工作线程,给线程打中断标记。移除队列中的任务,并返回。

代码语言:txt
复制
void interruptIfStarted() {
            Thread t;
            //这里跟shutdown不一样,,有点强制的意思
            //因为getState() >= 0 基本能满足,尽管线程已经在执行任务了
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        } 

private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        //移除队列中的元素
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            //这里,主要是再次确认队列中是否还有元素,确保做到不留一个活口哈。
            //因为有可能在q.drainTo(taskList)期间,用户线程继续往线程池丢任务。
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

写到最后

线程池不管平时工作还是面试,出现的频率还是比较多的。如果平时注意积累,使用时即可信手拈来。另外,以上仅是本人对线程池的初步了解,若有不对的地方,欢迎指出哈。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 核心参数
  • 值得思考的问题
  • 想到生活中的一些事
  • 回到前面的问题
  • 写到最后
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档