前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发-线程池

Java并发-线程池

作者头像
lpe234
发布2021-03-02 15:34:06
4270
发布2021-03-02 15:34:06
举报
文章被收录于专栏:若是烟花若是烟花

Java中线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池。

合理使用线程池可以带来3个好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性:使用线程池可以进行统一分配、调优和监控。

1 线程池的使用

1.1 线程池的创建

推荐使用ThreadPoolExecutor创建线程池。

代码语言:javascript
复制
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
  if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

线程池创建参数解释:

  1. corePoolSize(核心线程数):当线程池线程数量小于核心线程数时,即使有空闲线程也会创建线程,只有达到核心线程数时才不会创建。可以调用prestartCoreThreadprestartAllCoreThreads来预先创建一个或全部核心线程。核心线程默认不会被销毁,除非主动调用allowCoreThreadTimeOut(true)
  2. maximumPoolSize(最大线程数):线程池允许的最大线程数。核心线程数+非核心线程数=最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则会创建新线程执行新任务。如果使用了无界队列,则该参数基本无效。
  3. keepAliveTime,unit(存活时间):线程池的工作线程空闲后,保持的存活时间。默认仅对非核心线程有效,除非主动调用allowCoreThreadTimeOut(true)
  4. workQueue(工作队列):线程阻塞队列,只会存放由execute提交的Runnable任务。①ArrayBlockingQueue:基于数组的有界阻塞队列。②LinkedBlockingQueue:基于链表的阻塞队列,吞吐量通常高于ArrayBlockingQueue。③SynchronousQueue:不存储元素的阻塞队列,每个插入必须等待另一个线程调用移除操作。④PriorityBlockingQueue:具有优先级的无限阻塞队列。
  5. threadFactory(线程工厂):设置创建线程的工厂,创建线程池时要指定有意义的线程名称,方便出错时回溯。
  6. handler(拒绝策略):当队列和线程池都满,或者线程池不处于RUNNING状态时,会使用该策略,默认有4种实现:①AbortPolicy(默认):直接抛出RejectedExecutionException异常。②DiscardPolicy:静默丢弃当前提交的任务。③DiscardOldestPolicy:线程池未关闭的情况下,丢弃队列中最久未被执行的任务,并执行当前任务。④CallerRunsPolicy:线程池未关闭的情况下,使用调用者的线程去执行当前任务。
1.2 提交任务

线程池提交任务两个方法。

  • execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  • submit方法用于提交需要有返回值的任务。线程池会返回一个Future类型对象,该对象可以判断任务是否执行成功,并且可以通过Future.get()来获取返回值,get()会阻塞当前线程直到任务完成,get(long timeout, TimeUnit unit)会阻塞当前线程一段时间后立即返回。
1.3 线程池的关闭

可调用shutdown或者shutdownNow方法来关闭线程池。**原理:**遍历线程池中的线程,逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow首先向线程池置为STOP状态,然后停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程,之前提交的任务(正在执行的和队列中的)会被执行。

只要调用了shutdown或者shutdownNow任意方法,isShutdown都会返回true。当所有任务都已关闭后,才表示线程池关闭,isTerminated方法返回true。通常调用shutdown来关闭线程池,如果不一定要任务执行完,则可以调用shutdownNow方法。

1.4 线程池的配置

要想合理的配置线程池,先要对任务特性进行分析。

  • 任务性质:CPU密集、IO密集、混合型
  • 任务执行时间:长、中、短
  • 任务依赖性:是否依赖其他系统资源,如数据库连接、外部系统API调用
  • 任务优先级:高、中、低

性质不同的任务可以使用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如N或N+1。IO密集型的任务并不是一直在执行任务,则应配置尽可能多的线程。可以大概预估请求等待时间(WT)和服务时间(ST)之间的比例。线程池大小设置为N*(1+WT/ST)。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,可以让高优先级的任务先执行。为了防止优先级低的任务可能永远都不能执行。可以将等待时间加入权重计算优先级。

建议使用有界队列,能增加系统的稳定性和预警能力。

1.5 线程池的监控

