线程池ThreadPoolExecutor整理

项目用到线程池,但是其实很多人对原理并不熟悉 ,这里只是整理一下

ThreadPoolExecutor

  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类

  • 构造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
  • 参数

    corePoolSize               核心线程数     maximumPoolSize            最大线程数 阻塞队列装不下的后 总得线程数  包含corePoolSize      keepAliveTime             超时时间 线程池中当前的空闲线程服务完某任务后的存活时间。如果时间足够长,那么可能会服务其它任务     unit               时间单位     workQueue             阻塞队列 线程数大于核心线程后放到队列中     threadFactory         线程池工厂     handler              拒绝策略 阻塞队列满了,也达到了最大线程数 执行拒绝策略

    • corePoolSize:核心池的大小,在创建了线程池后,即在没有任务到来之前就创建  默认情况下,在创建了线程池后,线程池中的线程数为0,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法来预创建线程.当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
    • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • 流程

    1)当池子大小 小于corePoolSize就新建线程,并处理请求     2)当池子大小 等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理     3)当workQueue放不下新入的任务时,新建线程入池,并处理请求(不用等待队列),如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理     4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

    处理步骤:  核心线程 << 阻塞队列 <<最大线程数

  • 通俗流程解释

    假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;     当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;     如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;     然后就将任务也分配给这4个临时工人做;     如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。     当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的.     这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

    也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。 

  • 阻塞队列

    1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

    2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

    3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务

  • 拒绝策略 四种

    AbortPolicy 默认 直接抛弃 并抛异常     DiscardPolicy 直接抛弃 不抛异常     CallerRunsPolicy 在主线程中执行     DiscardOldestPolicy 把注册队列中最老的抛弃掉 执行当前的     自定义的策略 实现RejectedExecutionHandler即可

  • 测试案例
public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));
 
         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
             System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
             executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
     }
}
 
class MyTask implements Runnable {
    private int taskNum;
 
    public MyTask(int num) {
        this.taskNum = num;
    }
 
    @Override
    public void run() {
        System.out.println("正在执行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"执行完毕");
    }
}
  • 执行结果
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕
  • 如何合理的配置线程池
    • 要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
      1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
      2. 任务的优先级:高,中和低。
      3. 任务的执行时间:长,中和短。
      4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

      任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。 执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。 建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。

  • 最佳实践
    //创建线程池指定有意义的线程名字, 方便出错是回溯   
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build();
    ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    singleThreadPool.execute(()-> System.out.println(Thread.currentThread().getName())
    );
   
    public class TimerTaskThread extends Thread {
        public TimerTaskThread(){
            super.setName("TimerTaskThread"); 
           …
        }    
        …
   }    

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第二十一天 Web商城实战一【悟空教程】

public class BaseServlet extends HttpServlet {

1864
来自专栏猿天地

spring data mongodb 代码连接数据库方式

平时我们用spring data mongodb都是采用xml配置的方式来链接数据库 但是往往有的时候需要用代码的方式来实现。 比如说我们有可能要同时操作多个d...

37014
来自专栏Android随笔

利用无障碍服务(AccessibilityService)批量清理后台进程

Demo地址:https://github.com/qyxxjd/ClearProcesses

1031
来自专栏finleyMa

Laravel 前后台共享数据

5.5以后可以这么写, 用 @json Blade 指令替代手动 json_encode

1904
来自专栏Android开发指南

4.线程池

36712
来自专栏Albert陈凯

2018-06-13 RestTemplate处理Gzip压缩

1913
来自专栏Golang语言社区

游戏服务器之多线程发送(中)

4、拷贝数据到会话的发送缓冲区 交换发送队列和添加队列,拷贝会话的发送队列的数据到会话的发送缓冲区 BOOL ExecSockDataMgr::CopyWait...

3403
来自专栏大闲人柴毛毛

Java并发编程的艺术(十一)——线程池(2)

Executor两级调度模型 ? 在HotSpot虚拟机中,Java中的线程将会被一一映射为操作系统的线程。 在Java虚拟机层面,用户将多个任务提...

3708
来自专栏Android 研究

APK安装流程详解13——PMS中的新安装流程下(装载)

而在handleReturnCode()方法里面也是调用processPendingInstall(args, ret)方法,如下:

2132
来自专栏禹都一只猫博客

Linux 性能检测常用的 10 个基本命令

1482

扫码关注云+社区

领取腾讯云代金券