前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池

线程池

原创
作者头像
AlbertZhang
修改2020-11-19 18:13:26
6480
修改2020-11-19 18:13:26
举报
文章被收录于专栏:技术文献技术文献

为什么要用线程池(好处)


  • 降低资源消耗(线程可重用)。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度(因为线程池随程序的启动而创建,普通的线程创建是一个耗时操作)。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
  • **提供更加强大的功能。**线程池具有扩展性,允许程序员自行添加功能,比如ScheduleThreadPoolExectuor就提供了强大的延期执行功能等

池化思想带来的好处是什么


不光是线程池,池化思想在诸多地方有着很好的应用,比如对象池、连接池等等。。一般运用池化思想的都是一些比较消耗系统资源的操作,通过池化,可以降低内存消耗,并且可以进行复用操作,提高效率。同时池化还可以统一的对资源进行管理,控制他们的创建与销毁。

创建线程池


使用Executors创建

首先最简单的方式,是使用juc包提供的Executors进行创建,这个类为我们提供了几种比较简单的线程池对象

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
image-20200825154540651
image-20200825154540651
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
image-20200825154755108
image-20200825154755108
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
image-20200825154813417
image-20200825154813417
  • ScheduleThreadPool: 返回一个带时间调度功能(比如定时执行等功能)的线程池。
image-20200825160315014
image-20200825160315014

但是但是但是

image-20200825160644060
image-20200825160644060

《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

为什么阿里禁用Executors创建线程池

答案很简单,通过上面的源码就可以看到为什么:

  1. **FixedThreadPool 和 SingleThreadExecutor **都默认new了一个LinkedBlockingQueue,看下他的实现,可以看到他创建了一个链式的阻塞队列,特么容量是 MAX_VALUE !!!这样当大量的任务涌入时,队列里面就会积攒很多很多的任务,造成OOM
  1. CachedThreadPool 和 ScheduledThreadPool 都传入了Integer.MAX_VALUE作为线程池最大容量(maximumPoolSize,下面会详细介绍,先不急),那就有可能出创建出大量的线程!!!造成OOM

综上所述,不推荐使用Executors创建线程,当然上面说的其实只是一部分,阿里的规范其实更多的是希望程序员自己直接使用ThreadPoolExecutor创建线程池,是为了让程序员可以自己完全控制掌握自己的线程池。

创建线程池的正确姿势

仔细看看上面Executors提供的几个常用的线程池,看到他们的实现,其实都是调用了new ThreadPoolExecutor(xxxxx),只是参数上做了些小小的区别。所以最正确的创建方式就是直接使用ThreadPoolExecutor类进行线程池创建。

查看ThreadPoolExecutor源码可以看到,ThreadPoolExecutor提供了四个构造方法

image-20200825162405893
image-20200825162405893

但是前三个都是弟弟,因为他们都套娃,一个一个调用,最后实际都是调用的第四个构造方法

image-20200825150939105
image-20200825150939105

下面着重开始介绍ThreadPoolExecutor类

ThreadPoolExecutor七大参数

image-20200825162919489
image-20200825162919489

说句实话,作为一个基础非常不扎实的小白,,明明是中国人,明明是中国字,但我咋看不懂呢。。。

然后看到了大神举了一个非常生动的例子,瞬间跪了

有趣的例子

首先有一个银行(线程池),你是幕后boss(程序员),可以办理业务,平时没啥人,就是三个窗口办业务(这里每个业务窗口就是线程,3个就是corePoolSize)。有天办业务的人有点多,三个柜台都满了,大堂经理就安排新来的客户在候客区等待(把任务塞进阻塞队列),鬼知道什么情况,把业务的人越来越多,候客区也满了,经理就只好把在家休息的小王小李。。。都叫来加班,又开了2个窗口(现在一共5个窗口,是银行柜台的上限,就是maximumPoolSize),现在有五个窗口同事进行业务办理工作,再后来五个窗口满了,候客区也满了,新来的客户坐不下了,咋办?为了维持银行秩序,经理必须想几个办法改善现状(这里想的几个办法就是饱和策略,具体的饱和策略下面有介绍)

