前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java线程池了解一下?

Java线程池了解一下?

作者头像
技术从心
发布2019-08-07 14:01:57
3810
发布2019-08-07 14:01:57
举报
文章被收录于专栏:技术从心技术从心

为什么需要使用线程池

1、减少线程创建与切换的开销

  • 在没有使用线程池的时候,来了一个任务,就创建一个线程,我们知道系统创建和销毁工作线程的开销很大,而且频繁的创建线程也就意味着需要进行频繁的线程切换,这都是一笔很大的开销。

2、控制线程的数量

  • 使用线程池我们可以有效地控制线程的数量,当系统中存在大量并发线程时,会导致系统性能剧烈下降。

线程池做了什么

重复利用有限的线程

  • 线程池中会预先创建一些空闲的线程,他们不断的从工作队列中取出任务,然后执行,执行完之后,会继续执行工作队列中的下一个任务,减少了创建和销毁线程的次数,每个线程都可以一直被重用,变了创建和销毁的开销。

线程池的使用

其实常用Java线程池本质上都是由ThreadPoolExecutor或者ForkJoinPool生成的,只是其根据构造函数传入不同的实参来实例化相应线程池而已。

Executors

Executors是一个线程池工厂类,该工厂类包含如下集合静态工厂方法来创建线程池:

  • newFixedThreadPool():创建一个可重用的、具有固定线程数的线程池
  • newSingleThreadExecutor():创建只有一个线程的线程池
  • newCachedThreadPool():创建一个具有缓存功能的线程池
  • newWorkStealingPool():创建持有足够线程的线程池来支持给定的并行级别的线程池
  • newScheduledThreadPool():创建具有指定线程数的线程池,它可以在指定延迟后执行任务线程
ExecutorService接口

对设计模式有了解过的同学都会知道,我们尽量面向接口编程,这样对程序的灵活性是非常友好的。Java线程池也采用了面向接口编程的思想,可以看到ThreadPoolExecutorForkJoinPool所有都是ExecutorService接口的实现类。在ExecutorService接口中定义了一些常用的方法,然后再各种线程池中都可以使用ExecutorService接口中定义的方法,常用的方法有如下几个:

  • 向线程池提交线程
    • Future<?> submit():将一个Runnable对象交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务,该方法既能接收Runnable对象也能接收Callable对象,这就意味着sumbit()方法可以有返回值。
    • void execute(Runnable command):只能接收Runnable对象,意味着该方法没有返回值。
  • 关闭线程池
    • void shutdown():阻止新来的任务提交,对已经提交了的任务不会产生任何影响。(等待所有的线程执行完毕才关闭)
    • List<Runnable> shutdownNow(): 阻止新来的任务提交,同时会中断当前正在运行的线程,另外它还将workQueue中的任务给移除,并将这些任务添加到列表中进行返回。(立马关闭)
  • 检查线程池的状态
    • boolean isShutdown():调用shutdown()或shutdownNow()方法后返回为true。
    • boolean isTerminated():当调用shutdown()方法后,并且所有提交的任务完成后返回为true;当调用shutdownNow()方法后,成功停止后返回为true。
常见线程池使用示例
一、newFixedThreadPool

线程池中的线程数目是固定的,不管你来了多少的任务。

示例代码

