线程池那些事儿

什么是线程池 ?

实际开发中我们需要让程序执行某个特定任务时,就会开启一个线程,如果并发的线程数量太多,频繁地创建线程就会严重影响系统的运行效率,如何解决呢?有没有一种方式可以让线程得到复用?执行一次任务之后不被销毁,可以继续执行其他任务,这就跟数据库连接池的思路一样了,数据库连接池的实现逻辑是在缓冲池中预先放置一定数量的连接对象,然后进行复用,那么很显然,在缓冲池中预先放置一定数量的线程对象以实现复用的机制就叫做线程池。

线程池的优点

1、线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。

2、可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。

工作流程

1、提交一个任务时,线程池会创建一个新的线程执行任务,直到当前线程数等于 corePoolSize。

2、如果当前线程数为 corePoolSize,继续提交的任务被保存到任务队列中,等待被执行。

3、如果任务队列满了,那就创建新的线程执行当前任务,直到线程池中的线程数达到 maxPoolSize,这时再有任务来,只能执行reject() 拒绝处理该任务。

ThreadPoolExecutor 类

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

可以看到,ThreadPoolExecutor 类继承了 AbstractExecutorService 类,AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 接口,ExecutorService 继承了 Executor 接口。

public interface Executor {
    void execute(Runnable command);
}

ThreadPoolExecutor、AbstractExecutorService、ExecutorService 和 Executor 之间的关系:

1、Executor是一个顶层接口,只声明了一个方法execute(Runnable),用来执行任务。

2、ExecutorService 接口继承了 Executor 接口,并额外声明了一些相关方法。

3、抽象类 AbstractExecutorService 实现了 ExecutorService 接口,并且实现了ExecutorService 中声明的方法。

4、ThreadPoolExecutor 继承了 AbstractExecutorService。

ThreadPoolExecutor 类的核心方法有这么几个:execute()、

submit()、shutdown()、shutdownNow()。

execute() 方法继承自顶层接口 Executor,在 ThreadPoolExecutor 进行了具体的实现,通过这个方法可以向线程池提交一个任务,交由线程池去调度执行。

submit() 方法是 ExecutorService 声明的方法,在 AbstractExecutorService 进行了具体实现,ThreadPoolExecutor 从 AbstractExecutorService 中直接继承过来,该方法也可以向线程池提交任务,与 execute() 方法不同之处在于它能够返回任务执行的结果。

ThreadPoolExecutor 类各种构造函数的参数含义如下:

1、corePoolSize:核心线程的数量。

2、maximumPoolSize:线程池的最大线程数,表示在线程池中最多能创建多少个线程。

corePoolSize 可以理解为就是线程池的大小,而 maximumPoolSize 是线程池在特定情况下的一种处理机制,当任务量突增的时候,会额外创建一些线程对象以满足调用需求,一旦任务量恢复正常,额外创建的线程对象随即释放,maximumPoolSize 就是线程池最大的容载量(核心线程数量+额外线程数量)。

可通过以下两个方法动态调整线程池容量:

1)setCorePoolSize():设置核心线程数量。

2)setMaximumPoolSize():设置线程池的最大线程数量。

3、keepAliveTime:非核心线程的超时时长,当系统中非核心线程(额外创建的线程)闲置时间超过 keepAliveTime 之后,则会被回收。如果ThreadPoolExecutor 的 allowCoreThreadTimeOut 属性设置为 true,则该参数也表示核心线程的超时时长。

4、unit:keepAliveTime 参数的时间单位,取值范围 TimeUnit 类中的7种静态属性。

5、workQueue:线程池中的任务队列,该队列主要用来存储已经被提交但是尚未执行的任务。存储在这里的任务是由 ThreadPoolExecutor 的 execute() 方法提交来的,常用的任务队列有以下几种选择:

1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小。

2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE。

3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

4)PriorityBlockingQuene:具有优先级的无界阻塞队列。

6、threadFactory:为线程池提供创建新线程的功能。

7、handler:拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException,常用的拒绝策略有以下四种:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。 
ThreadPoolExecutor.DiscardPolicy:丢弃任务不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列首位的任务,然后重新尝试执行任务。
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