如果系统中存在大量使用线程池,则由必要对线程池进行监控,方便在出现问题时,可根据线程池使用情况进行快速定位。可根据线程池提供的参数进行监控,常用属性如下:

  • getPoolSize:线程池当前线程数
  • getCorePoolSize:线程池核心线程数
  • getActiveCount:正在执行任务的线程数量
  • getCompletedTaskCount:已完成任务的大致总数(任务和线程的状态可能在计算过程中有所变化)
  • getTaskCount:全部任务的大致总数
  • getQueue:当前线程池的任务队列
  • getLargestPoolSize:线程池曾经最大线程数量
  • getMaximumPoolSize:线程池允许最大线程数
  • getKeepAliveTime:线程池线程存活时间
  • isShutdown:线程池是否为关闭(SHUTDOWN状态)
  • isTerminated:线程池是否为TERMINATED状态

通过扩展线程池进行监控。可以通过继承的方式来自定义线程池,重写beforeExecute(Thread t, Runnable r)afterExecute(Runnable r, Throwable t)terminated(),可以在任务执行前、执行后和线程池关闭前执行一些代码进行监控。

代码语言:javascript
复制
class TPE extends ThreadPoolExecutor {
	// 记录Runnable任务起始执行时间
  private ConcurrentHashMap<Integer, Long> beginTimeMaps = new ConcurrentHashMap<>();
  
  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    // 设置起始时间
    beginTimeMaps.put(r.hashCode(), new Date().getTime());
  }
  
  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    // 获取起始时间  
    Long begin = beginTimeMaps.remove(r.hashCode());
    Long end = new Date().getTime();
    System.out.println(end-begin);
  }
  
  @Override
  protected void terminated() {
    // code
  }

  @Override
  public void shutdown() {
    // code
    super.shutdown();
  }

  @Override
  public List<Runnable> shutdownNow() {
    // code
    return super.shutdownNow();
  }
}

2 线程池原理

2.1 线程池状态及线程数
代码语言:javascript
复制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 工作线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 运行状态和工作线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池控制状态,ctl是一个原子integer包含两个字段:workerCount线程池内有效线程数量,runState线程池状态。高3位存储runState,低29位存储workerCount

线程池的状态如下:

  • RUNNING:接受新任务,并且能够处理队列任务
  • SHUTDOWN:不接收新任务,但能处理队列任务
  • STOP:不接收新任务,不处理队列任务
  • TIDYING:所有任务都已终止,workCount为0,线程进入该状态后会调用terminated()钩子函数
  • TERMINATED:terminated()函数已经调用完毕
2.2 关键方法
2.2.1 execute方法代码

在将来的某个时刻执行给定的任务,该任务可能被新线程执行也可能被线程池中已存在的线程执行。如果无法提交任务执行,因为执行器已经关闭或者达到最大容量,则该任务由当前的RejectedExecutionHandler处理。

代码语言:javascript
复制
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  /*
   * Proceed in 3 steps:
   *
   * 1. If fewer than corePoolSize threads are running, try to
   * start a new thread with the given command as its first
   * task.  The call to addWorker atomically checks runState and
   * workerCount, and so prevents false alarms that would add
   * threads when it shouldn't, by returning false.
   *
   * 2. If a task can be successfully queued, then we still need
   * to double-check whether we should have added a thread
   * (because existing ones died since last checking) or that
   * the pool shut down since entry into this method. So we
   * recheck state and if necessary roll back the enqueuing if
   * stopped, or start a new thread if there are none.
   *
   * 3. If we cannot queue task, then we try to add a new
   * thread.  If it fails, we know we are shut down or saturated
   * and so reject the task.
   */
  // 获取线程池的runState和workerCount
  int c = ctl.get();
  // 若当前线程数小于核心线程数;则新建一个Worker并执行当前任务
  if (workerCountOf(c) < corePoolSize) {
    // 新增线程(command, core) core表示新增的是否为核心线程
    if (addWorker(command, true))
      return;
    // 添加核心线程失败,重新获取线程池状态
    c = ctl.get();
  }
  // 若当前线程数不小于核心线程数并且处于运行状态,则将任务放到阻塞队列中
  if (isRunning(c) && workQueue.offer(command)) {
    // 添加队列成功后,再次获取线程池状态
    int recheck = ctl.get();
    // 线程池不处于运行状态,并且成功移除任务。则执行拒绝策略
    if (! isRunning(recheck) && remove(command))
      // 执行拒绝策略
      reject(command);
    else if (workerCountOf(recheck) == 0)
      // 如果工作线程为0(线程池无线程),则新建一个无任务的线程
      addWorker(null, false);
  }
  // 非运行状态或放队列失败时,直接拒绝策略
  else if (!addWorker(command, false))
    // 执行拒绝策略
    reject(command);
}
2.2.2 addWorker方法

