前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java基础—线程池原理与使用

Java基础—线程池原理与使用

作者头像
码哥字节
发布2020-05-06 16:42:09
7620
发布2020-05-06 16:42:09
举报
文章被收录于专栏:Java 技术栈Java 技术栈

愿你越努力越幸运

「歇一歇,继续奔跑」

今天给大家讲解Java中线程池的相关知识,分别从常见的线程池面试点什么是线程池线程池种类线程池生命周期以及线程池使用及实现线程池使用情景阐述,有误之处望多多海涵。

常见面试点

  • 为什么建议自定义线程池参数
  • 如何实现一个线程池
  • 线程池有几种任务拒绝策略

什么是线程池

线程池是预先生成N个线程,有任务提交时把任务放进任务队列中,并交付给空闲的线程处理,若当前没有空闲线程则根据设定的策略来处理已提交的任务,处理完任务后不会销毁线程,进而达到复用减少频繁创建、销毁线程的效果

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
            throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize:核心线程数

maximumPoolSize:最大线程数

unit:空闲存活时间的单位

workQueue:任务队列类

  • 固定边限ArrayBlockingQueue
  • 延迟DelayedQueue
  • 无边限LinkedBlockingQueue
  • 同步阻塞SynchronousQueue

keepAliveTime:线程空闲存活时间,当空闲的非核心线程超过此时间则会被回收

threadFactory:线程工厂,一般使用默认,可自定义线程工厂设置线程属性

handler:任务拒绝策略,当最大线程数且任务队列已满时,对任务执行拒绝策略

  • AbortPolicy:拒绝任务并抛出异常
  • AbortPolicyWithReport:拒绝任务并忽略异常、记录日志
  • DiscardPolicy:直接丢弃任务,不作任何处理
  • DiscardOldestPolicy:丢弃队列中最久的任务并尝试执行任务
  • CallerRunPolicy:拒绝并由调用线程执行任务

什么是线程池

FixedThreadPool

这是一个线程数固定的线程池,即corePoolSize与maximumPoolSize为固定值,keepAliveTime为0,使用无边限的LinkedBlockingQueue,当线程数被创建时即已经创建好固定的线程数,即使新增或减少任务,线程数也是固定不变的,适合于整体上需要的线程数变化不大的情景。

SingleThreadExecutor

这是单个线程数的线程池,即corePoolSize及maximumPoolSize数都设置为1,keepAliveTime为0,使用无边限的LinkedBlockingQueue,对提交的任务顺序执行,更具准确性。

CachedThreadPool

这是一个线程数可伸缩、任务队列无边限的线程池,即corePoolSize设置为0,maximumPoolSize为Integer最大值,keepAliveTime设置60秒,使用SynchronousQueue作为任务队列,当新增任务时会先检查是否有空闲线程,若没有则会新建线程处理任务当空闲线程超过keepAliveTime后则会进行销毁回收

ScheduledThreadPoolExecutor

这是一个核心线程数固定、定时执行任务的线程池,即corePoolSize为固定值,maximumPoolSize为Integer的最大值keepAliveTime为0,使用延迟队列DelayedQueue进行调度任务的执行

ForkJoinPool

这是一个拆分、聚合任务的线程池,使用分而治之算法拆分为多个子任务,异步或同步执行任务,适合计算密集型情景使用

线程池生命周期

线程池状态

running: 接受新任务并处理队列的任务

shutdown: 不再接受新任务,但会处理队列中的任务

stop: 不再接受新任务,也不处理队列的任务同时中断正在执行中的任务

terminated: terminated() 方法已执行完

tidying: 全部任务都已终止且工作线程数为零,线程池即将向tidying状态过度,即将运行terminated()方法

状态转换

  • running->shutdown:shutdown()方法被调用
  • running或shutdown->stop:shutdownNow()方法被调用时
  • shutdown->tidying:当线程数为零且任务队列为空时
  • stop->tidying:当线程数为零的时
  • tidying->terminated:terminated()方法执行完

线程池使用与实现

代码语言:javascript
复制
 public static void main(String[] args) {
        Runnable task = new Runnable() {
                @Override
                public void run()
 {
                    System.out.println("run a task");
                }
        };
      //定义一个有10个线程的线程池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); 
      fixedThreadPool.submit(task)
         //定义一个只有一个线程的线程池
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
       singleThreadPool.submit(task)
        //定义一个线程数可伸缩的线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.submit(task)
        //定义一个可拆分任务的线程池
        ExecutorService workStealPool = Executors.newWorkStealingPool();
  }

