前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >面试官:你在项目中用过 多线程 吗?

面试官:你在项目中用过 多线程 吗?

作者头像
田维常
发布2022-03-22 16:20:32
4650
发布2022-03-22 16:20:32
举报

hi 大家好,我是田哥

最近,从去年到现在,我给小伙伴们做模拟面试已有100多场。有时候我也在想,现在真的很卷吗?大部分人第一次模拟面试结束,给我的感觉不像大家说的那么卷。

奇怪的现象

我在做模拟面试的过程中,无意中发现一个现象,就是如果问八股文,问知识点,在校生或刚刚毕业不久的,回答的斗殴挺好的,往往是工作三五年的在这一块非常欠缺。

我也私下问过很多人,为什么这种现象,主要原因差不多就是:

  • 每天太忙,没时间学习
  • 年纪大了,记不住
  • 就是不要想学

工作三五年的也不是就真的没有优点,他们的优点就是有大量的项目经验。换着问项目业务和设计之类的,他们明显占优势,但,问他们稍微往深的问,就会懵逼。比如:你们项目中使用到了Redis,用来干嘛,他们能立马回答上来。

如果继续追问:如何保证Redis和数据库中的数据一致性?然后就会稀里糊涂的回答。还有就是问他们Redis的持久化方式使用的是哪种?“这个没注意,不是我安装的”,继续问:那你觉得哪种方式更好,答案各种各样的都有。

总结起来就亮点:

  • 学生或新人,八股文占优势(也有一部分啥都不知道,啥也没去背的)。
  • 三五年有项目经验,但大部分都停留在用上面,稍微问题问题就容易暴露自己的家点(也有一小分部知道的比较多)

在模拟面试的时候,我问过很多人是否在项目中用过并发编程的相关技术,用了什么?

基本上都回答:用过线程池

好吧,接下来,那我们就以一个线程池的面试题来对比以上两类人的回答。

聊聊线程池

线程池核心参数

学生或新人:基本上都是一口气就能吧这些参数回答上来,另外有部分优秀的会对这些参数做一个解释。

三五年的:部分人能全部回答出来,一部分人能说出核心线程数、最大线程数,其他参数就吱吱呜呜的回答,还有一部分就是完全一脸懵逼。

我们来看看到底有哪些参数:

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:空闲时间
  • unit:空闲时间单位
  • workQueue:阻塞队列
  • threadFactory:线程工厂
  • handler:拒绝策略

问核心参数时,至少要回答corePoolSizemaximumPoolSizeworkQueuehandler。还是建议全部回答吧,反正也没几个参数。

线程池原理

学生或新人:按照八股文来回答一番,甚至有的在回答核心参数的时候,顺带着就会把线程池的原理给说了(刚刚遇到能说的,就顺带着多说点)。

三五年的:有部分人也会按照八股文上的来回答,有部分人是吱吱呜呜的,不知道在说啥,还有一分部人就是瞎说咯。

关于线程池原理,我这里借用网上一张图:

如果看图记不住,我也有办法,我们可以使用生活案例来理解。

  • 公司A:线程池
  • 公司A自己的员工:核心线程数
  • 公司A接到的订单:我们的业务线程
  • 公司A的仓库:阻塞队列
  • 公司B派的人:最大线程数

开始表演:

公司A接到订单,先给自己员工处理,如果自己员工处理不来了,就丢到公司A仓库里,如果仓库堆满了,这时候就去找公司B,公司B就派人(最大线程数)到公司A,订单持续爆棚,公司B派来的员工和公司A的员工都搞不来了,那就只能把后面来的订单拒绝掉(拒绝策略)。如果公司B的员工在公司A里吧任务做完了,闲着没事了,公司A也不会立马就让人家回公司B,毕竟人员来回还是有成本的,所以,可以适当的给点时间(keepAliveTime),是在没有什么任务了,那你们还是回公司B吧。

好了,按照这个故事去编就行了,也可以模仿着编其他故事,至少让面试官觉得你不是在背八股文。

核心线程数量设置

学生或新人:就算知道也是被八股文的,但很遗憾,问过十多个人,回答上来的应该占30%左右。

三五年的:问过几十个人,回答上来的寥寥无几,甚是遗憾。在项目中敢用线程池,却不知道如何设置核心线程数,这不是瞎搞吗?有的人能回答出CPU密集型和IO密集型,但问他哪些类型是CPU密集型、哪些是IO密集型?分表举两个例子,此时很多人都会慌的。

下面,我们来说说CPU密集型和IO密集型:

CPU密集型:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

比如:像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。

IO密集型:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占 用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 :

核心线程数=CPU核心数量*2

比如:像 MySQL 数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。

另外,线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程;

以上只是理论值,实际项目中建议在本地或者测试环境进行多次调优,找到相对理想的值大小。

线程是如何复用的

关于这个问题,目前我见过的,只有两个人能说个大概。

我们都知道,继承Thread类或者实现Runnable接口,然后调用其start()方法就可以启动线程了,但是如果调用run()方法就和调用普通方法一样。

我们来看看,线程池中,我们提交的线程实例(任务),在线程池中到底是怎么被执行的。

线程池中有个Worker的角色,我们调用execute(Runnable tak)时候,会创建一个Worker:

我们先来看看这个Worker是怎么定义的:

代码语言:javascript
复制
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    // Worker 只有这一个构造方法,传入 firstTask
    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        // 调用 ThreadFactory 来创建一个新的线程,这里创建的线程到时候用来执行任务
        // 我们发现创建线程的时候传入的值是this,我们知道创建线程可以通过继承Runnable的方法,
        // Worker继承了Runnable,并且下面重写了run()方法
        this.thread = getThreadFactory().newThread(this);
    }

    // 由上面创建线程时传入的this,上面的thread启动后,会执行这里的run()方法,并且此时runWorker传入的也是this
    public void run() {
        runWorker(this);
    }
}