检查是否可根据当前线程池状态以及给定的边界(核心线程或最大线程)创建新的worker。firstTask为新线程需先执行的任务,如果为null的话则不执行。core如果为true则使用corePoolSize作为边界界定条件,为false则使用maximumPoolSize作为边界界定条件。

代码语言:javascript
复制
private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    // 线程池状态及线程池运行状态
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 当rs>=SHUTDOWN时 即SHUTDOWN(0)、STOP(1)、TIDYING(2)、TERMINATED(3),此状态不再接受新任务
    // 当rs=SHUTDOWN时,此时可创建线程条件如下:
    // 1. rs==SHUTDOWN,此时不再接受新任务,但可
    // 2. firstTask为空,不可再继续提交任务
    // 3. !workQueue.isEmpty(),为空的话则不再需要新线程
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
      return false;

    for (;;) {
      // 当前工作线程数
      int wc = workerCountOf(c);
      // 当线程数达到CAPACITY,
      // 或者core为true时达到核心线程数,
      // 或者core为false时达到最大线程数,
      // 不再创建新线程
      if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 使用CAS增加workerCount
      if (compareAndIncrementWorkerCount(c))
        // 成功修改workerCount,跳出最外层for循环
        break retry;
      
      // 重新获取线程池状态
      c = ctl.get();  // Re-read ctl
      // 线程池状态已被修改,继续外传for循环
      if (runStateOf(c) != rs)
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // 根据firstTask创建新worker
    w = new Worker(firstTask);
    // 拿到当前worker的线程
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int rs = runStateOf(ctl.get());

        // rs<SHUTDOWN即为RUNNING,线程池处于运行状态
        // rs==SHUTDOWN时,因为可以继续执行队列中的任务,故允许添加无任务的worker
        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
          workers.add(w);
          int s = workers.size();
          // 更新getLargestPoolSize,线程池中出现过最大线程数
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        // worker添加成功,启动线程
        t.start();
        workerStarted = true;
      }
    }
  } finally {
    // worker启动失败,则roll back cleanly.
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

t.start()即为worker.thread.start()Worker类本身实现了Runnable接口,在Worker初始化时,会执行this.thread = getThreadFactory().newThread(this);。也就是t.start()-->worker.run()

2.2.3 Worker类

线程池中的线程都会被封装成Worker实例,Worker继承于AbstractQueuedSynchronizer(AQS),实现了Runnalbe接口。

代码语言:javascript
复制
private final class Worker
  extends AbstractQueuedSynchronizer
  implements Runnable
{
  /**
   * This class will never be serialized, but we provide a
   * serialVersionUID to suppress a javac warning.
   */
  private static final long serialVersionUID = 6138294804551838833L;

  /** Thread this worker is running in.  Null if factory fails. */
  // ThreadFactory.newThread创建的线程实例
  final Thread thread;
  /** Initial task to run.  Possibly null. */
  // 要运行的初始任务,可能为空
  Runnable firstTask;
  /** Per-thread task counter */
  // 每个线程都会记录的完成任务数
  volatile long completedTasks;

  /**
   * Creates with given first task and thread from ThreadFactory.
   * @param firstTask the first task (null if none)
   */
  Worker(Runnable firstTask) {
    // 禁止中断,直到runWorker方法执行
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 通过ThreadFactory创建线程实例,Worker自身作为Runnable来创建线程实例
    this.thread = getThreadFactory().newThread(this);
  }

  /** Delegates main run loop to outer runWorker  */
  // t.start() 将会调用该方法
  public void run() {
    // 执行任务核心方法
    runWorker(this);
  }

  // Lock methods
  //
  // The value 0 represents the unlocked state.
  // The value 1 represents the locked state.

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}
2.2.4 runWorker方法

主工作线程循环,不断的从队列获取任务并且执行他们。

代码语言:javascript
复制
final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  // unlock 将state由-1置为0。
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    // 初始任务不为null,或者从队列中获取到了任务
    while (task != null || (task = getTask()) != null) {
      // worker加锁
      w.lock();
      // If pool is stopping, ensure thread is interrupted;
      // if not, ensure thread is not interrupted.  This
      // requires a recheck in second case to deal with
      // shutdownNow race while clearing interrupt
      // 若线程池正在停止,要保证当前线程是中断状态
      // 若不是的话,则要保证当前线程不是中断状态
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 钩子函数,任务执行前执行
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          // 执行任务 =======>
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 钩子函数,任务执行后执行
          afterExecute(task, thrown);
        }
      } finally {
        // 任务执行完,清空task任务
        task = null;
        // 已完成任务+1
        w.completedTasks++;
        // worker解锁
        w.unlock();
      }
    }
    // 线程正常退出
    completedAbruptly = false;
  } finally {
    // 线程退出 completedAbruptly标识正常退出/非正常退出
    processWorkerExit(w, completedAbruptly);
  }
}

