前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >关于禁止使用Executors创建线程池的分析

关于禁止使用Executors创建线程池的分析

作者头像
冬天里的懒猫
修改2020-08-05 11:21:59
1.3K0
修改2020-08-05 11:21:59
举报

1.规范

在java开发手测中,对Executors有一个专门的规约:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lKKPHsVd-1596103123455)(EE1DC468E4A74F7AB21E05EF9D46AFB0)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lKKPHsVd-1596103123455)(EE1DC468E4A74F7AB21E05EF9D46AFB0)]

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 注意,这里的重点是 不允许。而不是不建议。可见该规范 背后都是血淋淋的生产事故。

2.Executors主要功能

打开Executors的官方文档,其描述为:

Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
Methods that create and return an ExecutorService set up with commonly useful configuration settings.
Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
Methods that create and return a ThreadFactory that sets newly created threads to a known state.
Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.

Executors主要是为 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, 和 Callable这些类提供的创建工厂方法。这个类主要提供如下几种方法:

  • 创建并返回一个具有通用配置的ExecutorService。
  • 创建并返回一个具有通用配置的ScheduledExecutorService 。
  • 创建并返回一个包装的ExecutorService,不能重新对其参数进行配置。
  • 创建并返回一个ThreadFactory,设置创建线程为指定状态。
  • 创建并返回一个闭包形式的callable,以便在执行方法中执行其所需形式的callable。

其常用方法有newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool等。

2.1 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

Parameters:

nThreads - the number of threads in the pool

Returns:

the newly created thread pool

Throws:

IllegalArgumentException - if nThreads <= 0

创建一个线程池,该线程池重用固定数量的线程在一个共享的无界队列上操作,在任何时候,大多数线程都在活动处理任务。如果在所有的线程都处于活动状态时提交了其他的任务,则他们在队列中等待,直到有一个线程可用为止,如果任何线程在关闭之前的执行过程由于失败而终止,那么在需要执行后续任务时,将有一个新的线程替代它。池中的线程将一直存在,直到池显示关闭。 还有另外一个方法:

public static ExecutorService newFixedThreadPool(int nThreads,
                                                 ThreadFactory threadFactory)

与之类似,只是自行指定了ThreadFactory。

2.2 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()
Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.

Returns:

the newly created single-threaded Executor

创建一个Executor,该Executor使用单个工作队列线程操作一个无界队列。但是请注意,如果这个线程在关闭之前由于执行失败而终止,那么在需要之系的后续任务的时候,一个新的线程将取代它。)任务包装安顺序之系,并且在任何时间内活动的任务不超过一个。与newFixedThreadPool(1)不同,返回的Executor保证不会重新配置以使用其他的线程。 与之类似的还有:

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

2.3 newCachedThreadPool

public static ExecutorService newCachedThreadPool()
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.

Returns:

the newly created thread pool

创建一个线程池,根据需要创建的新线程,但是在可用时可以重用之前的构造线程,这些pool通常会提高之系许多短期异步任务的程序的性能。如果可用,对execute的调用将重用之前构造的线程。如果没有可用的现有线程,将创建一个新线程并添加到pool中。未使用超过60s的线程将被终止之后删除。因此,一个足够长时间保持空闲的pool将不会消耗任何资源。注意,可以使用ThreadPoolExecccutor构造函数创建具有类似属性但细节不同的pool。 与此类似的还有:

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

2.4 newSingleThreadScheduledExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.

Parameters:

corePoolSize - the number of threads to keep in the pool, even if they are idle

Returns:

a newly created scheduled thread pool

Throws:

IllegalArgumentException - if corePoolSize < 0

创建一个线程池,该线程池可以在指定的时间延期执行或者定期执行。 与之类似的还有:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,                                                ThreadFactory threadFactory)
                                                              
public static ScheduledExecutorService newSingleThreadScheduledExecutor()  

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

3.OOM测试

3.1 FixedThreadPool

有如下代码:

//-Xmx10m -Xms10m
public static void main(String[] args) throws InterruptedException{
		ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(1);

		IntStream.range(0,10000000).forEach((i)->{
				threadPool.execute(() -> {
					byte[] array = new byte[1024*1024*1];
					try {
						TimeUnit.HOURS.sleep(1);
						int length  = array.length;
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				});
		});

		threadPool.shutdown();
		threadPool.awaitTermination(1,TimeUnit.HOURS);
	}

执行结果:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
	at com.dhb.executors.test.FixedThreadPoolOOM.lambda$main$1(FixedThreadPoolOOM.java:18)
	at com.dhb.executors.test.FixedThreadPoolOOM$$Lambda$1/1078694789.accept(Unknown Source)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:559)
	at com.dhb.executors.test.FixedThreadPoolOOM.main(FixedThreadPoolOOM.java:17)

我们可以看到,对于FixedThreadPool,固定线程,但是添加的task将被放到队列里面,一段时间之后就会出现OOM异常。

3.1 SingleThread

与之前代码一样,我们进行修改:

