今天我们来聊一聊Java中的线程池,首先来看看什么是线程池。
线程池就是以一个或多个线程(循环执行)多个应用逻辑的线程集合.
为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。其实和数据库连接池是一样的道理,为了避免每次数据库查询都重新建立和销毁数据库连接,我们可以使用数据库连接池维护一些数据库连接,让他们长期保持一个激活状态。当系统需要使用数据库时,并不是创建一个新的连接,而是从连接池中获得一个可用的连接。
线程池中总有那么几个活跃线程,当你需要使用线程时,可以从池子中拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这个线程回收入池,等待下一个任务的执行。
在JDK中提供了一套Executor框架,可以方便开发者很好的控制线程。
JDK中提供了五类线程池可供使用,其中newWorkStealingPool是1.8之后出来的,其他是之前就有的。
提供一个示例:
package thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId());
try{
Thread.sleep(500);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i=0;i<10;i++){
fixedThreadPool.submit(new MyTask());
}
fixedThreadPool.shutdown();
}
}
除了ForkJoin外的其他几类线程池的核心,其实都是由一个ThreadPoolExecutor类实现的,他们的源码内部其实都调用了这个类。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
我们知道那四类线程池实际上只是知道了一个外壳,这明显还不够,我们还需要知道ThreadPoolExecutor这个类,首先来看看类的构造函数的定义:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数含义如下:
workQueue参数是一个BlockingQueue对象,仅用于存放Runnable对象。根据使用场景不同一般可有四种任务队列。
在ThreadPoolExecutor的构造函数最后一个参数指定了拒绝策略。当任务数量超过系统实际承载能力时,就会使用拒绝策略。在JDK中内置了四种拒绝策略。
当然也可以自定义拒绝策略,可以实现RejectExecuntionHandler接口,该接口定义如下:
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
我相信大家在学习这一部分的时候会有一个和我一样的疑惑:线程池的线程是从哪里来的?
其实从ThreadPoolExecutor的构造函数中不难发现有一个ThreadFactory类的参数,ThreadFactory是一个接口
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
当线程创建时就是用的这个这个方法去创建线程的,使用这一个方法我们可以跟踪线程池中的所有被创建的线程,以及定义其名称、优先级、组等。下面给一个简单的自定义线程示例:
package thread;
import java.util.concurrent.*;
public class ThreadPool {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId());
try{
Thread.sleep(500);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask myTask = new MyTask();
ExecutorService es = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
System.out.println("create Thread:"+t);
return t;
}
});
for (int i=0;i<10;i++){
es.submit(myTask);
}
}
}
在实际使用线程池中我们很容易遇到一些幽灵错误,没有得到理想的结果而控制台又没有任何错误信息,甚至包括一些异常都不会抛出,感觉像是异常被线程池吞并了一样。比如下面这段代码
package thread;
import java.util.concurrent.*;
public class ThreadPool {
public static class MyTask implements Runnable{
int a;
int b;
public MyTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double c = a/b;
System.out.println(c);
}
}
public static void main(String[] args) throws InterruptedException{
ExecutorService es = Executors.newCachedThreadPool() ;
for (int i=0;i<5;i++){
es.submit(new MyTask(100,i));
}
Thread.sleep(1000);
es.shutdown();
}
}
控制台输出如下:
50.0
100.0
33.0
25.0
很明显第一个除0操作会抛出一个异常,但是并没有在控制台打印出。其实有一个很好的解决方法就是把submit()方法改为execute()即可,改了之后就会得到下面的结果:
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at thread.ThreadPool$MyTask.run(ThreadPool.java:18)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
100.0
50.0
25.0
33.0
这也是submit()方法和execute()方法很重要的一个区别,submit会“吞并”错误异常堆栈,而execute不会。追根问底为什么呢?其实看这两个方法定义就一目了然了
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
submit是一个有结果返回的方法,并且返回对象是Future,返回结果以及异常堆栈都放到了Future中,如果不做处理我们当然看不到了,而execute没有接收异常的对象,所以会直接抛出。
如果我们将主方法这样改一下,就能看到正常的异常信息了
public static void main(String[] args) throws InterruptedException{
ExecutorService es = Executors.newCachedThreadPool() ;
Future[] futures = new Future[5];
for (int i=0;i<5;i++){
futures[i] = es.submit(new MyTask(100,i));
}
Thread.sleep(1000);
for (Future future : futures){
try{
future.get();
}catch (Exception e){
e.printStackTrace();
}
}
es.shutdown();
}
运行结果:
100.0
50.0
25.0
33.0
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at thread.ThreadPool.main(ThreadPool.java:32)
Caused by: java.lang.ArithmeticException: / by zero
at thread.ThreadPool$MyTask.run(ThreadPool.java:18)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
好了,今天的线程池学习就到这里啦~