Thread.interrupted()判断线程是否中断,同时复位中断状态。

runWorker()方法的执行流程:

  1. while循环调用getTask()方法,从任务队列中获取任务(Worker新建时可能有firstTask)
  2. 若线程池正在停止,要保证当前线程是中断状态,否则要保证当前线程不是中断状态
  3. 调用task.run()执行任务
  4. 若task为null则退出循环,执行processWorkerExit
  5. runWorker执行完,即代表Worker中run方法执行完毕,销毁线程
2.2.5 getTask方法

根据当前配置,阻塞或者超时等待任务。发生以下情况时,会返回null。task为null时,则销毁线程。

  1. 当前线程池线程数超过最大线程数(maximumPoolSize)。调用setMaximumPoolSize方法。
  2. 线程池处于STOP状态。调用了shutdownNow方法。
  3. 线程池处于SHUTDOWN状态,并且workQueue队列为空。
  4. 当前worker获取任务等待超时(有可能是allowCoreThreadTimeOutworkerCount > corePoolSize)。
代码语言:javascript
复制
private Runnable getTask() {
  boolean timedOut = false; // Did the last poll() time out?

  for (;;) {
    // 获取当前线程池状态、运行状态
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // rs>=STOP,线程池调用`shutdownNow`后,不再处理新任务
    // rs>=SHUTDOWN,workQueue消费完后,不再接受新任务
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    // 当前工作线程数
    int wc = workerCountOf(c);

    // Are workers subject to culling?
    // 是否存在超时校验标识
    // `allowCoreThreadTimeOut`允许核心线程超时 或 工作线程数大于核心线程数
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    // wc>maximumPoolSize,可能调用了`setMaximumPoolSize`,修改了最大线程数
    // timed&&timedOut,当前线程需要进行超时控制,并且上次发生了超时
    // 线程数大于1,或者阻塞队列为空
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      // 尝试扣减工作线程
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 是否需要进行超时判断 timed
      // 若为true,则需要进行超时判断,通过阻塞队列的poll方法来进行超时控制,超时则返回null
      // 若为false,则通过take获取,阻塞队列直到workQueue不为空
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      if (r != null)
        return r;
      // r为null,已经超时,设置标识位
      timedOut = true;
    } catch (InterruptedException retry) {
      // 发生了中断,未发生超时,设置标识位
      timedOut = false;
    }
  }
}

getTask方法返回null时,跳出循环,然后执行processWorkerExit方法进行退出。

2.2.6 processWorkerExit方法
代码语言:javascript
复制
private void processWorkerExit(Worker w, boolean completedAbruptly) {
  // completedAbruptly是否出现异常,将workerCount减1
  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 已完成任务数
    completedTaskCount += w.completedTasks;
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }

  tryTerminate();

  int c = ctl.get();
  // 若线程处于RUNNING或SHUTDOWN状态时,若worker异常结束,则直接添加空任务worker
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      // 允许核心线程超时时,如果阻塞队列不为空,则至少保留一个worker
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      // 若不允许核心线程池超时,则workerCount不少于corePoolSize
      if (workerCountOf(c) >= min)
        return; // replacement not needed
    }
    addWorker(null, false);
  }
}
2.2.7 shutdown方法
代码语言:javascript
复制
public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 安全策略校验
    checkShutdownAccess();
    // 切换状态为SHUTDOWN
    advanceRunState(SHUTDOWN);
    // 中断空闲线程
    interruptIdleWorkers();
    // 钩子
    onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  // 尝试结束线程池
  tryTerminate();
}
2.2.8 shutdownNow方法
代码语言:javascript
复制
public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(STOP);
    // 中断所有线程(空闲、非空闲)
    interruptWorkers();
    // 取出队列中所有未被执行的任务
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}

