一、概述
在执行一个异步任务或并发任务时,往往是通过直接new Thread()
方法来创建新的线程,这样做弊端较多,更好的解决方案是合理地利用线程池,线程池的优势很明显,如下:
Java API针对不同需求,利用Executors
类提供了4种不同的线程池:newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool, newSingleThreadExecutor,接下来讲讲线程池的用法。
创建一个可缓存的无界线程池,该方法无参数。当线程池中的线程空闲时间超过60s则会自动回收该线程,当任务超过线程池的线程数则创建新线程。线程池的大小上限为Integer.MAX_VALUE,可看做是无限大。
public void cachedThreadPoolDemo(){
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int index = i;
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+", index="+index);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
pool-1-thread-1, index=0
pool-1-thread-1, index=1
pool-1-thread-1, index=2
pool-1-thread-1, index=3
pool-1-thread-1, index=4
从运行结果可以看出,整个过程都在同一个线程pool-1-thread-1
中运行,后面线程复用前面的线程。
创建一个固定大小的线程池,该方法可指定线程池的固定大小,对于超出的线程会在LinkedBlockingQueue
队列中等待。
public void fixedThreadPoolDemo(){
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 6; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+", index="+index);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
pool-1-thread-1, index=0
pool-1-thread-2, index=1
pool-1-thread-3, index=2
pool-1-thread-1, index=3
pool-1-thread-2, index=4
pool-1-thread-3, index=5
从运行结果可以看出,线程池大小为3,每休眠1s后将任务提交给线程池的各个线程轮番交错地执行。线程池的大小设置,可参考Runtime.getRuntime().availableProcessors()。
创建一个只有线程的线程池,该方法无参数,所有任务都保存队列LinkedBlockingQueue中,等待唯一的单线程来执行任务,并保证所有任务按照指定顺序(FIFO或优先级)执行。
public void singleThreadExecutorDemo(){
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 3; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+", index="+index);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
pool-1-thread-1, index=0
pool-1-thread-1, index=1
pool-1-thread-1, index=2
从运行结果可以看出,所有任务都是在单一线程运行的。
创建一个可定时执行或周期执行任务的线程池,该方法可指定线程池的核心线程个数。
public void scheduledThreadPoolDemo(){
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
//定时执行一次的任务,延迟1s后执行
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+", delay 1s");
}
}, 1, TimeUnit.SECONDS);
//周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+", every 3s");
}
}, 2, 3, TimeUnit.SECONDS);
}
运行结果:
pool-1-thread-1, delay 1s
pool-1-thread-1, every 3s
pool-1-thread-2, every 3s
pool-1-thread-2, every 3s
...
ScheduledExecutorService功能强大,对于定时执行的任务,建议多采用该方法。
上述4个方法的参数对比,如下:
其他参数都相同,其中线程工厂的默认类为DefaultThreadFactory,线程饱和的默认策略为ThreadPoolExecutor.AbortPolicy。
Executors类提供4个静态工厂方法:newCachedThreadPool()、newFixedThreadPool(int)、newSingleThreadExecutor和newScheduledThreadPool(int)。这些方法最终都是通过ThreadPoolExecutor类来完成的,这里强烈建议大家直接使用Executors类提供的便捷的工厂方法,能完成绝大多数的用户场景,当需要更细节地调整配置,需要先了解每一项参数的意义。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
创建线程池,在构造一个新的线程池时,必须满足下面的条件:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数说明:
pool-m-thread-n
(m为线程池的编号,n为线程池内的线程编号);排队有三种通用策略:
BlockingQueue的插入/移除/检查这些方法,对于不能立即满足但可能在将来某一时刻可以满足的操作,共有4种不同的处理方式:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。如下表格:
操作 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
实现BlockingQueue接口的常见类如下:
调用线程池的shutdown()或shutdownNow()方法来关闭线程池
中断采用interrupt方法,所以无法响应中断的任务可能永远无法终止。但调用上述的两个关闭之一,isShutdown()方法返回值为true,当所有任务都已关闭,表示线程池关闭完成,则isTerminated()方法返回值为true。当需要立刻中断所有的线程,不一定需要执行完任务,可直接调用shutdownNow()方法。
需要针对具体情况而具体处理,不同的任务类别应采用不同规模的线程池,任务类别可划分为CPU密集型任务、IO密集型任务和混合型任务。
利用线程池提供的参数进行监控,参数如下:
通过扩展线程池进行监控:继承线程池并重写线程池的beforeExecute(),afterExecute()和terminated()方法,可以在任务执行前、后和线程池关闭前自定义行为。如监控任务的平均执行时间,最大执行时间和最小执行时间等。