1.线程池的好处。
线程使应用能够更加充分合理的协调利用cpu 、内存、网络、i/o等系统资源。
线程的创建需要开辟虚拟机栈,本地方法栈、程序计数器等线程私有的内存空间。
在线程的销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程的风险。
另外,在服务器负载过大的时候,如何让新的线程等待或者友好的拒绝服务?这些丢失线程自身无法解决的。所以需要通过线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。线程池的作用包括:
在了解线程池的基本作用后,我们学习一下线程池是如何创建线程的。首先从ThreadPoolExecutor构造方法讲起,学习如何定义ThreadFectory和RejectExecutionHandler,并编写一个最简单的线程池示例。然后,通过分析ThreadPoolExecutor的execute和addWorker两个核心方法,学习如何把任务线程加入到线程池中运行。ThreadPoolExecutor的构造方法如下:
1 public ThreadPoolExecutor(
2 int corePoolSize, //第1个参数
3 int maximumPoolSize, //第2个参数
4 long keepAliveTime, //第3个参数
5 TimeUnit unit, //第4个参数
6 BlockingQueue<Runnable> workQueue, //第5个参数
7 ThreadFactory threadFactory, //第6个参数
8 RejectedExecutionHandler handler) { //第7个参数
9 if (corePoolSize < 0 ||
10 maximumPoolSize <= 0 ||
11 maximumPoolSize < corePoolSize ||
12 keepAliveTime < 0) // 第一处
13 throw new IllegalArgumentException();
14 if (workQueue == null || threadFactory == null || handler == null)//第二处
15 throw new NullPointerException();
16 this.corePoolSize = corePoolSize;
17 this.maximumPoolSize = maximumPoolSize;
18 this.workQueue = workQueue;
19 this.keepAliveTime = unit.toNanos(keepAliveTime);
20 this.threadFactory = threadFactory;
21 this.handler = handler;
22 }
从代码第2处来看,队列、线程工程、拒绝处理服务都必须有实例对象,但在实际编码中,很少有程序员对着三者进行实例化,而通过Executors这个线程池静态工厂提供默认实现,那么Executors与ThreadPoolExecutor 是什么关系呢?线程池相关的类图
ExecutorService接口继承了Executor接口,定义了管理线程任务的方法。ExecutorService 的抽象类AbstractExecutorService 提供了 submit()、invokeAll()等部分方法的实现,但是核心方法Executor.execute() 并没有在这里实现。因为所有的任务都在这个方法里执行,不同的实现会带来不同的执行策略,这一点在后续的ThreadPoolExecutor解析时,会一步步分析。通过Executor的静态工厂方法可以创建三个线程池的包装对象:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors核心方法有5个:
1 /**
2 * Creates a work-stealing thread pool using all
3 * {@link Runtime#availableProcessors available processors}
4 * as its target parallelism level.
5 * @return the newly created thread pool
6 * @see #newWorkStealingPool(int)
7 * @since 1.8
8 */
9 public static ExecutorService newWorkStealingPool() {
10 return new ForkJoinPool
11 (Runtime.getRuntime().availableProcessors(),
12 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
13 null, true);
14 }
1 public static ExecutorService newFixedThreadPool(int nThreads) {
2 return new ThreadPoolExecutor(nThreads, nThreads,
3 0L, TimeUnit.MILLISECONDS,
4 new LinkedBlockingQueue<Runnable>());
5 }
Executors 中默认的线程工程和拒绝策略过于简单,通常对用户不够友好。线程工厂需要做创建前的准备工作,对线程池创建的线程必须明确标识,就像药品的生产批号一样,为线程本身指定有意思的名称和相应的序列号。拒绝策略应该考虑到业务场景返回相应的提示或者友好的跳转 1 public class UserThreadFactory implements ThreadFactory { 2 private final String namePrefix;
3 private final AtomicInteger nextId = new AtomicInteger();
4 //定义线程组名称,在使用jstack 来排查问题是,非常有帮助
5 UserThreadFactory(String whatFeatureOfGroup){
6 namePrefix = "UserThreadFactory's"+whatFeatureOfGroup+"-Worker-";
7 }
8
9 @Override
10 public Thread newThread(Runnable task){
11 String name = namePrefix+ nextId.getAndIncrement();
12 Thread thread = new Thread(null,task,name,0);
//打印threadname
13 return thread;
14 }
15
16 public static void main(String[] args) {
17 UserThreadFactory threadFactory = new UserThreadFactory("你好");
18 String ss = threadFactory.namePrefix;
19 Task task = new Task(ss);
20 Thread thread = null;
21 for(int i = 0; i < 10; i++) {
22 thread = threadFactory.newThread(task);
23 thread.start();
24 }
25
26 }
27 }
28
29 class Task implements Runnable{
30 String poolname;
31 Task(String poolname){
32 this.poolname = poolname;
33 }
34 private final AtomicLong count = new AtomicLong();
35
36 @Override
37 public void run(){
38 System.out.println(poolname+"_running_"+ count.getAndIncrement());
39 }
40 }
上述示例包括线程工厂和任务执行体的定义,通过newThread 方法快速、统一地创建线程任务,强调线程一定要是欧特定的意义和名称,方便出错时回溯。
下面简单地实现一下RejectedExecutionHandler,实现了接口的rejectExecution方法,打印当前线程池状态,源码如下。
1 public class UserRejectHandler implements RejectedExecutionHandler {
2
3 @Override
4 public void rejectedExecution(Runnable task, ThreadPoolExecutor executor){
5 System.out.println("task rejected."+ executor.toString());
6 }
7 }
在ThreadPoolExecutor 中提供了4个公开的内部静态类:
根据之前实现的线程工厂和拒绝策略,线程池的相关代码实现如下:
public class UserThreadPool {
public static void main(String[] args) {
BlockingQueue queue = new LinkedBlockingQueue(2);
UserThreadFactory f1 = new UserThreadFactory("第一机房");
UserThreadFactory f2 = new UserThreadFactory("第二机房");
UserRejectHandler handler = new UserRejectHandler();
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1,2,60,
TimeUnit.SECONDS,queue,f1,handler);
ThreadPoolExecutor threadPoolExecutor2 =
new ThreadPoolExecutor(1,2,60,
TimeUnit.SECONDS,queue,f2,handler);
Runnable task1 = new Task("");
for (int i = 0;i<200;i++){
threadPoolExecutor.execute(task1);
threadPoolExecutor2.execute(task1);
}
}
}
当任务被拒绝的时候,拒绝策略会打印出当前线程池的大小以及达到了maximumPoolSize=2 ,且队列已满。
我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=5cjrpedpyda8