注意事项:不建议直接使用上述方式构造线程池,原因是容易出现内存溢出、无法创建新线程等问题,其中固定线程池及单线程池使用的是无边限任务队列,可能会出现任务一直增多导致队列占用更多的内存,最终引起频繁GC或者内存不足抛出异常等问题,另外无边限线程池则可能会一直无限制的创建线程,我们都知道线程是珍贵资源,在JVM默认参数-Xss的配置中,新建一个线程大概占用1M内存左右,一直创建线程会占用更多的内存导致内存溢出、程序卡顿等问题,故建议根据业务情景自定义线程池。

代码语言:javascript
复制
//ExecutorService核心方法
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
}

//ThreadPoolExecutor核心方法
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get(); //获取线程池的状态
        //1、判断当前空闲线程数是否小于核心线程数,若是则新建核心线程并把任务设为首个任务
        if (workerCountOf(c) < corePoolSize) {
            //若addWorker返回false直接结束,addWork方法将会原子性地检查runState、workCount
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2、判断线程池是否处于运行状态及是否成功添加任务到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            /*
            2.1 重新检查状态,有可能最后一次检查时存在的线程已被销毁或进入此方法时线程池被关闭
            成功添加任务后双重检查线程池的状态,若线程池状态为STOP,则移除队列中的任务并执行拒绝策略
            */
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //2.2 线程池处于运行状态且当前空闲线程数为零则创建非核心线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3、如果添加任务失败则尝试创建非核心线程,若创建失败,有可能线程池已被关闭或队列已饱和,拒绝任务
        else if (!addWorker(command, false))
            reject(command);
}

//添加线程
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //CAS方式添加线程
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
      //线程池处于非运行状态 and !(线程池处于关闭状态 and 任务为空 and 任务队列非空)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
       //通过添加
            for (;;) {
                int wc = workerCountOf(c);
                //判断当前线程数是否大于最大线程数限制,若超过则返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过cas更新当前线程数+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //若cas操作失败则重新读取当前线程数,并判断当前线程池的状态是否被更新
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //获取锁并添加新的线程到workers集合中
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
          //由于ThreadFactory创建线程时失败或获到锁前线程池已被SHUTDOWN,故重新检查线程池状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable(判断创建的线程是否存活)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //添加成功后更新添加标志位
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //线程创建成功后,启动线程并更新启动标志位
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //若线程启动或创建失败,执行失败逻辑
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);  //从集合中移除线程
            decrementWorkerCount(); //原子性更新线程数
            tryTerminate();  // 尝试终止线程池
        } finally {
            mainLock.unlock();
        }
}

线程池使用情景

场景使用

  • 并发高、任务执行时间短的业务,线程池的线程数可设置为CPU核心数+1,减少线程上下文频繁切换
  • 并发不高、任务执行时间很长的业务,根据以下情景进行设置
    • 若是IO密集型任务,主要消耗在IO操作上而不会占用过多的CPU资源,故可调大线程数增加CPU的利用率
    • 若是CPU密集型任务,则耗时在CPU计算里,需要避免线程上下文的频繁切换,可参考第一种情景配置线程数
  • 并发高、任务执行时间也长的业务,这已不是通过调整线程池数能解决的,需要考虑系统的架构设计、缓存使用、中间件性能调优等方面

参数配置

根据每秒任务数tasksInSecond(100-500)、每个任务大概执行时间taskCost(0.5s)、系统允许最大执行时间maxCost等标准计算

  • 核心线程数: 每秒任务数/每个线程每秒处理能力,即tasksInSecond/(1/taskCost)
  • 任务队列大小:每秒线程处理任务数系统允许最大执行时间,即(corePool/taskCost) * maxCost
  • 最大线程数:(最大任务数-队列容量)/每个线程每秒处理能力 ,即(maxTasksInSecond-queue)/(1/taskCost)
  • keepAliveTime:一般使用默认提供的空闲时间即可
  • 线程工厂:一般使用默认提供即可,若需要更改线程优先级、线程前缀名称可自定义
  • 任务策略的配置需根据业务情景来决定,比如任务是否可丢失,若不可丢失可增加持久性的手段进行保存

好啦,谢谢看我逼叨了,今天就到这里~

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码哥字节 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档