专栏首页做不甩锅的后端java线程池(一):java线程池基本使用及Executors

java线程池(一):java线程池基本使用及Executors

文章目录

在前面学习线程组的时候就提到过线程池。实际上线程组在我们的日常工作中已经不太会用到,但是线程池恰恰相反,是我们日常工作中必不可少的工具之一。现在开始对线程池的使用,以及底层ThreadPoolExecutor的源码进行分析。

1.为什么需要线程池

我们在前面对线程基础以及线程的生命周期有过详细介绍。一个基本的常识就是,线程是一个特殊的对象,其底层是依赖于JVM的native方法,在jvm虚拟机内部实现的。线程与普通对象不一样的地方在于,除了需要在堆上分配对象之外,还需要给每个线程分配一个线程栈、以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。

2.java中线程池的实现

在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。

后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。

3.创建线程的工厂方法Executors

在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。

3.1 newFixedThreadPool

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue.  At any point, at most
 * {@code nThreads} threads will be active processing tasks.
 * If additional tasks are submitted when all threads are active,
 * they will wait in the queue until a thread is available.
 * If any thread terminates due to a failure during execution
 * prior to shutdown, a new one will take its place if needed to
 * execute subsequent tasks.  The threads in the pool will exist
 * until it is explicitly {@link ExecutorService#shutdown shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue, using the provided
 * ThreadFactory to create new threads when needed.  At any point,
 * at most {@code nThreads} threads will be active processing
 * tasks.  If additional tasks are submitted when all threads are
 * active, they will wait in the queue until a thread is
 * available.  If any thread terminates due to a failure during
 * execution prior to shutdown, a new one will take its place if
 * needed to execute subsequent tasks.  The threads in the pool will
 * exist until it is explicitly {@link ExecutorService#shutdown
 * shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。

3.2 newSingleThreadExecutor

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue, and uses the provided ThreadFactory to
 * create a new thread when needed. Unlike the otherwise
 * equivalent {@code newFixedThreadPool(1, threadFactory)} the
 * returned executor is guaranteed not to be reconfigurable to use
 * additional threads.
 *
 * @param threadFactory the factory to use when creating new
 * threads
 *
 * @return the newly created single-threaded Executor
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。

3.3 newCachedThreadPool

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。60秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。

2.4 newScheduledThreadPool

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return a newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的源码。 与之类似的方法:

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @return a newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
 */
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。

3.5 newWorkStealingPool

这种方式是在jdk1.8之后新增的。我们先来看看其源码:

/**
 * Creates a thread pool that maintains enough threads to support
 * the given parallelism level, and may use multiple queues to
 * reduce contention. The parallelism level corresponds to the
 * maximum number of threads actively engaged in, or available to
 * engage in, task processing. The actual number of threads may
 * grow and shrink dynamically. A work-stealing pool makes no
 * guarantees about the order in which submitted tasks are
 * executed.
 *
 * @param parallelism the targeted parallelism level
 * @return the newly created thread pool
 * @throws IllegalArgumentException if {@code parallelism <= 0}
 * @since 1.8
 */
public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

这个方法实际上返回的是ForkJoinPool。该方法创建了一个线程池,该线程池维护足够的线程以支持并行的级别,并且可以使用多个队列来减少竞争。并行级别用于主动参数或者可用于任务处理的最大线程数。实际线程数可以动态的增长和收缩。不过需要注意的是,工作抢占线程池不能保证提交任务的执行顺序。 newWorkStealingPool最大不同在于,创建的任务队列是多个而不是一个,这样就不存在需要队列中前面的任务全部被执行之后才能轮到后面的任务。可以通过多个队列并行执行。这个并行度即是parallelism指定的大小。 在StealingPool中,根据并行度决定了后台有多少个线程来处理。每个线程分别有一个工作队列。那么假定一个线程的任务全部执行完毕,工作队列为空的话,那么他就会去别的线程队列去拿任务来执行。这就是所谓的工作窃取。实际上就是一种抢占的方式。 同样,也可以不对并行度进行指定:

/**
 * Creates a work-stealing thread pool using all
 * {@link Runtime#availableProcessors available processors}
 * as its target parallelism level.
 * @return the newly created thread pool
 * @see #newWorkStealingPool(int)
 * @since 1.8
 */
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

那么将使用Runtime.getRuntime().availableProcessors()做为该线程池运行的并行度。

4.线程池的使用方式

在java种,线程池提供了两种方式来进行调用。分别是通过submit来提交任务。只是一种方式提交的是Runnable普通线程,而且不能返回执行结果。而另外一种方式则是提交Callable任务,返回Future对象。通过Future对象能获取到执行的结果。

public class ExecutorTest {

	public static void main(String[] args) throws Exception{
		ExecutorService executor = Executors.newFixedThreadPool(10);
		executor.submit(new RunTest());
		Future<Boolean>  future = executor.submit(new CallableTest("1"));
		System.out.println(future.get());
		


	}


	static class CallableTest implements Callable<Boolean> {

		private String param;

		public CallableTest(String param) {
			this.param = param;
		}

		@Override
		public Boolean call() throws Exception {
			if(param.equals("1")) {
				return true;
			}else {
				return false;
			}
		}
	}

	static class RunTest implements Runnable {
		@Override
		public void run() {
			System.out.println("RunTest run ...");
		}
	}
}

上述代码的执行结果:

RunTest run ...
true

这分别是这两种线程提交方式在线程池中的执行结果。 如果我们需要直接得到线程的执行结果而不是通过共享内存的方式来实现的话,我们就最好使用第二种,Future。但是需要注意的是,Future本身虽然看起来实现了异步,实际上还是个同步方法,因为其执行结果需要通过在Future的isDone()方法轮询,或者调用get方法,而get本身就是个阻塞的方法。 因此对于这种情况,java1.8中引入了新的异步接口CompletableFuture。对于CompletableFuture 这是后面需要分析的重点。

5.总结

本文介绍了java中线程池的创建方式,以及线程池工厂方法类Executors的五种线程池。需要注意的是,这5种线程池在阿里规范中有部分已经不推荐。 可以参考本文:《关于阿里规范禁止使用Executors创建线程池的分析》。但是需要注意的是,我们需要知道这五种线程池的基本原理,以及为什么不推荐的原因。这也是面试过程中高频的面试内容。后面将对部分重要的类的源码进行详细分析。以彻底理解线程池的基本原理。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • A Java Fork/Join Framework(Doug Lea 关于java Fork/Join框架的论文翻译)

    Doug Lea State University of New York at Oswego Oswego NY 13126 315−341−2688...

    冬天里的懒猫
  • 多线程基础(一): 线程概念及生命周期

    什么是进程,相信大家都知道什么是进程却很难解释清楚。百科中的解释是:进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调...

    冬天里的懒猫
  • java线程池(三):ThreadPoolExecutor源码分析

    在前面分析了Executors工厂方法类之后,我们来看看AbstractExecutorService的最主要的一种实现类,ThreadpoolExecutor...

    冬天里的懒猫
  • FutureTask——另一种闭锁的实现

    关于多线程,我们接触对多的,最基础,入门的可能就是实现Runnable接口继承Thead类,因为Java单继承的原因,通常建议是实现Runnable接口。但这种...

    用户1148394
  • Java线程池详解

    构造一个线程池为什么需要几个参数?如果避免线程池出现OOM?Runnable和Callable的区别是什么?本文将对这些问题一一解答,同时还将给出使用线程池的常...

    java架构师
  • 大白话java多线程,高手勿入

    我们看到的这些单独运行的程序就是一个独立的进程,进程之间是相互独立存在的。我们上面图中的360浏览器、百度云盘等等都是独立的进程。

    java金融
  • 大白话Java多线程,小白都能看的懂的哦

    我们看到的这些单独运行的程序就是一个独立的进程,进程之间是相互独立存在的。我们上面图中的360浏览器、百度云盘、云数据库mysql等等都是独立的进程。

    java金融
  • 搞懂线程池(一)

    创建线程是一个很代价很高的操作,每个异步操作创建线程都会对 CPU 产生显著的性能影响。为了解决这个问题我们引入了线程池的概念,所谓的线程池就是我们提前分配一定...

    喵叔
  • 线程池的基本使用

    笑凡尘
  • 线程池

    线程池的作用: 线程池作用就是限制系统中执行线程的数量。      根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,...

    汤高

扫码关注云+社区

领取腾讯云代金券