前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从源码学习线程池的使用原理及核心思想解析

从源码学习线程池的使用原理及核心思想解析

作者头像
向着百万年薪努力的小赵
发布2022-12-02 10:43:30
2260
发布2022-12-02 10:43:30
举报
文章被收录于专栏:小赵的Java学习

文章内容引用自 咕泡科技 咕泡出品,必属精品

文章目录

1为什么要使用线程池

  我们都知道线程的作用,能够异步处理任务,并且能处理多个任务。   但是无限制的使用线程,线程之间的创建、销毁,切换,都会带来一定的消耗!   所以,为了控制线程的数量,复用已有线程,同时减少线程切换带来的开销,,线程池这种池化技术就出来了!!

给同学们总结了应付面试的要点:

  1. 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。

线程池核心设计思想: 固定的线程数,来消费我们不定量的task

本文是对源码层面对线程池解析,有关线程池的使用,大家可以移步这篇文章: 链接: Java并发编程——四种线程池的使用及分析

2几种常用线程池介绍

大致给出几种常用线程池介绍:

  • newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。
  • newSingleThreadExecutor: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
  • newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动回收
  • newScheduledThreadPool: 创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。
  • newWorkStealingPool:适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。

  其实,除了newWorkStealingPool,线程池都是对ThreadPoolExecutor的一层封装,并且,建议大家不要用这些封装的,用底层的ThreadPoolExecutor,这样你就逼着自己去把线程池的一些参数去搞明白!!并且能提供比封装的更多功能!比如监控!   这是阿里开发手册中的建议哦

  今天我们就去看下ThreadPoolExecutor中怎么去实现固定的线程数,来消费我们不定量的task。

闲话不多说,让我们从初始化进入看源码的正题:

3从初始化开始

我们先看下初始化(构造)5个参数:

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,//主线程数
						int maximumPoolSize, //最大线程数
						long keepAliveTime, //线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收)
						TimeUnit unit, //存活时间的时间单位
						BlockingQueue<Runnable>workQueue//阻塞队列,我们需要执行的task都在该队列) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
	}

唉?里面的this有7个参数,于是点进this构造方法