代码语言:javascript
复制
public class MyFixThreadPool {    public static void main(String[] args) throws InterruptedException {        // 创建一个线程数固定为5的线程池
       ExecutorService service = Executors.newFixedThreadPool(5);       System.out.println("初始线程池状态:" + service);        for (int i = 0; i < 6; i++) {
           service.execute(() -> {                try {
                   TimeUnit.MILLISECONDS.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               System.out.println(Thread.currentThread().getName());
           });
       }
       System.out.println("线程提交完毕之后线程池状态:" + service);       service.shutdown();//会等待所有的线程执行完毕才关闭,shutdownNow:立马关闭
       System.out.println("是否全部线程已经执行完毕:" + service.isTerminated());//所有的任务执行完了,就会返回true
       System.out.println("是否已经执行shutdown()" + service.isShutdown());
       System.out.println("执行完shutdown()之后线程池的状态:" + service);       TimeUnit.SECONDS.sleep(5);
       System.out.println("5秒钟过后,是否全部线程已经执行完毕:" + service.isTerminated());
       System.out.println("5秒钟过后,是否已经执行shutdown()" + service.isShutdown());
       System.out.println("5秒钟过后,线程池状态:" + service);
   }}复制代码

运行结果:

初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 线程提交完毕之后线程池状态:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 是否全部线程已经执行完毕:false 是否已经执行shutdown():true 执行完shutdown()之后线程池的状态:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] pool-1-thread-2 pool-1-thread-1 pool-1-thread-4 pool-1-thread-5 pool-1-thread-3 pool-1-thread-2 5秒钟过后,是否全部线程已经执行完毕:true 5秒钟过后,是否已经执行shutdown():true 5秒钟过后,线程池状态:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]

程序分析

  • 当我们创建好一个FixedThreadPool之后,该线程池就处于Running状态了,但是pool size(线程池线程的数量)、active threads(当前活跃线程) queued tasks(当前排队线程)、completed tasks(已完成的任务数)都是0
  • 当我们把6个任务都提交给线程池之后,
    • pool size = 5:因为我们创建的是一个固定线程数为5的线程池(注意:如果这个时候我们只提交了3个任务,那么pool size = 3,说明线程池也是通过懒加载的方式去创建线程)。
    • active threads = 5:虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,所以活跃线程只有5个
    • queued tasks = 1:虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,只能有5个活跃线程同时工作,所以有一个任务在等待
  • 我们第一次执行shutdown()的时候,由于任务还没有全部执行完毕,所以isTerminated()返回falseshutdown()返回true,而线程池的状态会由Running变为Shutting down
  • 从任务的运行结果我们可以看出,名为pool-1-thread-2执行了两次任务,证明线程池中的线程确实是重复利用的。
  • 5秒钟后,isTerminated()返回trueshutdown()返回true,证明所有的任务都执行完了,线程池也关闭了,我们再次检查线程池的状态[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6],状态已经处于Terminated了,然后已完成的任务显示为6
二、newSingleThreadExecutor

从头到尾整个线程池都只有一个线程在工作。

实例代码

代码语言:javascript
复制
public class SingleThreadPool {    public static void main(String[] args) {
       ExecutorService service = Executors.newSingleThreadExecutor();        for (int i = 0; i < 5; i++) {            final int j = i;
           service.execute(() -> {
               System.out.println(j + " " + Thread.currentThread().getName());
           });
       }
   }}复制代码

运行结果

0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1

程序分析可以看到只有pool-1-thread-1一个线程在工作。

三、newCachedThreadPool

来多少任务,就创建多少线程(前提是没有空闲的线程在等待执行任务,否则还是会复用之前旧(缓存)的线程),直接你电脑能支撑的线程数的极限为止。

实例代码

代码语言:javascript
复制
public class CachePool {    public static void main(String[] args) throws InterruptedException {
       ExecutorService service = Executors.newCachedThreadPool();
       System.out.println("初始线程池状态:" + service);        for (int i = 0; i < 12; i++) {
           service.execute(() -> {                try {
                   TimeUnit.MILLISECONDS.sleep(500);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               System.out.println(Thread.currentThread().getName());
           });
       }
       System.out.println("线程提交完毕之后线程池状态:" + service);       TimeUnit.SECONDS.sleep(50);
       System.out.println("50秒后线程池状态:" + service);       TimeUnit.SECONDS.sleep(30);
       System.out.println("80秒后线程池状态:" + service);
   }}复制代码

运行结果

初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 线程提交完毕之后线程池状态:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0] pool-1-thread-3 pool-1-thread-4 pool-1-thread-1 pool-1-thread-2 pool-1-thread-5 pool-1-thread-8 pool-1-thread-9 pool-1-thread-12 pool-1-thread-7 pool-1-thread-6 pool-1-thread-11 pool-1-thread-10 50秒后线程池状态:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12] 80秒后线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]