其实这个例子里面也包含了ThreadPoolExecutor的执行原理,先有个印象

这上面这个例子里面可以很清晰的知道几个核心参数的含义(除了keepAliveTime、unit、threadFactory),所以单独说一下

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,**如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。**但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性: TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒 1234567
  • threadFactory:这是线程池中用来创建线程的线程工厂,一般的我们使用默认自带的线程工厂即可,其实很简单就是有些简单的验证,然后对创建的线程有一个统一的名称标识
image-20200825172553194
image-20200825172553194

ThreadPoolExecutor执行原理

  1. 首先判断核心线程数是否已满,如果没满,则调用一个线程处理Task任务,如果已满,则执行步骤(2)
  2. 这时会判断阻塞队列是否已满,如果阻塞队列没满,就将Task任务加入到阻塞队列中等待执行,如果阻塞队列已满,则执行步骤(3)
  3. 判断是否大于最大线程数,如果小于最大线程数,则创建线程执行Task任务,如果大于最大线程数,则执行步骤(4)
  4. 这时会使用淘汰策略来处理无法执行的Task任务
image-20200826092649107
image-20200826092649107

通过查看execute方法的源代码,可以很清晰的看到这些执行流程(jdk也很清楚地在注释中写了这个执行过程)

代码语言:javascript
复制
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // 比较工作线程和核心线程池大小
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 尝试将任务放入缓存队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 尝试创建新的线程
        else if (!addWorker(command, false))
            // 上述都失败了,就执行拒绝策略
            reject(command);
    }
12345678910111213141516171819202122232425262728293031323334353637383940414243

submit和execute

ThreadPoolExecutor中没有重写父类AbstractExecutorService的submit方法

JavaGuide里面有一个很好地描述:

image-20200826110235686
image-20200826110235686
线程池生命周期

线程池有五大生命周期

image-20200826095656495
image-20200826095656495

线程池生命周期的转换图如下:

image-20200826095727554
image-20200826095727554

代码内部是如何维护的呢?

ThreadPoolExecutor内部维护了有一个成员变量ctl,他是一个AtomicInteger类型的变量。它是对线程池运行状态和线程池中有效线程数量进行控制的字段,Integer值一共有32位,其中高3位表示”线程池状态”,低29位表示”线程池中的任务数量”

代码语言:javascript
复制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 通过位运算获取线程池运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 通过位运算获取线程池中有效的工作线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 初始化ctl变量值
private static int ctlOf(int rs, int wc) { return rs | wc; }
123456789101112131415161718

为什么用一个Integer来维护两个值

image-20200826165607081
image-20200826165607081

线程工作原理解析

这里的解析来自一个大神的博文,我做了一个整理,添加了我自己的一些见解:http://objcoding.com/2019/04/25/threadpool-running/

通过查看execute源码可以看到当线程池有余时会调用addWorker方法,创建线程执行方法,那么是如何进行的呢?继续向下探索

addWorker()

可以简单讲addWorker方法分为两个部分,一个是上面的一堆判断,然后就是主菜创建Worker

先看前部分,可以看出前部分是一个for循环,这个for循环的主要作用就是判断当前线程池的状态可不可以添加任务,特别说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了;同时增加工作线程数量使用了AQS作同步,如果同步失败,则继续循环执行。

代码语言:javascript
复制
retry:
for (;;) {
  int c = ctl.get();
  // 获取线程池当前运行状态
  int rs = runStateOf(c);

  // 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了
  // 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务
  // 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了
  if (rs >= SHUTDOWN &&
      ! (rs == SHUTDOWN &&
         firstTask == null &&
         ! workQueue.isEmpty()))
    return false;

  for (;;) {
    // 获取有效线程数量
    int wc = workerCountOf(c);
    // 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务
    // 或者有效线程数大于等于当前限制的线程数,也不能添加任务
    // 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务
    if (wc >= CAPACITY ||
        // core是addWorker的第二个参数
        // 通过此处可以看出,core为true时,会判断当前有效线程的数量与核心线程数进行比较
        wc >= (core ? corePoolSize : maximumPoolSize))
      return false;
    // 使用AQS增加有效线程数量
    if (compareAndIncrementWorkerCount(c))
      break retry;
    // 如果再次获取ctl变量值
    c = ctl.get();  // Re-read ctl
    // 再次对比运行状态,如果不一致,再次循环执行
    if (runStateOf(c) != rs)
      continue retry;
    // else CAS failed due to workerCount change; retry inner loop
  }
}
12345678910111213141516171819202122232425262728293031323334353637

