前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发-29.线程池

Java并发-29.线程池

作者头像
悠扬前奏
发布2019-06-14 10:18:04
2900
发布2019-06-14 10:18:04
举报

1. 线程池的原理

  • 线程池优点:
    • 降低资源消耗
    • 提高响应速度
    • 提高线程可管理性
  • 线程池流程
    1. 判断核心线程池中线程是否都在执行任务,否则创建新工作线程,是则进入下一步
    2. 判断工作队列是否已满,否则提交新的任务存储在这个工作队列里,是则进入下一步
    3. 判断线程池中的线程是否都处于工作状态,否则创建一个新的工作线程来执行任务,是则交给饱和策略来处理这个任务
  • ThreadPoolExecutor执行execute方法有4中情况:
    1. 当前运行线程少于corePoolSize,创建新线程执行任务(需要全局锁)
    2. 运行线程多余corePoolSize,将任务加入BlockingQueue
    3. BlockingQueue已满,创建新线程来处理任务(需要全局锁)
    4. 创建新线程超过maximumPoolSize,拒绝任务,调用RejectedExecutionHandler.rejectedExecution()方法
  • executor()源码
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
  • 工作线程:线程池创建线程时会将线程封装成工作线程Worker,Worker在完成任务后会循环获取工作队列里的任务来执行:
  • 线程池中的线程执行任务有两种情况:
    • execute()方法创建一个线程时,会让线程执行当前任务
    • 线程完成当前任务后,会反复从BlockingQueue中获取任务来执行

2. 线程池的使用

2.1 线程池的创建

大致有以下几个参数:

  • corePoolSize:线程池基本大小;提交任务时,创建线程,直到达到这个大小就不新建了。调用prestartAllCoreThreads方法会提前创建好。
  • maximumPoolSize:线程池最大数量,队列满了,而且创建的线程数小于最大线程数,会创建新线程执行任务,注意使用了无界队列作为任务队列这个参数就没效果了。
  • keepAliveTime:线程活动保持时间,线程池工作线程空闲后保持存活的时间
  • TimeUnit:keepAliveTime 的单位
  • workQueue:任务队列,用于保存等待执行的任务的队列,可以是阻塞队列(BlockingQueue)的各种实现类。
  • threadFactory:创建线程的工厂,可以给线程起标志性的名称等。
  • RejectedExecutionHandler:饱和策略:
    • AbortPolicy:直接抛出异常
    • CallerRunsPolicy:调用者所在线程来运行任务
    • DiscardOldestPolicy:丢弃队列中最近的一个任务,并执行当前任务
    • DiscardPolicy:不处理,丢弃
  • 构造器源码:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

2.2 线程池提交任务

  • execute():用于提交不需要返回值的任务
  • submit():用于提交需要返回值的任务:线程池会返回一个future类型对象,通过该对象可以判断任务是否执行成功,通过future.get()方法获取返回值,该方法会阻塞当前线程知道任务完成,也有get(long timeout, TimeUnit unit)作为超时等待。

2.3 关闭线程池

  • shutdown()和shutdownNow()方法可以关闭线程池:遍历线程池中的工作线程,逐个调用interrupt方法来中止。
    • shutdownNow首先将线程池置为STOP,然后尝试停止所有正在执行或者暂停任务的线程,并返回等待执行任务的列表
    • shutdown只是将线程置为SHUTDOWN状态,然后中断所有没有正在执行的任务的线程

2.4 合理分配线程池

  • 任务性质:CPU密集,IO密集,混合
  • 任务优先级:高,中,低
  • 任务执行时长:长,中,短
  • 任务依赖性:是否依赖其他系统资源

2.5 线程监控

常用属性:

  • taskCount:需要执行的任务数量
  • completedTaskCount:线程池在运行过程中已经完成的数量
  • largestPoolSize:线程池曾经创建过的最大线程数量
  • getPoolSize:线程池线程数量
  • getActiveCount:活动线程数
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.06.07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 线程池的原理
  • 2. 线程池的使用
    • 2.1 线程池的创建
      • 2.2 线程池提交任务
        • 2.3 关闭线程池
          • 2.4 合理分配线程池
            • 2.5 线程监控
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档