代码语言:javascript
复制
    public ThreadPoolExecutor(int corePoolSize,//主线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//线程存活时间(除主线程外,其他的线程在没有任务执行的时候需要回收,多久后回收)
                              TimeUnit unit,//存活时间的时间单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列,我们需要执行的task都在该队列
                              ThreadFactory threadFactory,//生成thread的工厂
                              RejectedExecutionHandler handler//拒绝饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            //对数值传递不合理及最大线程数小于主线程数的情况做异常处理
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
        //这里能看出来这三个参数不能传null或不传
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null :AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

System.getSecurityManager:这是一个安全管理器,当运行未知的Java程序的时候,该程序可能有恶意代码(删除系统文件、重启系统等),为了防止运行恶意代码对系统产生影响,需要对运行的代码的权限进行控制,这时候就要启用Java安全管理器。这里不必深入,往下需要看很深。。。

总结:初始化(构造函数)就是赋了一些初始值

初始化完成之后,就该执行了

4执行任务execute

代码语言:javascript
复制
    public void execute(Runnable command) {
        if (command == null)//如果要执行的任务是空的,异常
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();//这里是啥见下文解释
        //高三位代表线程池的状态,低29位代表线程池中的线程数量 
        //如果当前线程数小于主线程数,添加线程
        if (workerCountOf(c) < corePoolSize) {//workerCountOf见下文
            if (addWorker(command, true))//addWorker才是添加线程的方法
                return;
            c = ctl.get();
        }
        //如果超过主线程数,将任务添加至workqueue 阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程池关闭,移除并拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
                //否则如果当前线程池线程空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //阻塞队列已满,添加失败,采用拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

上边代码里的 ctl是个啥?:

代码语言:javascript
复制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

定义于threadPoolExecutor类中的一个原子性的32位二进制int数值 Java用这个二进制数的高三位代表线程池的状态,低29位代表线程池中的线程数量

workerCountOf是个啥?

代码语言:javascript
复制
private static int workerCountOf(int c)  { 
	return c & CAPACITY; //CAPACITY 与(&) 上面的ctl
}

作用是:获取ctl的低29位,获取当前线程池中的线程数

那CAPACITY又是个啥: 给出源码中的定义:

代码语言:javascript
复制
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
	
	//Integer类的一个常量
    @Native public static final int SIZE = 32;

总结:这个方法就是做了一些线程的相关判断

5添加线程addWorker

让我们看看添加线程的方法:

代码语言:javascript
复制
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:  //这是goto语句  下面我会写一个demo讲解这玩意
        for (;;) {//大自旋检查线程池的状态。阻塞队列是否为空等判断
            int c = ctl.get();
            int rs = runStateOf(c);//线程池运行状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;//线程池被关或者任务为null(无任务可执行)
			
            for (;;) {//小自旋
                int wc = workerCountOf(c);
                //如果现有线程数大于最大值,或者大于等于最大线程数(主线程数)返回false
                //意味着线程都是满的
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                    //cas添加线程  线程+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果失败了,继续自旋外层循环判断
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        //开启一个线程,Worker实现了runnable接口
            w = new Worker(firstTask);//这里点进去看worker的run方法
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //添加至wokers
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //添加成功
                if (workerAdded) {
                    t.start();//启动线程,会调用我们线程的run接口,也就是我们worker的run
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

goto语句写法:

代码语言:javascript
复制
	retry:
	for (int i = 0; i < 3; i++) {
		for (int j = 3; j < 10; j++) {
			//if (j == 4) {
			// break retry; 
			//跳出外面循环 // 
		}
		if(j == 7){
		 	continue retry;
		  	//继续外面循环 
		}
		  System.out.println(i+":"+j); 
		} 
	}

总结: 方法开始就是一个两层的嵌套自旋,大自旋判断线程池状态,状态正常,不断的小自旋判断能不能加 能加,就开启一个新的线程,经过判断无异常加入到线程池,start开启

开启这个新的线程会执行这个线程的run方法,上面有写在哪里点进去,点进去后发现:

代码语言:javascript
复制
        public void run() {
            runWorker(this);
        }

继续点进去:

6运行新的线程runWorker

代码语言:javascript
复制
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        	//只要一直能获取到task,就一直会执行,不会关闭,所以线程也不会销毁,线程销毁只有当task为null
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                	//调用线程执行之前方法,这里可以重写,放一些我们自己的业务前置逻辑
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	//调用task的run方法,这里就是去执行我们的业务逻辑了
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    	//调用线程执行之后方法,这里可以重写,放一些我们自己的业务后置逻辑
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

总结:通过自旋判断任务是否为空,不为空就去执行,为空就去取任务

7线程回收复用的关键:getTask():

看一下取任务的逻辑: getTask():

代码语言:javascript
复制
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
		//自旋获取
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池状态

            // Check if queue empty only if necessary.必要时检 查空,状态是否停止或者shutdown
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;//线程池状态异常或无任务返回null
            }
			//获取线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //线程数大于主线程数时,或者allowCoreThreadTimeOut参数为 true, allowCoreThreadTimeOut默认为false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			//超过最大线程,或者timed为true && (wc大于1个,并且任务队列为空)的时候
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //线程数-1,并且返回null,该线程结束
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //线程复用的关键就在这里了↓↓↓↓↓↓↓↓↓↓↓↓
            	//如果timed是true,超过时间不阻塞,不然一直阻塞,不回收
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //移除并返回队列头部的元素,如果为空,超过时间返回null
                    workQueue.take();//移除并返回队列头部的元素,如果为空,一直阻塞
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

8线程的回收

这里有一个面试问题: 线程池里的线程被销毁有什么条件?或者说线程池的线程什么时候被销毁? 上面的代码不是写了嘛,runWorker里的自旋判断,当gettask()为null也就是当无任务时,线程会减少(销毁) 那核心线程呢?核心线程什么时候销毁? 线程池被关闭,或者调用allowCoreThreadTimeOut方法,等待一个时间单位后会销毁(创建时传的变量)

代码语言:javascript
复制
        threadPoolExecutor.allowCoreThreadTimeOut(true);

9线程的复用

另外线程是怎么实现复用的? 一直不关闭,阻塞,等任务

好,至此为止,创建一个线程池然后添加线程执行任务的代码逻辑就结束了,不知道同学们学习到了多少呢? 如果你的公司要自己写一个线程池,你能不能参照这个写出来呢? 极其简化的图

![在这里插入图片描述](https://img-blog.csdnimg.cn/0af816025ba346dfa22611f642356a81.png
![在这里插入图片描述](https://img-blog.csdnimg.cn/0af816025ba346dfa22611f642356a81.png

10超过核心线程数小于最大线程数的那一撮所谓的临时线程

如果你只是背了八股文,各种所谓的面试秘籍会告诉你线程池有核心线程有临时线程,并发高时会创建临时线程帮忙,并发低时销毁这些临时线程,面试官问你?哪些是临时线程?你要是按八股文这样回答那就说明你没用过(起码没点进去看过代码),面试铁定是挂了。

其实没有什么临时线程,所谓的核心线程数是要保留几个线程

假如我们设置了核心数为3,最大数为10.

并发量刚经历一波高峰期,线程数量为我们的最大线程数10,然后这段时间并发很低,那超过核心线程数的线程会被回收,回收的是哪7个?

谁先执行完,谁先被回收被回收的就是所谓的临时的,最后剩下的3个就是核心的线程 核心线程只是几个一直被阻塞等待任务的线程而已 可能上一波并发高峰它还不是核心线程,但是它跑得慢,于是被留下来当核心了,下一波并发高峰,它先跑了,于是核心线程又换成了另外一批。

11拒绝策略

那线程已经达到最大线程数,如果满了,而且阻塞队列也满了,你的任务经过消峰限流等中间件的阻拦,还在源源不断的进来,已经处理不过来了,怎么办? 文章最开头,线程池创建时有个拒绝策略 阻塞队列已经满了,但是你还是源源不断地进来,会触发拒绝策略,报错 execute方法的末尾,调用了拒绝策略:

代码语言:javascript
复制
        //阻塞队列已满,添加失败,采用拒绝策略
        else if (!addWorker(command, false))
            reject(command);
代码语言:javascript
复制
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
代码语言:javascript
复制
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + //就是这个错误
                                                 " rejected from " +
                                                 e.toString());
        }
    }

栗子:核心3最大5队列5,我用100个任务同时执行:

在这里插入图片描述
在这里插入图片描述

但是一般我们会重写拒绝策略,创建线程池时当成变量传入,毕竟并发一高就报错这谁顶得住啊!!!

只要实现一下RejectedExecutionHandler这个接口就可以了

代码语言:javascript
复制
public class ExecJavaTemplate implements RejectedExecutionHandler { 
	@Override 
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		//在这里根据自己的业务写一个暂存的逻辑,然后塞回去即可
		//比较懒的话暂存策略也不用写,不断地塞回去就行,慢就慢点,别让它报错即可
		System.out.println("进入拒绝策略"); 
		executor.execute(r); //再次调用execute
	} 
}
在这里插入图片描述
在这里插入图片描述

那有人说了阻塞队列我设置成无限大,不就不会有上面的问题了。 如果阻塞队列是无限的,会发生什么? 首先我想到的就是OOM了,你的队列无限大,内存够不够? 不够你的内存不久溢出了,项目挂掉然后紧急排查错误,发现你这个开发将队列设置为无限大 好嘛,公司严格的话给你一个线上一级错误警告处理,这找谁说理去

12线程设置多少合理

线程数 = CPU可用核心数/(1 - 阻塞系数),其中阻塞系数的取值在0和1之间。阻塞系数=阻塞时间/(阻塞时间+计算时间)。

  1. 线程的 CPU 耗时所占比例越高,就需要越少的线程
  2. 线程的 IO 耗时所占比例越高,就需要越多的线程
  3. 针对不同的程序,进行对应的实际测试就可以得到最合适的选择
  4. 线程数 >= CPU 核心数
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-07-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 1为什么要使用线程池
  • 2几种常用线程池介绍
  • 3从初始化开始
  • 4执行任务execute
  • 5添加线程addWorker
  • 6运行新的线程runWorker
  • 7线程回收复用的关键:getTask():
  • 8线程的回收
  • 9线程的复用
  • 10超过核心线程数小于最大线程数的那一撮所谓的临时线程
  • 11拒绝策略
  • 12线程设置多少合理
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档