3 几种常见线程池

3.1 Exectors线程池

Executors提供了几个静态方法来创建线程池。

3.1.1 newFixedThreadPool
代码语言:javascript
复制
public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

核心线程数量和总线程数相同,都是传入的参数nThreads,所以只能创建核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故核心线程空闲则由其处理,否则入队等待直到核心线程空闲。keepAliveTime设置为0L,多余的线程将会被立即停止。

适用于为满足资源管理需求,需要限制当前线程数量的应用场景,适用于负载比较高的服务器。

3.1.2 newSingleThreadExecutor
代码语言:javascript
复制
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

核心线程数和最大线程数都为1,使用无界队列。任意时刻最多只有一个线程执行任务,多余任务会被缓冲至队列中。

适用于保证顺序的执行各任务;并且在任意时间点,不会有多线程是活动的应用场景。

3.1.3 newCachedThreadPool
代码语言:javascript
复制
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

核心线程数为0,最大线程数为Integer.MAX_VALUEkeepAliveTime为60秒,空闲线程超时将会被终止。阻塞队列SynchronousQueue是一个没有容量的阻塞队列,插入数据时必须等待一个线程来获取数据、否则就会阻塞。没有了队列的缓冲,提交的任务会被尽快的分配线程执行。

适用于很多短期的异步任务的小程序,或者是负载较轻的服务器。

3.1.4 newScheduledThreadPool
代码语言:javascript
复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue());
}

支持定时或周期性的任务执行。阻塞队列使用DelayedWorkQueue

适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。

3.2 Exectors弊端
  • newFixedThreadPoolnewSingleThreadExecutor:阻塞队列使用LinkedBlockingQueue,其默认容量为Integer.MAX_VALUE,若任务处理较慢,则会引起消息堆积问题,消耗大量内存甚至触发OOM。
  • newCachedThreadPoolnewScheduledThreadPool:最大线程数为Integer.MAX_VALUE,可能会创建很多的线程,甚至导致OOM。

4 线程池注意事项

  • 自定义线程工厂ThreadFactory,指定有意义的线程名称,方便出错时回溯。
  • 使用Exectors时,避免出现任务堆积线程堆积情况。最好使用ThreadPoolExecutor显示的创建线程池。
  • 若线程池中使用到ThreadLocal,必须主动回收。
  • 最好有有效的监控、日志等记录信息。方便异常处理。
  • 线程池大小设置要根据任务类型进行设置,根据任务运行情况、系统负载、资源利用率进行调整。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 线程池的使用
    • 1.1 线程池的创建
      • 1.2 提交任务
        • 1.3 线程池的关闭
          • 1.4 线程池的配置
            • 1.5 线程池的监控
            • 2 线程池原理
              • 2.1 线程池状态及线程数
                • 2.2 关键方法
                  • 2.2.1 execute方法代码
                  • 2.2.2 addWorker方法
                  • 2.2.3 Worker类
                  • 2.2.4 runWorker方法
                  • 2.2.5 getTask方法
                  • 2.2.6 processWorkerExit方法
                  • 2.2.7 shutdown方法
                  • 2.2.8 shutdownNow方法
              • 3 几种常见线程池
                • 3.1 Exectors线程池
                  • 3.1.1 newFixedThreadPool
                  • 3.1.2 newSingleThreadExecutor
                  • 3.1.3 newCachedThreadPool
                  • 3.1.4 newScheduledThreadPool
                • 3.2 Exectors弊端
                • 4 线程池注意事项
                相关产品与服务
                云开发 CloudBase
                云开发(Tencent CloudBase,TCB)是腾讯云提供的云原生一体化开发环境和工具平台,为200万+企业和开发者提供高可用、自动弹性扩缩的后端云服务,可用于云端一体化开发多种端应用(小程序、公众号、Web 应用等),避免了应用开发过程中繁琐的服务器搭建及运维,开发者可以专注于业务逻辑的实现,开发门槛更低,效率更高。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档