程序分析

  • 因为我们每个线程任务至少需要500毫秒的执行时间,所以当我们往线程池中提交12个任务的过程中,基本上没有空闲的线程供我们重复使用,所以线程池会创建12个线程。
  • 缓存中的线程默认是60秒没有活跃就会被销毁掉,可以看到在50秒钟的时候回,所有的任务已经完成了,但是线程池线程的数量还是12。
  • 80秒过后,可以看到线程池中的线程已经全部被销毁了。
四、newScheduledThreadPool

可以在指定延迟后或周期性地执行线程任务的线程池。

ScheduledThreadPoolExecutor

  • newScheduledThreadPool()方法返回的其实是一个ScheduledThreadPoolExecutor对象,ScheduledThreadPoolExecutor定义如下:
代码语言:javascript
复制
public class ScheduledThreadPoolExecutor
       extends ThreadPoolExecutor
       implements ScheduledExecutorService {复制代码
  • 可以看到,它还是继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口,而ScheduledExecutorService也是继承了ExecutorService接口,所以我们也可以像使用之前的线程池对象一样使用,只不过是该对象会额外多了一些方法用于控制延迟与周期:
    • public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit):指定callable任务将在delay延迟后执行
    • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):指定的command任务将在delay延迟后执行,而且已设定频率重复执行。(一开始并不会执行)
    • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit):创建并执行一个在给定初始延迟后首期启用的定期操作,随后在每一个执行终止和下一次执行开始之间都存在给定的延迟。

示例代码

下面代码每500毫秒打印一次当前线程名称以及一个随机数字。

代码语言:javascript
复制
public class MyScheduledPool {    public static void main(String[] args) {
       ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
       service.scheduleAtFixedRate(() -> {
           System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000));
       }, 0, 500, TimeUnit.MILLISECONDS);
   }
}复制代码
五、newWorkStealingPool

每个线程维护着自己的队列,执行完自己的任务之后,会去主动执行其他线程队列中的任务。

示例代码

代码语言:javascript
复制
public class MyWorkStealingPool {    public static void main(String[] args) throws IOException {
       ExecutorService service = Executors.newWorkStealingPool(4);
       System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors());       service.execute(new R(1000));
       service.execute(new R(2000));
       service.execute(new R(2000));
       service.execute(new R(2000));
       service.execute(new R(2000));        //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
       System.in.read();
   }    static class R implements Runnable {        int time;       R(int time) {            this.time = time;
       }        @Override
       public void run() {            try {
               TimeUnit.MILLISECONDS.sleep(time);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println(time + " " + Thread.currentThread().getName());
       }
   }
}复制代码

运行结果

cpu核心:4 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-1

程序分析ForkJoinPool-1-worker-1任务的执行时间是1秒,它会最先执行完毕,然后它会去主动执行其他线程队列中的任务。

六、ForkJoinPool
  • ForkJoinPool可以将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPool提供了如下几个方法用于创建ForkJoinPool实例对象:
    • ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool,parallelism的默认值为Runtime.getRuntime().availableProcessors()方法的返回值
    • ForkJoinPool commonPool():该方法返回一个通用池,通用池的运行状态不会受shutdown()shutdownNow()方法的影响。
  • 创建了ForkJoinPool示例之后,就可以调用ForkJoinPoolsubmit(ForkJoinTask task)invoke(ForkJoinTask task)方法来执行指定任务了。其中ForkJoinTask(实现了Future接口)代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,他还有两个抽象子类:RecursiveActionRecursiveTask。其中RecursiveTask代表有返回值的任务,而RecursiveAction代表没有返回值的任务。

示例代码

下面代码演示了使用ForkJoinPool对1000000个随机整数进行求和。

