1
前言
在前面分享的一篇文章中,我分享到了ReentrantLock源码,它是基于AQS进行实现的,那么今天本文分享的同样也是基于AQS实现的Semaphore工具类
2
什么是Semaphore
Semaphore字面意思是信号量的意思,这种说法并不能够很好的描述它的作用,更通俗的来说应该是令牌管理器。官方文档说明Semaphore是一个计数信号量,从概念来讲Semaphore包含了一组许可证。每个用户都必须拿到许可证才能访问后续资源,没有拿到许可证的用于则需要进行等待获取许可证。
3
Semaphore的使用场景
Semaphore 信号量,用来控制同时访问资源的线程数,可以通过协调各个线程确保合理使用资源。
这么说会稍微有点不好理解,举个很通俗易懂例子:
相信大家伙有在火车站售票窗口排队买火车票的经历,例如火车站开放了三个售票窗口,现在来了10个人要进行排队买票,每个窗口同一时刻只能够处理一个用户购买火车票,三个售票窗口那么够同时处理三个用户购买火车票,后面的人看到售票窗口已经有人占用了,也就不能购票了,但是如果前面三个窗口有任意一人购买成功,那么就允许后面一个人进行购票。
这个案例的人就是线程,而正在购票的操作表示线程正在执行业务逻辑,离开窗口则表示线程执行完毕,看到窗口有人正在购票则排队等待则表示线程即将入队阻塞。
4
Semaphore源码详解
如何使用Semaphore
聊完Semaphore的使用场景后,我们来看看基于上面的场景通过Semaphore来实现相应的功能
public class SemaphoreTest {
public static void main(String[] args) {
// 声明3个窗口
Semaphore windows = new Semaphore(3, true);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
//占用窗口
windows.acquire();
System.out.println(Thread.currentThread().getName() + ":开始买票");
//模拟买票流程
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":购买成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
windows.release();
}
}).start();
}
}
}
源码分析
Semaphore有两个构造方法,默认构造方法是非公平锁,而另外一个构造方法通过 "fair" 属性决定是否走公平锁还是非公平锁,这个地方其实与ReentrantLock逻辑相似。
// 默认构造方法,走非公平锁流程,permits:支持令牌数量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 根据fair属性决定是否走公平还是非公平锁流程,permits:支持令牌数量
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
非公平锁加锁流程
首先从 "windows.acquire()" 作为源码入口
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
在该方法内有两个if判断,首先判断当前线程是否被中断,如果被中断了则直接抛出异常,这里为什么要这么做呢?放上我的理解,假如当前线程被标记为中断状态且成功抢到了令牌,但是在执行的过程中,该线程有逻辑检测到线程被中断从而将线程停止,那么该线程获取到令牌之后就无法被释放掉,始终占用着这个令牌,也就意味着后续都会少一个令牌,而且可能以上这种情况,会导致令牌全部被占用,无法得到释放。
而tryAcquireShared()方法则是通用的共享锁获取锁的方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在我们前面文章中就有讲到,在AQS中该方法是个模板方法,具体的加锁逻辑由子类自身的特性去具体实现的,在Semaphore中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
进入到tryAcquireShared()方法,在其内部调用了nonfairTryAcquireShared()非公平锁方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
如果大家伙看过我前面分享ReentranLock源码,就会发现这里与一般的tryAcquire逻辑不同,Semaphore的tryAcquire逻辑是一个自旋操作,因为Semaphore它是共享锁,同一时刻可能存在多个线程来更新这个state值,所以我们需要通过CAS+自旋来保证线程安全,在有令牌的前提下,该方法退出的唯一条件就是通过cas成功更新state值,并返回state剩余值。如果剩下令牌不足(没有令牌可使用),也就不需要进行cas更新state值,直接返回计算后的state值。 【自旋的目的,关于这个判断,如果还有令牌可获取且通过cas获取令牌失败,则继续重试获取令牌】
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 自旋
int available = getState(); //获得当前令牌数量
int remaining = available - acquires; // 计算令牌数量
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 还有令牌可用的话,则直接进行CAS获取令牌
return remaining;
}
}
退回到acquireSharedInterruptibly()方法,结合上面代码块流程,如果可用的令牌不足,那么tryAcquireShared()方法返回的内容必定是小于0的,为什么呢?其实可以看到上面代码块返回的state值是计算后的值,没有令牌可用state值为0,一旦计算state值后,那么返回的state值为 "-1" ,如果为 "-1" 则进入获取令牌失败方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
进入到获取令牌失败的逻辑方法,主要的逻辑都在自旋里面,但是外面同样也有个比较重要的方法,就是addWaiter()方法,该方法传入的参数值为 "Node.SHARED" ,而SHARED的值就是new Node() 也就是创建了一个空的节点,然后我们来看看addWaiter()方法其内部逻辑做了些什么事情?
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点
首先将当前线程封装为node节点,接着获取tail节点,判断当前AQS中是否存在双向链表,如果存在的话,将node前驱节点引用指向tail节点,通过cas将node节点设置为末尾节点,如果设置成功则将tail节点的后驱引用指向node,那么node就顺理成章的成了双向链表的末尾节点了。关于这里我们其实需要思考一个问题,在多线程情况下同时通过cas去设置尾节点,此时只会有一个线程设置成功且返回出去,那接下来的线程该怎么办呢?且不急,带着这个疑问我们进入到enq方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 封装节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 获取末尾节点
if (pred != null) {
node.prev = pred; // 当前节点的前驱引用指向为pred
if (compareAndSetTail(pred, node)) { // 将当前节点设置为链表末尾节点
pred.next = node; // 原末尾节点后驱引用指向为当前节点
return node;
}
}
enq(node);
return node;
}
基于FIFO入队流程图
通过如下图理解上面这段话,我相信应该是能够明白的
使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点
得勒,进来就是一层自旋,注意这里的精华就是自旋,以及上面所提到多线程通过cas设置尾节点失败的解决方案就在此方法。
进入自旋获取链表的末尾节点,如果获取tail为null则证明当前并没有构成双向链表,接着通过cas去设置head,然后将head指向tail,这样双向链表就完成了,如果获取tail不为null,将node前驱引用指向tail节点,然后tail的后驱节点引用指向node节点,然后返回出去。那如果设置失败了怎么办呢?回到上面的问题,问题不大,这方法不是自旋嘛,它会一直自旋到你设置成功为止,才退出自旋。
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 获取末尾节点
if (t == null) { // Must initialize // 构建双向链表
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果通过cas设置不成功,就一直进行自旋,直到设置成功才退出循环。
接着,回到获取令牌失败走的逻辑方法,通过上面的流程下来,我们就知道node节点现在已经成功入队到双向链表中,接着判断如果当前节点的前驱节点是为头节点此时会尝试获取令牌,如果获取失败则将线程进行阻塞,同理当前节点的前驱节点不是链表的头节点,也会将当前线程进行阻塞。无论如何只要令牌没有了,就得老老实实的在队列中进行呆着,直到下一次的唤醒。
那如果线程为头节点且获取令牌成功了,setHeadAndPropagate()方法又会做些什么事情呢?带着这个疑问,我们进去一探究竟
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构建双向链表 或 入队操作
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取令牌
if (r >= 0) { // 获取令牌成功
setHeadAndPropagate(node, r); //传播链表
p.next = null; // help GC 将前驱节点的引用指向为NULL,待垃圾回收器回收
failed = false;
return; // 获取令牌成功,退出自旋
}
}
if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞
parkAndCheckInterrupt()) // 阻塞当前线程
throw new InterruptedException();
}
} finally {
// 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
if (failed)
cancelAcquire(node);
}
}
首先我们看到该方法的入参内容,node:当前获取令牌线程节点,propagate:剩余令牌数量。
该方法主要作用在于两点,第一点:将当前节点设置为头节点,第二点:将当前节点的获取令牌成功,则会自动唤醒下一个节点去获取令牌,如果获取令牌失败了,还是会被加入到队列进行阻塞
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || // 还有令牌可获取 || 头节点状态处于等待状态
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取当前下一节点
if (s == null || s.isShared()) // 判断下节点是否为共享节点
doReleaseShared(); // 传播~~ 具体传播什么呢???
}
}
稍微可以看下设置头节点方法,也就是出队操作,主要就是将当前线程设置为头节点,然后将当前节点的前驱节点引用指向为null,配合方法外,会将之前的头节点的next节点设置为null,那么之前的头节点也就自然会被垃圾回收器进行
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
基于FIFO出队流程图
又一次来到自旋,首先验证链表中是否还存在多个节点,如果存在且状态为SIGNAL会将head的后驱节点进行唤醒,让后驱节点尝试去获取令牌。如果后驱节点获取失败了也没关系,还是会被阻塞在队列中,顺序是不会变动的。为什么要唤醒后驱节点呢?我们可以想象多线程的场景,假如现在可以令牌有两个,头节点获取令牌成功了,那么还有一个令牌可获取对吧,恰好我后面还有节点,我就可以通知下一个节点继续获取令牌。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
Node h = head;
if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
int ws = h.waitStatus; // 获取head的节点状态
if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒线程 去获取令牌
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 跳出当前循环
}
}
非公平锁加锁流程图
那么Semaphore非公平模式获取令牌的流程到此就结束啦
非公平锁释放令牌流程
首先从 "windows.release()" 作为源码入口
sync.releaseShared(1);
我们来看到releaseShared方法,该方法内部有两个核心方法,我们先进入看看tryReleaseShared做了些什么事情
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //通用释放令牌
doReleaseShared(); //唤醒后驱节点
return true;
}
return false;
}
我们又看到了自旋,获取令牌数量并计算令牌数量,然后通过cas更新到主内存中,如果更新失败继续自旋,直到成功才会退出自旋
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
这个方法在前面说过的,又一次来到自旋,首先验证链表中是否还存在多个节点,如果存在且状态为SIGNAL会将head的后驱节点进行唤醒,让后驱节点尝试去获取令牌。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 自旋 可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
Node h = head;
if (h != null && h != tail) { // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
int ws = h.waitStatus; // 获取head的节点状态
if (ws == Node.SIGNAL) { // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒线程 去获取令牌
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; // 跳出当前循环
}
}
然后我们来看看最终该方法它是如何唤醒后驱节点的。
private void unparkSuccessor(Node node) {
// 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现
int ws = node.waitStatus;
// 由于-1会小于0,所以更新改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取第一个正常排队的节点
Node s = node.next;
//正常解锁流程不会走该if判断
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
如果线程被唤醒了,则会接着这个方法继续执行下去
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
那么到此通过非公平锁获取令牌以及释放令牌就结束啦,接下来来看看公平锁是如何实现获取与释放令牌的~
公平锁获取令牌流程
首先从 "windows.acquire()" 作为源码入口
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
这里与公平锁获取令牌方式一摸一样,就不过多解释了
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
前面提到了,在AQS中该方法是个模板方法,具体的加锁逻辑由子类自身的特性去具体实现的,在Semaphore中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
接着看获取锁的通用方法tryAcquireShared方法,这里有个重点在于,公平锁的tryAcquireShared()通过自旋+cas获取令牌之前,会先执行hasQueuedPredecessors()方法,不防我们来看看它里面执行的逻辑是啥
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
从方法名我们就可知道这是判断队列中是否有优先级更高的等待线程,队列中哪个线程优先级最高?由于头结点是当前获取锁的线程,队列中的第二个结点代表的线程优先级最高,也就意味着没有了新来线程插队的情况,保证了公平锁的获取串行化。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁的释放令牌与非公平锁的释放令牌逻辑一摸一样,也不多解释啦
那么到此Semaphore公平与非公平获取令牌的与释放令牌的流程,到此就结束啦
最后方法Semaphore源码详解视频,可结合博文一起观看
我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。
如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章