线程池状态

1、RUNNING:运行状态,线程池可以接收新任务,并处理队列中的任务。

2、SHUTDOWN: 关闭状态,线程池不接收新任务,但是会处理队列中的任务。

3、STOP : 停止状态,线程池中断所有正在运行的任务,不接收新任务,同时也不处理队列中的任务。

4、TIDYING : 整理状态,线程池对线程资源进行整理优化。

5、TERMINATED:结束状态,线程池停止工作。

线程池的使用

public class Test {
        public static void main(String[] args) {
                ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 15, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
                for (int i = 0; i < 20; i++) {
                        MyTask myTask = new MyTask(i);
                        executor.execute(myTask);
                        System.out.println("线程池中线程数量:"+executor.getPoolSize()+"、任务队列中等待执行的任务数量:"+
                                executor.getQueue().size()+"、执行完成的任务数量:"+executor.getCompletedTaskCount());
                }
                executor.shutdown();
        }
}

class MyTask implements Runnable{
        private int num;
        public MyTask(int num) {
                this.num = num;
        }

        public void run() {
                // TODO Auto-generated method stub
                System.out.println("正在执行任务"+num);
                try {
                        Thread.currentThread().sleep(3000);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                System.out.println("任务"+num+"执行完毕");
        }
}

运行结果如下所示。

从执行结果可以看出,当线程池中线程的数目大于10的时候,就会将任务放入队列,当任务队列满了之后,就会额外创建新的线程。在实际开发中,并不需要直接使用 ThreadPoolExecutor,可以使用 Executors 的静态方法来创建线程池:

1、newFixedThreadPool()

创建一个固定大小的线程池,任务超过10个时,会将线程放入等待队列中。初始化一个指定线程数的线程池,其中 corePoolSize == maxiPoolSize,使用 LinkedBlockingQuene 作为任务队列。

特点:即使当线程池没有可执行任务时,也不会释放线程。

ExecutorService executorService = Executors.newFixedThreadPool(10);
    for(int i = 0; i < 10; i++) {
          executorService.execute(new Runnable() {
        
            @Override
            public void run() {
                // TODO Auto-generated method stub
                  try {
                        Thread.currentThread().sleep(1000);
                  } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                  }
              System.out.println("...");
            }
      });
}
executorService.shutdown();

1000ms之后,10行...同时打印,因为线程池开启后直接创建了10个线程,一起等待1000ms之后同时执行。

2、newCachedThreadPool()

创建一个缓存线程池,初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到 Integer.MAX_VALUE,即2147483647,内部使用 SynchronousQueue 作为任务队列。

特点:在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源。当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销,因此,使用时要注意控制并发的任务数,防止因创建大量的线程导致而降低性能。

ExecutorService executorService = Executors.newCachedThreadPool();

3、newSingleThreadExecutor()

创建一个单例线程池,线程池中只有一个线程,初始化只有一个线程的线程池,内部使用 LinkedBlockingQueue 作为任务队列。特点:如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。

ExecutorService executorService = Executors.newSingleThreadExecutor();
    for(int i = 0; i < 10; i++) {
          executorService.execute(new Runnable() {
        
            @Override
            public void run() {
                // TODO Auto-generated method stub
                  try {
                        Thread.currentThread().sleep(1000);
                  } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                  }
                  System.out.println("...");
            }
      });
}
executorService.shutdown();

4、newScheduledThreadPool()

任务调度线程池。

特点:初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

//任务调度的线程池
//10表示延迟10s开始任务调度,3表示调度之间间隔3s
ScheduledExecutorService sch = Executors.newScheduledThreadPool(10);
sch.scheduleAtFixedRate(new Runnable() {
      
  @Override
  public void run() {
        // TODO Auto-generated method stub
        System.out.println("...");
      }
}, 10, 3, TimeUnit.SECONDS);
sch.shutdown();

除了 newScheduledThreadPool 的内部实现特殊一点之外,其它线程池内部都是基于 ThreadPoolExecutor 类实现的。

原文发布于微信公众号 - Java大联盟(javaunion)

原文发表时间:2019-02-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券