//-Xmx10m -Xms10m
public static void main(String[] args) throws InterruptedException{
		ExecutorService threadPool =  Executors.newSingleThreadExecutor();

		IntStream.range(0,10000000).forEach((i)->{
			threadPool.execute(() -> {
				byte[] array = new byte[1024*1024*1];
				try {
					TimeUnit.HOURS.sleep(1);
					int length  = array.length;
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		});

		threadPool.shutdown();
		threadPool.awaitTermination(1,TimeUnit.HOURS);
	}

同样会出现OOM。

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
	at com.dhb.executors.test.SingleThreadOOM.lambda$main$1(SingleThreadOOM.java:15)
	at com.dhb.executors.test.SingleThreadOOM$$Lambda$1/1078694789.accept(Unknown Source)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:559)
	at com.dhb.executors.test.SingleThreadOOM.main(SingleThreadOOM.java:14)

3.3 CachedThreadPool

将线程池修改为cachedthreadPool

//-Xms10m -Xmx10m
public static void main(String[] args) throws InterruptedException{
		ExecutorService threadPool =  Executors.newCachedThreadPool();

		IntStream.range(0,10000000).forEach((i)->{
			threadPool.execute(() -> {
				byte[] array = new byte[1024*1024*1];
				try {
					TimeUnit.HOURS.sleep(1);
					int length  = array.length;
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		});

		threadPool.shutdown();
		threadPool.awaitTermination(1,TimeUnit.HOURS);
	}

执行之后同样OOM:

java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
	at com.dhb.executors.test.CachedThreadPoolOOM.lambda$null$0(CachedThreadPoolOOM.java:16)
	at com.dhb.executors.test.CachedThreadPoolOOM$$Lambda$2/1149319664.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.OutOfMemoryError: Java heap space

3.4 ScheduledThreadPool

public static void main(String[] args) {
		ExecutorService threadPool =  Executors.newScheduledThreadPool(1);

		try {
			IntStream.range(0,10000000).forEach((i)->{
				threadPool.execute(() -> {
					byte[] array = new byte[1024*1024*1];
					try {
						TimeUnit.HOURS.sleep(1);
						int length  = array.length;
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				});
			});

			threadPool.shutdown();
			threadPool.awaitTermination(1,TimeUnit.HOURS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

同样也会出现OOM

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.Executors.callable(Executors.java:407)
	at java.util.concurrent.FutureTask.<init>(FutureTask.java:152)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.<init>(ScheduledThreadPoolExecutor.java:209)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:532)
	at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
	at com.dhb.executors.test.ScheduledThreadPoolOOM.lambda$main$1(ScheduledThreadPoolOOM.java:15)

4.源码分析

对于这些threadPool,在执行的过程中都不约而同的出现了OOM异常。我们可以看看这些方法的源代码:

4.1 newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

4.2 newSingleThreadExecutor

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4.3 newCachedThreadPool

  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

4.4 newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
       public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

5.总结

通过源码可以看到,这四个线程池最底层都是用ThreadPoolExecutor的构造方法。newFixedThreadPool和newSingleThreadExecutor使用的是无界队列LinkedBlockingQueue。而LinkedBlockingQueue在没有指定长度的情况下,默认的队列长度为:

 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

也就是Integer.MAX_VALUE就是这两个方法的队列长度。而方法newCachedThreadPool和ScheduledExecutorService虽然没有使用LinkedBlockingQueue,但是其线程池的最大线程数是Integer.MAX_VALUE。面对队列中的数据,这是两类处理策略,前者是通过加大队列的缓冲数据的长度来实现,而后者则是让可用的最大线程数没有上限。这两种办法都不是一个很好解决问题的办法,在资源有限的情况下,都有可能导致OOM。

5.1 创建线程池的正确方式

jdk规范让我们避免使用Executors的默认方法创建线程池。那么我们可以使用手动的方法来创建,手动指定线程数量和队列的长度:

private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));

这种情况下,一旦提交的线程超过了当前的可用线程时,就会触发拒绝策略,抛出java.util.concurrent.RejectedExecutionException,我们可以捕获异常之后来进行相应的处理。另外,我们还可以使用guava来创建队列:

private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("test-pool-%d").build();
private static ExecutorService pool = new ThreadPoolExecutor(5, 20,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100), namedThreadFactory, new
ThreadPoolExecutor.
AbortPolicy());

通过guava的ThreadFactoryBuilder就能很好的创建一个线程池。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-07-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.规范
  • 2.Executors主要功能
    • 2.1 newFixedThreadPool
      • 2.2 newSingleThreadExecutor
        • 2.3 newCachedThreadPool
          • 2.4 newSingleThreadScheduledExecutor
          • 3.OOM测试
            • 3.1 FixedThreadPool
              • 3.1 SingleThread
                • 3.3 CachedThreadPool
                  • 3.4 ScheduledThreadPool
                  • 4.源码分析
                    • 4.1 newFixedThreadPool
                      • 4.2 newSingleThreadExecutor
                        • 4.3 newCachedThreadPool
                          • 4.4 newScheduledThreadPool
                          • 5.总结
                            • 5.1 创建线程池的正确方式
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档