我们上面讲解了 Executor框架以及 ThreadPoolExecutor 类,下面让我们实战一下,来通过写一个 ThreadPoolExecutor 的小 Demo 来回顾上面的内容。
首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)
MyRunnable.java
12345678910111213141516171819202122232425262728293031323334 | import java.util.Date; /** * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。 * @author shuang.kou */public class MyRunnable implements Runnable { private String command; public MyRunnable(String s) { this.command = s; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date()); processCommand(); System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date()); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return this.command; }} |
---|
编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。
ThreadPoolExecutorDemo.java
1234567891011121314151617181920212223242526272829303132333435 | import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { private static final int CORE_POOL_SIZE = 5; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) { //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) Runnable worker = new MyRunnable("" + i); //执行Runnable executor.execute(worker); } //终止线程池 executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads"); }} |
---|
可以看到我们上面的代码指定了:
1234567891011121314151617181920 | pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019 |
---|
我们通过代码输出结果可以看出:线程池每次会同时执行 5 个任务,这 5 个任务执行完之后,剩余的 5 个任务才会被执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)
现在,我们就分析上面的输出内容来简单分析一下线程池原理。
为了搞懂线程池的原理,我们需要首先分析一下 execute方法。在 5.1 节中的 Demo 中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:
12345678910111213141516171819202122232425262728293031323334353637383940 | // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int workerCountOf(int c) { return c & CAPACITY; } private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 如果任务为null,则抛出异常。 if (command == null) throw new NullPointerException(); // ctl 中保存的线程池当前的一些状态信息 int c = ctl.get(); // 下面会涉及到 3 步 操作 // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里 // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 if (!isRunning(recheck) && remove(command)) reject(command); // 如果当前线程池为空就新创建一个线程并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 else if (!addWorker(command, false)) reject(command); } |
---|