[JDK] ThreadPoolExecutor 线程池配置 和 阻塞队列BlockingQueue
ExecutorService 执行器服务,它使用可能的几个池线程之一执行每个提交的任务,通常使用
Executors
工厂方法配置 线程池可以解决两个不同问题
:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行集合任务时使用的线程)的方法。每个ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。 为了便于跨大量上下文使用
,此类提供了很多可调整的参数和扩展挂钩。但是,强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。否则,在手动配置和调整此类时,使用以下指导:
ThreadPoolExecutor 将根据 corePoolSize
(参见 getCorePoolSize())和 maximumPoolSize
(参见getMaximumPoolSize())设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize
,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于corePoolSize
而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize
和 maximumPoolSize
相同,则创建了固定大小的线程池。如果将 maximumPoolSize
设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
默认情况下,即使核心线程最初只是在新任务需要时才创建和启动的,也可以使用方法 prestartCoreThread()或 prestartAllCoreThreads() 对其进行动态重写。
使用 ThreadFactory
创建新线程。如果没有另外说明,则在同一个 ThreadGroup
中一律使用Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory
,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
如果池中当前有多于 corePoolSize
的线程,则这些多出的线程在空闲时间超过 keepAliveTime
时将会终止(参见getKeepAliveTime(java.util.concurrent.TimeUnit))。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。
corePoolSize
,则 Executor 始终首选添加新的线程,而不进行排队。corePoolSize
,则 Executor 始终首选将请求加入队列,而不添加新的线程。maximumPoolSize
,在这种情况下,任务将被拒绝。Ex.3
ThreadPoolTask
& doThreadTest2
package com.example.concurrence.thread; import java.io.Serializable; /**
* <p>
*
* </p>
*
* @author xiachaoyang
* @version V1.0
* @date 2019年01月17日 14:09
* @modificationHistory=========================逻辑或功能性重大变更记录
* @modify By: {修改人} 2019年01月17日
* @modify reason: {方法名}:{原因}
* ...
*/
public class ThreadPoolTask implements Runnable, Serializable { private Object attachData; public ThreadPoolTask(Object tasks) { this.attachData = tasks;
} @Override
public void run() {
System.out.println("开始执行任务:" + attachData);
attachData = null;
} public Object getTask() { return this.attachData;
}
} //ConcurrenceServiceImpl @Override
public void doThreadTest2() { // 构造一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy()); int produceTaskSleepTime = 2; int produceTaskMaxNumber = 10; for (int i = 1; i <= produceTaskMaxNumber; i++) { try {
String task = "task@ " + i;
System.out.println("创建任务并提交到线程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));
Thread.sleep(produceTaskSleepTime);
} catch (Exception e) {
e.printStackTrace();
}
} //创建任务并提交到线程池中:task@ 1
//开始执行任务:task@ 1
//创建任务并提交到线程池中:task@ 2
//开始执行任务:task@ 2
//创建任务并提交到线程池中:task@ 3
//开始执行任务:task@ 3
//创建任务并提交到线程池中:task@ 4
//...
//创建任务并提交到线程池中:task@ 9
//开始执行任务:task@ 9
//创建任务并提交到线程池中:task@ 10
//开始执行任务:task@ 10
}
ThreadPoolExecutor
ThreadPoolExcutor
为一些Executor
提供了基本的实现,这些Executor是由Executors中的工厂 newCahceThreadPool
、newFixedThreadPool
和newScheduledThreadExecutor
返回的。 ThreadPoolExecutor
是一个灵活的健壮的池实现,允许各种各样的用户定制。
newFixedThreadPool
工厂为请求的池设置了核心池的大小和最大池的大小,而且池永远不会超时。newCacheThreadPool
工厂将最大池的大小设置为Integer.MAX_VALUE
,核心池的大小设置为0,超时设置为一分钟。这样创建了无限扩大的线程池,会在需求量减少的情况下减少线程数量ThreadPoolExecutor
允许你提供一个BlockingQueue
来持有等待执行的任务。任务排队有3种基本方法:无限队列、有限队列和同步移交。newFixedThreadPool
和newSingleThreadExectuor
默认使用的是一个无限的LinkedBlockingQueue
。如果所有的工作者线程都处于忙碌状态,任务会在队列中等候。如果任务持续快速到达,超过了它们被执行的速度,队列也会无限制地增加。稳妥的策略是使用有限队列,比如ArrayBlockingQueue
或有限的LinkedBlockingQueue
以及PriorityBlockingQueue
。SynchronousQueue
,完全绕开队列,直接将任务由生产者交给工作者线程。PriorityBlockingQueue
通过优先级安排任务UML
UML
在ThreadPoolTaskExecutor
源码中我们看到了BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
这样一句话用来得到一个队列,这个队列是用来存放任务的。当线程池中有空闲线程时就回去任务队列中拿任务并处理。BlockingQueue是一个阻塞并线程安全的一个队列
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干 生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数 据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并 且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
BlockingQueue的核心方法:
offer(anObject)
:表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)offer(E o, long timeout, TimeUnit unit)
,可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
获取数据:
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
/**
* Create the BlockingQueue to use for the ThreadPoolExecutor.
* <p>A LinkedBlockingQueue instance will be created for a positive
* capacity value; a SynchronousQueue else.
* @param queueCapacity the specified queue capacity
* @return the BlockingQueue instance
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/protected BlockingQueue<Runnable> createQueue(int queueCapacity) { if (queueCapacity > 0) { return new LinkedBlockingQueue<>(queueCapacity);
} else { return new SynchronousQueue<>();
}
}
ThreadPoolTaskExecutor的代码可以发现,其主要是使用BlockingQueue
的一种实现LinkedBlockingQueue
进行实现。
LinkedBlockingQueue
LinkedBlockingQueue
是一个基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立 即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue
可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从 队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue
之所以能够高效的处理 并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue
时有一个大小限制,其默认为Integer.MAX_VALUE
.LinkedBlockingQueue
不接受null
值,当添加null
的时候,会直接抛出NullPointerException
://java.util.concurrent.LinkedBlockingQueue/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count;
putLock.lockInterruptibly(); try { /*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement(); if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
} if (c == 0)
signalNotEmpty();
}
队列的优点
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为 以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个”只送达一次”保证。无论有多少进 程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的 表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行—写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。