AQS
。
Sync
是这些类中都有的内部类,其结构如下:Sync
是AQS
的实现。
AQS
主要完成的任务:
状态state
FIFO队列
实现获取/释放等方法
AQS在juc中用法套路:
AbstractQueuedSynchronizer
类都用AQS
代替。CountDownLatch
对象时:CountDownLatch cdl = new CountDownLatch(3);
cdl
内部创建一个Sync
对象,并将this.sync
中的state
设置为3,代表要倒数三次。cdl.await()
,此时调用该语句的线程会陷入等待,原因分析如下:
await()
方法中的内容如下:
public void await() throws InterruptedException {
// 位于CountDownLatch类中
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly
位于AQS
类中,内容如下:public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 位于AQS类中
if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); // 把当前线程放入队列等待,并且让当前线程阻塞
}
tryAcquireShared(arg)
被AQS
的子类Sync
重写,内容如下:protected int tryAcquireShared(int acquires) {
// 位于CountDownLatch类的内部类Sync中
return (getState() == 0) ? 1 : -1;
}
getState()
位于AQS
类中,内容如下:protected final int getState() {
// 位于AQS类中
return state;
}
getState()
返回的是 3,因此tryAcquireShared
会返回 -1,因此会执行doAcquireSharedInterruptibly
,然后当前线程进入阻塞。countDown()
函数,会让state--
,如果值变为0,则会唤醒之前等待的线程。原因分析如下:
countDown()
方法中的内容如下:
public void countDown() {
// 位于CountDownLatch类中
sync.releaseShared(1);
}
releaseShared
位于AQS
类中,内容如下:public final boolean releaseShared(int arg) {
// 位于AQS类中
if (tryReleaseShared(arg)) {
doReleaseShared(); // 把之前阻塞的线程全部唤醒
return true;
}
return false;
}
tryReleaseShared(arg)
被AQS
的子类Sync
重写,内容如下:protected boolean tryReleaseShared(int releases) {
// 位于CountDownLatch类的内部类Sync中
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
state
的值减少到 0 之后会唤醒之前阻塞进程。Semaphore
对象时:Semaphore s = new Semaphore(3);
s
内部创建一个NonfairSync
对象(继承自Sync
),并将this.sync
中的state
设置为3,代表有三个通行证。s.acquire(2)
,会获取到两个许可证,分析如下:
acquire(int permits)
方法中的内容如下:
public void acquire(int permits) throws InterruptedException {
// 位于Semaphore类中
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
acquireSharedInterruptibly
位于AQS
类中,内容如下:public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 位于AQS类中
if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); // 把当前线程放入队列等待,并且让当前线程阻塞
}
tryAcquireShared(arg)
被AQS
的子类Sync
重写,内容如下:protected int tryAcquireShared(int acquires) {
// 位于Semaphore类的内部类NonfairSync中
return nonfairTryAcquireShared(acquires);
}
nonfairTryAcquireShared
内容如下:final int nonfairTryAcquireShared(int acquires) {
// 位于Semaphore类的内部类Sync中
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
available
就是state
,当我们想要获取的许可证数量acquires
小于等于state
时,返回正数,在函数acquireSharedInterruptibly
判断不成立,不会陷入阻塞,并将state
更新为最新的值;否则如果acquires
大于state
时,返回负数,当前线程会陷入阻塞。s.release(2)
,会获取到两个许可证,分析如下:
release(int permits)
方法中的内容如下:
public void release(int permits) {
// 位于Semaphore类中
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
releaseShared
位于AQS
类中,内容如下:public final boolean releaseShared(int arg) {
// 位于AQS类中
if (tryReleaseShared(arg)) {
doReleaseShared(); // 把之前阻塞的线程全部唤醒
return true;
}
return false;
}
tryAcquireShared(arg)
被AQS
的子类Sync
重写,内容如下:protected final boolean tryReleaseShared(int releases) {
// 位于Semaphore类的内部类Sync中
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
state
的值增加releases
,也就是增加许可证的数量。ReentrantLock
对象时:ReentrantLock r = new ReentrantLock();
r
内部创建一个NonfairSync
对象sync
(继承自Sync
)。r.lock()
,分析如下:
r.lock()
方法中的内容如下:
public void lock() {
// 位于ReentrantLock类中
sync.lock();
}
sync.lock()
内容如下:final void lock() {
// 位于ReentrantLock类的内部类NonfairSync中
if (compareAndSetState(0, 1)) // CAS只有在没有人持有这把锁的时候才能成功
setExclusiveOwnerThread(Thread.currentThread()); // 把当前线程设置为持有锁的线程
else
acquire(1);
}
acquire(1)
位于AQS
类中,内容如下:public final void acquire(int arg) {
// 位于AQS类中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
内容如下:protected final boolean tryAcquire(int acquires) {
// 位于ReentrantLock类的内部类NonfairSync中
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire
内容如下:final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
state
为0时,代表没有任何人持有该锁,获取该锁;否则如果当前线程正好是该锁的持有者,重入。r.unlock()
,分析如下:
r.unlock()
方法中的内容如下:
public void unlock() {
// // 位于ReentrantLock类中
sync.release(1);
}
sync.release(1)
内容如下:public final boolean release(int arg) {
// 位于AQS类中
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
被AQS
的子类Sync
重写,内容如下:protected final boolean tryRelease(int releases) {
// 位于ReentrantLock类的内部类Sync中
int c = getState() - releases; // 将重入次数减少releases
if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断当前线程是不是持有锁的线程
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 只有当重入次数为0时,才去释放锁
free = true;
setExclusiveOwnerThread(null); // 设置当前持有这把锁的线程为null
}
setState(c);
return free;
}
state
的值被减去releases
,如果state
变为0,则设置当前持有这把锁的线程为null。版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/234952.html原文链接:https://javaforall.cn