代码语言:javascript
复制
public class MyForkJoinPool {    static int[] nums = new int[1000000];    static final int MAX_NUM = 50000;    static Random random = new Random();    static {        for (int i = 0; i < nums.length; i++) {
           nums[i] = random.nextInt(1000);
       }
       System.out.println(Arrays.stream(nums).sum());
   }//    static class AddTask extends RecursiveAction {////        int start, end;////        AddTask(int start, int end) {//            this.start = start;//            this.end = end;//        }////        @Override//        protected void compute() {//            if (end - start <= MAX_NUM) {//                long sum = 0L;//                for (int i = 0; i < end; i++) sum += nums[i];//                System.out.println("from:" + start + " to:" + end + " = " + sum);//            } else {//                int middle = start + (end - start) / 2;////                AddTask subTask1 = new AddTask(start, middle);//                AddTask subTask2 = new AddTask(middle, end);//                subTask1.fork();//                subTask2.fork();//            }//        }//    }   static class AddTask extends RecursiveTask<Long> {        int start, end;       AddTask(int start, int end) {            this.start = start;            this.end = end;
       }        @Override
       protected Long compute() {            // 当end与start之间的差大于MAX_NUM,将大任务分解成两个“小任务”
           if (end - start <= MAX_NUM) {                long sum = 0L;                for (int i = start; i < end; i++) sum += nums[i];                return sum;
           } else {                int middle = start + (end - start) / 2;               AddTask subTask1 = new AddTask(start, middle);
               AddTask subTask2 = new AddTask(middle, end);                // 并行执行两个“小任务”
               subTask1.fork();
               subTask2.fork();                // 把两个“小任务”累加的结果合并起来
               return subTask1.join() + subTask2.join();
           }
       }
   }    public static void main(String[] args) throws IOException {
       ForkJoinPool forkJoinPool = new ForkJoinPool();
       AddTask task = new AddTask(0, nums.length);
       forkJoinPool.execute(task);        long result = task.join();
       System.out.println(result);       forkJoinPool.shutdown();
   }
}复制代码

额外补充

上面我们说到过:其实常用Java线程池都是由ThreadPoolExecutor或者ForkJoinPool两个类生成的,只是其根据构造函数传入不同的实参来生成相应线程池而已。那我们现在一起来看看Executors中几个创建线程池对象的静态方法相关的源码:

ThreadPoolExecutor构造函数原型

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue) {复制代码

参数说明

  • corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Runnable放入到等待队列workQueue中了
  • maximumPoolSize:线程池维护线程的最大数量,当大于了这个值就会将任务由一个丢弃处理机制来处理(当然也存在永远不丢弃任务的线程池,具体得看策略)
  • keepAliveTime:线程空闲时的存活时间(当线程数大于corePoolSize时该参数才有效)
  • unit:keepAliveTime的单位
  • workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口

newFixedThreadPool

代码语言:javascript
复制
    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());
   }复制代码

newSingleThreadExecutor

代码语言:javascript
复制
    public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>()));
   }复制代码

newCachedThreadPool

代码语言:javascript
复制
    public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());
   }复制代码

newScheduledThreadPool

代码语言:javascript
复制
    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
   }复制代码

newWorkStealingPool

代码语言:javascript
复制
    public static ExecutorService newWorkStealingPool(int parallelism) {        return new ForkJoinPool
           (parallelism,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,             null, true);
   }

觉得文章不错,记得转发分享给更多同学哦~

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-01-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 技术从心 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么需要使用线程池
  • 线程池做了什么
  • 线程池的使用
    • Executors
      • ExecutorService接口
        • 常见线程池使用示例
          • 一、newFixedThreadPool
          • 二、newSingleThreadExecutor
          • 三、newCachedThreadPool
          • 四、newScheduledThreadPool
          • 五、newWorkStealingPool
          • 六、ForkJoinPool
      • 额外补充
      相关产品与服务
      GPU 云服务器
      GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档