在AbstractQueuedSynchronizer中使用LockSupport类来实现线程的挂起和唤醒,对应方法分别我park和unpark,内部实现原理是代理给了unsafe包的park和unpark
Thread中提供了suspend和resume两个方法,不过这两个方法有很重大的缺陷,就是在suspend之前调用了resume,resume操作时没有任何作用的,线程会一直挂起再也得不到运行,目前这两个方法已经不建议使用。
park会阻塞线程直到unpark调用,但unpark操作不依赖于park,在调用park之前调用了unpark对线程一样有效(park之前检查unpark状态应该是),而且多次调用unpark只对后面的一次park起作用。由于前面遗留的unpark操作影响,调用park后可能会立即返回。不过下一次park又会继续阻塞等待unpark。
其次park还支持超时,获取锁时的超时策略就依赖于它。
在多线程环境下对一个值进行操作时需要保证原子性,lock类使用了Unsafe类中的compareAndSet等CAS方法来保证操作的原子性,在不成功的情况下会自旋重试 Unsafe类是sun.misc包下的类,由于其安全策略,应用程序中写的类是无法使用这个类的,而且其中实现大部分都是native的,了解一下API功能,不影响阅读jdk源码就可以了
地址:http://gee.cs.oswego.edu/dl/papers/aqs.pdf 详细讲述了aqs的设计过程,上面的park与unpark就翻译自里面的一段。
ps:condition相关的先不涉及,单纯的看lock相关源码
ps2:单独看AQS很抽象,我们结合具体类来了解相关功能
ps3:要用多线程的思维去看,单线程思维看这个根本就看不明白
AbstractQueuedSynchronizer维护了一个FIFO的队列,每个队列节点就是一个Node,Node中维护了前后节点(pre,next)的信息,和每个节点的waitStatus以及节点的模式(共享还是独占),在获取锁失败后就会加入到队列末尾,拥有锁的线程释放锁后会通知队列中的第一个节点。
waitStatus有几个状态和约定
值 | 说明 |
---|---|
>0 | 无效状态,说明node不再竞争锁 |
<0 | 有效状态,node正在竞争锁 |
1 | CANCELLED,被取消 |
0 | 初始化状态,表示SYNC |
-1 | SIGNAL 表示后继节点需要被唤醒 |
-2 | CONDITION 表示线程在等待condition |
-3 | PROPAGATE 表示下一次acquireShared应该被无条件传播 |
mode:
值 | 说明 |
---|---|
SHARED | 共享模式 |
EXCLUSIVE | 独占模式 |
我们从最简单的CountDownLatch来看一下AQS的共享模式的使用
jdk中的demo
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
main函数生成了N个任务放到线程池中异步执行,每个任务执行完毕后会countdown一下表明任务完成,主线程一直await到所有的任务执行完毕才会退出
内部类Sync继承了AQS,重载了share相关的两个方法
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
/**
* 返回值>=0 表示获取锁成功,
* >0 表示需要向后传播 =0不向后传播
* <0 表示获取锁失败
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch相关方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
全部都代理给了Sync类
countDown使用的releaseShared方法比较简单,先来看一下
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //1
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //2
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //3
continue; // loop on failed CAS
}
if (h == head) // loop if head changed //4
break;
}
}
再来看一下await中的acquireSharedInterruptibly实现
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 1
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 2
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); // 3
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && // 4
parkAndCheckInterrupt()) //5
throw new InterruptedException();
}
} finally {
if (failed) // 6
cancelAcquire(node);
}
}
分步骤说明,不按上述顺序,见标号: 2. 加入到队列中addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
先自己尝试一下加入队列,如果失败就进入enq方法入队,可以看到,队列初始化时放置了一个空节点作为头部,线程封装的node加入到了其后
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //1
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //2
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //3
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
分三步
可以看到每当有一个新线程进入等待队列时,都会把前一个节点的waitStatus变为SIGNAL,表示后继节点需要被通知唤醒,新入队的节点waitStatus为SYNC
head
head(-1)->node1(0)
head(-1)->node1(-1)->node2(0)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//头结点对应的线程已经获得了锁,
//相当于于出队,这个节点已经不再竞争锁了
//再竞争锁会再加入到队列中
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
propagate(tryAcquireShared返回值) > 0 表示需要向后传播
h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 或头结点为空或状态为有效
通知后继节点doReleaseShared上面已经说过了。
我们分几种情况讨论一下
1 await直接获取到锁,也就是所有任务已经完成,那么直接返回,继续执行
2 任务没有完成,await获取锁失败,进入FIFO队列等待
2.1 任务完成后,调用doReleaseShared通知后继节点,将队列中的第一个node设置为head,并再次调用doReleaseShared
2.2 一直到队列末尾,所有节点获取到锁,通知完毕,所有线程获取到共享锁,继续执行