合理利用线程池能够带来三个好处。 1、第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 2、第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。 3、第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
我们可以通过ThreadPoolExecutor来创建一个线程池。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;
}
创建一个线程池需要输入几个参数:
线程池中定义了五种状态,这些状态都和线程的执行密切相关。
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
用图表示为:
从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
源码分析:上面的流程分析让我们很直观的了解的线程池的工作原理,让我们再通过源代码来看看是如何实现的。线程池执行任务的方法如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException();//获取当前线程池的状态
int c = ctl.get();//如果线程数量小于核心线程池
if (workerCountOf(c) < corePoolSize) {//创建新的线程并执行
if (addWorker(command, true)) return;
c = ctl.get();
}//如线程数大于等于基本线程数,线程池处于运行状态,则将当前任务放到工作队列
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();//双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);//如果当前线程池为空就新创建一个线程并执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}else if (!addWorker(command, false))
工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从Worker的runWorker方法里看到这点:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {
w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); try {
beforeExecute(wt, task);
Throwable thrown = null; try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
通过Executors可以创建不同的线程池。
可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。这样每当有新任务时都去创建新的线程,而60秒内没有执行任务的线程将被销毁。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.SECONDS,ynchronousQueue<Runnable>());
}
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return;
c = ctl.get();
} if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))
reject(command); else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}else if (!addWorker(command, false))
reject(command);
}
2、我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask); return ftask;
}
关闭线程池的例子:
long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
pool.execute(new Job());
}
pool.shutdown();
while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程还在执行。。。");
}
long end = System.currentTimeMillis();
LOGGER.info("一共处理了【{}】", (end - start));
其实 ThreadPool 本身已经提供了不少 api 可以获取线程状态。这样我们可以通过调用这些方法获取线程池状态。 甚至我们可以继承线程池扩展其中的几个函数来自定义监控逻辑:
protected void beforeExecute(Thread t, Runnable r) { }protected void afterExecute(Runnable r, Throwable t) { }protected void terminated() { }
END