下半部分

下半部分的源码主要的作用是创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。添加步骤做完后就启动线程来执行任务了,继续往下看。

代码语言:javascript
复制
// 任务是否已执行
boolean workerStarted = false;
// 任务是否已添加
boolean workerAdded = false;
// 任务包装类,我们的任务都需要添加到Worker中
Worker w = null;
try {
  // 创建一个Worker
  w = new Worker(firstTask);
  // 获取Worker中的Thread值
  final Thread t = w.thread;
  if (t != null) {
    // 操作workers HashSet 数据结构需要同步加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      // Recheck while holding lock.
      // Back out on ThreadFactory failure or if
      // shut down before lock acquired.
      // 获取当前线程池的运行状态
      int rs = runStateOf(ctl.get());
      // rs < SHUTDOWN表示是RUNNING状态;
      // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
      // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
      // rs是RUNNING状态时,直接创建线程执行任务
      // 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也说说明了SHUTDOWN状态时不再接受新任务
      if (rs < SHUTDOWN ||
          (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
          throw new IllegalThreadStateException();
        workers.add(w);
        int s = workers.size();
        if (s > largestPoolSize)
          largestPoolSize = s;
        workerAdded = true;
      }
    } finally {
      mainLock.unlock();
    }
    // 启动线程执行任务
    if (workerAdded) {
      t.start();
      workerStarted = true;
    }
  }
} finally {
  if (! workerStarted)
    addWorkerFailed(w);
}
return workerStarted;
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051

可以很清晰的看到,当worker被创建好后直接丢到了workers中,然后进行了t.start操作,直接开始执行线程。然而t是worker的成员变量,所以具体如何执行的呢?继续向下看

Worker对象的构造

上述的addWorker中大量的出现了worker对象,那么到底什么是worker对象呢,一起来看看他的源码。

可以看出worker是threadpoolexecutor的内部类,有两个成员变量,firstTask和thread,通过他的构造方法可以看出,fristTask是在创建对象时被初始化的。

这里特别强调,firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。

Worker继承自AQS(这个后面开坑再讲)实现了Runnable接口,核心就是这个run方法,所以最最根本的线程任务执行就是run方法,可以看到Worker重写的run方法里面调用了runWorker这个方法传入了当前对象。继续往下看

image-20200826151151712
image-20200826151151712
runWorker()

runWorker就更加的简单了,首先是取得worker对象的firstTask,这是初次情况,firstTask不为空,进入while循环会进行判断,初次直接使用firstTask,随后都是调用的getTask进行取任务。

下面的代码中注释的地方写的非常详细,有两个地方需要注意下 1. beforeExecute 2. afterExecute

这两个方法是ThreadPoolExecutor特意保留的两个方法,默认是没有实现的,他运行程序员对他进行实现,帮助我们完成一些线程执行时的一些小操作~

代码语言:javascript
复制
final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    // 循环从workQueue阻塞队列中获取任务并执行
    while (task != null || (task = getTask()) != null) {
      // 加同步锁的目的是为了防止同一个任务出现多个线程执行的问题
      w.lock();
      // 如果线程池正在关闭,须确保中断当前线程(这里就是调用shutdownnow会使正在执行中的线程结束的代码)
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 执行任务前可以做一些操作
        beforeExecute(wt, task);  // 注意这个方法
        Throwable thrown = null;
        try {
          // 执行任务
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 执行任务后可以做一些操作
          afterExecute(task, thrown); // 注意这个方法
        }
      } finally {
        // 将task置为空,让线程自行调用getTask()方法从workQueue阻塞队列中获取任务
        task = null;
        // 记录Worker执行了多少次任务
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    // 线程回收过程
    processWorkerExit(w, completedAbruptly); 
  }
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
保证核心线程不被销毁的关键