Worker继承了Runnable,并且下面重写了run()方法,这时候我们调用new Worker().start()方法后,就会调用Worker类中的run()方法。

此时的Worker和我们平时的Thread类就类似了

好了,我们再回到前面的说的execute()方法中来。

代码语言:javascript
复制
public void execute(Runnable command) { 
    int c = ctl.get();

    // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
    // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
        // 至于执行的结果,到时候会包装到 FutureTask 中。
        // 这里的true代表当前线程数小于corePoolSize,表示以corePoolSize为线程数界限
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
    // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 队列满了,那么进入到这个分支
    // 这里的false代表当前线程数大于corePoolSize,表示以 maximumPoolSize 为界创建新的 worker
    // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

这段代码里我们看到了很多地方都在调用接着addWorker()方法,我们来看看这个方法(方法内容有点多):

代码语言:javascript
复制
private boolean addWorker(Runnable firstTask, boolean core) {
    //相当于goto,虽然不建议滥用,看看大神们是如何使用吧
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
        // 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
        // 2. firstTask != null
        // 3. workQueue.isEmpty()
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //这里就是通过core参数对当前线程数的判断
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /*
     * 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,
     */

    // worker 是否已经启动
    boolean workerStarted = false;
    // 是否已将这个 worker 添加到 workers 这个 HashSet 中
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 把 firstTask 传给 worker 的构造方法
        w = new Worker(firstTask);
        // 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
        final Thread t = w.thread;
        if (t != null) {
            // 这个是整个类的全局锁,因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭
            mainLock.lock();
            try {

                int c = ctl.get();
                int rs = runStateOf(c);

                // 小于 SHUTTDOWN 那就是 RUNNING
                // 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker 里面的 thread 可不能是已经启动的
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 加到 workers 这个 HashSet 中
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于记录 workers 中的个数的最大值
                    // 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功的话,启动这个线程
            if (workerAdded) {
                // 启动线程,最重要的就是这里,下面我们会讲解如何执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回线程是否启动成功
    return workerStarted;
}

请注意:t.start();这里的t其实就是我们创建的Worker对象,就回到我们前面说的,调用start()方法后,会执行到他的run()方法中来,我们继续看Worker中的run()方法。

代码语言:javascript
复制
//Worker的run方法
public void run() {
      runWorker(this);
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //取出需要执行的任务,
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果task不是null,或者去队列中取任务,注意这里会阻塞,后面会分析getTask方法
            while (task != null || (task = getTask()) != null) {
               //这个lock在这里是为了如果线程被中断,那么会抛出InterruptedException,而退出循环,结束线程
                w.lock();
                //判断线程是否需要中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //任务开始执行前的hook方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        
                        task.run();//这里就是直接调用run 方法
                        
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } finally {
                       ////任务开始执行后的hook方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;//清空task
                    w.completedTasks++;//完成数添加
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           //Worker退出
           processWorkerExit(w, completedAbruptly);
    }
}

这里注意这行代码:

while (task != null || (task = getTask()) != null)

注释中已经说清楚了:如果task不是null,或者去队列中取任务,注意这里会阻塞。

另外,一个注意点:

task.run();

这个其实就是调用我们我们传入到execute(Runnable task)中的参数,也就是说,我们创建的Runnable对象,根本就不会去调用其start()方法,而是直接调用其run()方法。

我们在看看getTask()方法到底是做什么?

代码语言:javascript
复制
// 此方法有三种可能:
// 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,
//      它们会一直等待任务
// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
// 3. 如果发生了以下条件,此方法必须返回 null:
//    - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
//    - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
//    - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
private Runnable getTask() {
    boolean timedOut = false; 

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 两种可能
        // 1. rs == SHUTDOWN && workQueue.isEmpty()
        // 2. rs >= STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // CAS 操作,减少工作线程数
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?
        for (;;) {
            int wc = workerCountOf(c);
            // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            // compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        // wc <= maximumPoolSize 同时没有超时
        try {
            // 到 workQueue 中获取任务
            // 如果timed=wc > corePoolSize=false,我们知道核心线程数之内的线程永远不会销毁,则执行workQueue.take();我前面文章中讲过,take()方法是阻塞方法,如果队里中有任务则取到任务,如果没有任务,则一直阻塞在这里知道有任务被唤醒。
            //如果timed=wc > corePoolSize=true,这里将执行超时策略,poll(keepAliveTime, TimeUnit.NANOSECONDS)会阻塞keepAliveTime这么长时间,没超时就返回任务,超时则返回null.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果此 worker 发生了中断,采取的方案是重试
            // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法,
            // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,
            // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null
            timedOut = false;
        }
    }
}

到这里,大家应该都知道了,线程池中到底是如何复用线程的吧。

我来总结一下:

线程池中,维护了一个Worker的内部类,其中Worker也实现了Runnable接口,重写了run()方法,在调用这个run()时候,会采用类似于死循环的while方式重复使用这个worker去获取任务并执行我们传入execute(Runnable task)方法的参数task(task存放在阻塞队列里)。

关于线程池的完整版源码分析,已更新到我的博客里。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-03-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java后端技术全栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 奇怪的现象
  • 聊聊线程池
    • 线程池核心参数
      • 线程池原理
        • 核心线程数量设置
          • 线程是如何复用的
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档