受限于硬件、内存和性能,我们不可能无限制的创建任意数量的线程,因为每一台机器允许的最大线程是一个有界值。线程池就是用这些有限个数的线程,去执行提交的任务。然而对于多用户、高并发的应用来说,提交的任务数量非常巨大,一定会比允许的最大线程数多很多。为了解决这个问题,必须要引入排队机制,或者是在内存中,或者是在硬盘等容量很大的存储介质中。Java提供的ThreadPoolExecutor只支持任务在内存中排队,通过BlockingQueue暂存还没有来得及执行的任务。
任务的管理是一件比较容易的事,复杂的是线程的管理,这会涉及线程数量、等待/唤醒、同步/锁、线程创建和死亡等问题。ThreadPoolExecutor与线程相关的几个成员变量是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它们共同负责线程的创建和销毁。 常用构造器
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue);
几个重要参数:
执行机制
执行机制
看下allowCoreThreadTimeOut和keepAliveTime属性的含义。在压力很大的情况下,线程池中的所有线程都在处理新提交的任务或者是在排队的任务,这个时候线程池处在忙碌状态。如果压力很小,那么可能很多线程池都处在空闲状态,这个时候为了节省系统资源,回收这些没有用的空闲线程,就必须提供一些超时机制,这也是线程池大小调节策略的一部分。通过corePoolSize和maximumPoolSize,控制如何新增线程;通过allowCoreThreadTimeOut和keepAliveTime,控制如何销毁线程。
设置核心、最大线程数
threadPool.setCorePoolSize(10);
threadPool.setMaximumPoolSize(10);
设置核心线程数可空闲超时退出
threadPool.allowCoreThreadTimeOut(true);
设置拒绝策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
/**
* 线程池里活跃的大致线程数量
*/
long activeCount = threadPool.getActiveCount();
/**
* 返回已执行的任务的大致总数。
*/
long taskCount = threadPool.getTaskCount();
/**
* 返回线程池在运行过程中已完成的大致任务数
*/
long completedTaskCount = threadPool.getCompletedTaskCount();
/**
* 返回线程池里的线程数量
*/
long poolSize = threadPool.getPoolSize();
/**
* 返回曾经创建过的最大线程数
*/
long largestPoolSize = threadPool.getLargestPoolSize();
/**
* 返回当前最大配置线程数
*/
long maximumPoolSize = threadPool.getMaximumPoolSize();
这里我对largestPooSize的含义比较困惑,按字面理解是“最大的线程池数量”,但是按照线程池的定义,maximumPoolSize和coreSize相同的时候(我测试时前面设置的都是20,现在设置的都是10),一个线程池里的最大线程数是10,那么为什么有的时候largestPooSize可以是20呢?我去翻这块的源码注释:
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
在addWorker方法中发现两点:
前面已经提到四种拒绝策略,下面进行详细解释和验证
1、默认策略AbortPolicy 丢弃任务,抛运行时异常
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 5, SECONDS, new ArrayBlockingQueue<>(2));
private static void task() {
try {
sleep(1000);
System.out.println(Thread.currentThread().getName() + ":queue size:" + threadPool.getQueue().size() + ", active:" + threadPool.getActiveCount());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void test_aborot_policy() {
for (int i = 0; i < 10; i++) {
threadPool.submit(TestThread::task);
}
}
运行结果:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6833ce2c rejected from java.util.concurrent.ThreadPoolExecutor@725bef66[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
2、CallerRunsPolicy 用主线程去执行任务
@Test
public void test_CallerRunsPolicy() throws InterruptedException {
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
threadPool.submit(TestThread::task);
}
Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果:
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
main:queue size:2, active:2
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
main:queue size:0, active:2
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2
可以看到主线程去执行了部分任务,同时主线程此时无法进行调度线程,直到主线程执行完毕再开始线程调度执行,activeCount不准是因为线程状态瞬息万变,activeCount只是对当前活跃线程数的大体估计。
3、 DiscardPolicy 忽视,直接丢弃,什么都不会发生
@Test
public void test_DiscardPolicy() throws InterruptedException {
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 20; i++) {
threadPool.submit(TestThread::task);
}
Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果;
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2
只有线程和队列的执行了,其他都被丢弃了,也不会抛出异常
4、 DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
private static void task2(int cnout) {
try {
sleep(1000);
System.out.println(cnout);
System.out.println(Thread.currentThread().getName() + ":queue size:" + threadPool.getQueue().size() + ", active:" + threadPool.getActiveCount());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void test_DiscardOldestPolicy() throws InterruptedException {
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 20; i++) {
int finalI = i;
threadPool.submit(()->task2(finalI));
}
Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果:
0
3
pool-2-thread-1:queue size:2, active:2
pool-2-thread-2:queue size:2, active:2
18
19
pool-2-thread-1:queue size:0, active:2
pool-2-thread-2:queue size:0, active:2
可以看出除了最开始执行的两个线程和最后进入队列的两个线程,其他任务都被丢弃了
通过线程池创建的构造器可以看到还有一个参数:Executors.defaultThreadFactory()
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
这些工厂类都实现了一个接口,接口只有一个newThread方法就是用来生产线程的,子类需要实现这个方法来根据自己规则生产相应的线程。
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
DefaultThreadFactory就做了两件事,一是创建线程时把线程设为非守护线程,二是设置线程的名字为pool-2-thread-1格式。但是这种格式的线程名区分度不够,最好能根据线程池使用的功能来定义线程名,这时可以引入其他线程工厂,比如用guava的ThreadFactoryBuilder和Spring的CustomizableThreadFactory:
@Test
public void test_thread_factory() throws InterruptedException {
threadPool.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("Clear Task-%d").build());
for (int i = 0; i < 4; i++) {
threadPool.submit(TestThread::task);
}
Thread.sleep(1000000);//用于阻塞住主线程,否则主线程退出无法看到打印
}
结果:
Clear Task-0:queue size:2, active:2
Clear Task-1:queue size:2, active:2
Clear Task-0:queue size:0, active:2
Clear Task-1:queue size:0, active:2
java通过Executors提供四种线程池,分别为:
但是在阿里巴巴Java开发手册中不允许使用Executors创建线程池。
原因是这几种方式创建线程池时都有可能出现OOM现象,要么使用阻塞队列但是都是使用的无界队列,队列默认值为Integer.MAX_VALUE,要么线程最大可创建数量为Integer.MAX_VALUE,这都会导致内存溢出现象
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
所以创建线程池还是最好使用传统的创建方式ThreadPoolExecutor。
程序运行中绝对不是线程数越多处理越快!!!
NCpu=Runtime.getRuntime().availableProcessors()
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。