再看这里的解析的时候,我同样搜集了很多网上的解释,稍微有点点晦涩,我按照我的理解从新写了一段

首先问题是如何保证核心线程不被销毁,那么先看看什么情况下会被销毁?

没错就是上面这段runWorker()中的最后,当while循环退出时,会执行processWorkerExit操作这个操作就是销毁的核心

代码语言:javascript
复制
 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 可以看到这里将worker对象从set中移除,销毁了当前的工作线程对象
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
12345678910111213141516171819202122232425262728

按照这个逻辑,必须要退出while循环的时候,他才会进行销毁,那么怎样才能退出while循环呢,再看看while循环的判断

代码语言:javascript
复制
task != null || (task = getTask()) != null
1

||左边不多说,主要看右边,当getTask获取不到对象的时候才会为false,所以控制它的关键在于getTask方法

重点研究一下(这里我看了另一个不错的博客:https://www.jianshu.com/p/8848860b9ad4),原作者对于getTask的代码做了一个简化分析

代码语言:javascript
复制
// 为分析而简化后的代码
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);

        // timed变量用于判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (timed && timedOut) {
            // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,
            // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

            // 注意workQueue中的poll()方法与take()方法的区别
            //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null
            //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止

            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
1234567891011121314151617181920212223242526272829303132333435363738

从以上代码可以看出,getTask()的作用是:

  • 如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。
  • 如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

所以allowCoreThreadTimeOut可以控制核心线程是否进行销毁

线程池关闭

ThreadPoolExecutor提供了两个方法进行线程池关闭操作:shutdown和shutdownNow

在上面的线程池生命周期中可以很好地看出这两个操作的区别

shutdown只是将线程池的状态设置为SHUTWDOWN状态,正在执行的任务(包含阻塞队列中等待的任务)会继续执行下去,没有被执行的则中断。

而shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回。

image-20200826171835220
image-20200826171835220

缓存队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:

image-20200826100448246
image-20200826100448246

拒绝策略

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

ThreadPoolExecutor自带了4种饱和策略:

  1. AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 (默认策略)
  2. DiscardPolicy:也是丢弃任务,但是不抛出异常。
  3. DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  4. CallerRunsPolicy:由调用线程处理该任务

前面三个好理解一点,给第四个举个例子,上demo:

代码语言:javascript
复制
public class TestThreadPoolExecutor {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5, // corePoolSize
                5, // maximumPoolSize
                1, // 过期时间
                TimeUnit.SECONDS, 
                new LinkedBlockingDeque<>(1), // 阻塞队列
                Executors.defaultThreadFactory(), 
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 20; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(500); // 让线程处理的久一点
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread());
                }
            });
        }
        threadPoolExecutor.shutdown();
    }

}
12345678910111213141516171819202122232425262728

输出:

image-20200825153443613
image-20200825153443613

通过上面的例子可以看出,线程池任务满载的时候,一部分任务转由调用者执行(主线程)

如何选择合适的线程池大小

CPU 密集型任务

计算密集型,顾名思义就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,如果在非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。

这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务

对于IO密集型的应用,就很好理解了,**我们现在做的开发大部分都是WEB应用,涉及到大量的网络传输,不仅如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。**因此从这里可以发现,对于IO密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。

这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么要用线程池(好处)
    • 池化思想带来的好处是什么
    • 创建线程池
      • 使用Executors创建
        • 为什么阿里禁用Executors创建线程池
          • 创建线程池的正确姿势
          • ThreadPoolExecutor七大参数
            • 有趣的例子
            • ThreadPoolExecutor执行原理
              • submit和execute
                • 线程池生命周期
              • 线程工作原理解析
                • addWorker()
                • Worker对象的构造
                • runWorker()
                • 保证核心线程不被销毁的关键
              • 线程池关闭
              • 缓存队列
              • 拒绝策略
              • 如何选择合适的线程池大小
              相关产品与服务
              容器服务
              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档