public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 1. 工作线程数小于corePoolSize时,添加新的worker线程 // 2. 如果达到了corePoolSize,此时一个任务如果能成功入队列(也就是说队列没有满时),需要再进一步来二次确认是否需要添加worker // 3. 如果任务不能入队列,将尝试添加一个worker直到worker数量达到maxPoolSize // 4. 如果线程池中线程数量 > core ,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是无界队列,那么设置线程池最大数量是无效的 int c = ctl.get();// ctl是一个AtomicInteger类型的值,它的某几位标识着不同的值,如workerCount的值和运行的状态,详情参考:https://blog.csdn.net/yjw123456/article/details/77719061 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池是否处于RUNNING状态,以及是否能入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 线程池是否处于RUNNING状态,从队列中移除当前command是否成功,因为在上面的判断中进行了入队操作 if (! isRunning(recheck) && remove(command)) // 线程池不是RUNNING状态时将command从队列中移除并进行reject reject(command); // 如果worker线程的数量为0,则添加worker else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 对应上面的第3种情况 else if (!addWorker(command, false)) // 如果添加失败,则执行reject reject(command); }
总结一下,上面的方法主要包括如下几点:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */ final Thread thread;// 对应worker的线程 /** Initial task to run. Possibly null. */ Runnable firstTask;// 初始时要运行的task /** Per-thread task counter */ volatile long completedTasks;// 运行完成的任务数
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { // 设置AQS的state值为-1 setState(-1); // inhibit interrupts until runWorker // 初始要运行的task this.firstTask = firstTask; // 使用threadFactory创建一个线程 this.thread = getThreadFactory().newThread(this); }
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
// Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state.
protected boolean isHeldExclusively() {// 是否处理独占状态 return getState() != 0; }
protected boolean tryAcquire(int unused) {// 尝试获取许可 if (compareAndSetState(0, 1)) {//cas设置state值从0到1 // 设置成功时将当前线程设置为独占线程 setExclusiveOwnerThread(Thread.currentThread()); // 获取成功 return true; } // 获取失败 return false; }
// 尝试释放占有的许可 protected boolean tryRelease(int unused) { // 重置Worker的独占线程为null setExclusiveOwnerThread(null); // 状态置为0 setState(0); return true; } // 调用AQS的acquire方法 public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } // 调用AQS的release方法 public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
tryLock的使用主要在interruptIdleWorkers方法中:
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
它的主要作用是在对worker进行interrupt操作时需要先获取worker的独占锁。
而lock和unlock方法的调用:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 在每次运行一个任务之前要先对worker锁定,然后在执行完之后进行解锁 w.lock(); ---------------省略部分代码----------------- try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); -------------省略部分代码------------------ } finally { task = null; w.completedTasks++; // 进行解锁 w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
在每次运行一个任务之前要先对worker进行锁定,然后在执行完之后进行解锁。
另外线程池中还有一个锁:
private final ReentrantLock mainLock = new ReentrantLock();
/** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition();
它的作用主要是在停止线程池时来控制用来做停止操作的线程:
final void tryTerminate() { for (;;) { ----------省略部分代码---------------------- final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); // 唤醒 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
它主要为了客户端可以调用的java.util.concurrent.ThreadPoolExecutor#awaitTermination方法服务的,awaitTermination会调用conditon的await方法进行阻塞。在这里就不进行详细地分析了,后续分析线程池源码时再具体分析。
关于worker的部分我们就简要地介绍这么多。它继承AQS的主要目的是在每次运行一个任务之前要先对worker进行锁定,然后在执行完之后进行解锁,这样方便管理。