Semaphore信号量探秘
同步控制是开发功能强大的并发程序的基础;如果把高性能的线程比作跑车,那么同步控制类就好比是跑车上的刹车和方向盘。之前,我们介绍的synchronized + wait( ) + notify()“组合拳”就是一种最简单的线程同步控制机制。其中,synchronized用于决定一个线程是否可以访问临界区资源;wait + notify方法起到了阻塞线程和唤醒线程的作用。除此之外JDK还提供了用于控制并发线程数的工具类——Semaphore(信号量)
Semaphore是计数信号量。Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。
01
初识Semaphore
Semaphore主要用于控制同时访问特定资源的线程数量,它通过协调各个线程,始终保持一定数量内的线程去使用公共资源。
我们可以举一个实际生活中的例子,以便更加深入的去了解Semaphore的作用。公司里总共有10台打印机,在公司业务繁忙时期会有很多文件(比如有100份)需要打印;由于打印机公共资源有限,即只允许同时打印10份文件。另有一个指示器用于标识当前正在工作的打印机的数量为10,其余90份文件只能等待;由于每份文件页数各不相同,有些文件会早一些完成打印,而有些文件会晚点才能完成打印。但是,如果当前10份文件中有N份文件已经打印结束,指示牌就会更新为10-N个打印机正在工作,此时允许处于等待状态的文件中选择N份文件进行打印。
上例中的待打印的文件就好比是待执行的线程;文件打印过程就表示线程执行;文件打印结束就表示线程执行完成;指示器显示当前工作中的打印机数量为10意味着其他的线程会被阻塞,不能执行;指示器显示为N,意味着可以唤醒10-N个线程继续执行。
02
API简介
从概念上来讲,Semaphore维护了一个许可证集合,集合的数量是有限的,集合中的一个元素就是一个许可证,线程拿到许可证,就好比是拿到了线程执行的“通行证”,从而可以立即被执行。如果某个线程试图去从空集合中去取许可证,那么就会自动阻塞;而某个线程执行完毕后,需要主动把许可归还入集合,唤醒被阻塞的线程,同时保证了线程的最大并发执行数始终控制在许可证集合的数目内。
下面我们一起来认识Semaphore工具类的几个重要方法:
03
实战
假设有10个人在银行办理业务,只有2个工作窗口,代码实现逻辑如下:
import java.util.Random;
import java.util.concurrent.Semaphore;
/**
* 如厕管理系统
*/
class Ruce implements Runnable {
private String userName;
private Semaphore pots;
public Ruce(String userName, Semaphore pots) {
this.userName = userName;
this.pots = pots;
}
@Override
public void run() {
try {
// 查询剩下的资源(剩下的茅坑)
int availablePermits = pots.availablePermits();
if (availablePermits > 0) {
System.out.println(userName + "有茅坑...");
} else {
System.out.println(userName + "没有茅坑了...");
}
//申请茅坑 如果资源达到规定的次数,就等待
pots.acquire();
System.out.println(userName + "我开始上厕所了..");
Thread.sleep(new Random().nextInt(3000)); // 模拟上厕所时间。
System.out.println(userName + "厕所上完了...爽啊!");
pots.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
import java.util.concurrent.Semaphore;
public class TestSemaphore {
public static void main(String[] args) {
// 一个厕所只有3个坑位,但是有10个人来上厕所,那怎么办?假设10的人的编号分别为1-10,并且1号先到厕所,10号最后到厕所。那么1-3号来的时候必然有可用坑位,顺利如厕,4号来的时候需要看看前面3人是否有人出来了,如果有人出来,进去,否则等待。同样的道理,4-10号也需要等待正在上厕所的人出来后才能进去,并且谁先进去这得看等待的人是否有素质,是否能遵守先来先上的规则。
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 10; i++) {
Ruce parent = new Ruce("第" + i + "个人,", semaphore);
new Thread(parent).start();
}
}
}
04
工作原理
Semaphore工作原理:
1.内部类Sync
abstract static class Sync extends AbstractQueuedSynchronizer
抽象内部类Sync继承来了AbstractQueuedSynchronizer(简称AQS)
2.Sync的子类
static final class NonfairSync extends Sync
公平的方式实现信号量
static final class NonfairSync extends Sync
非公平的方式实现信号量
下面以Semaphore的默认实现NonfairSync为例,当调用以下构造函数:
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
此时操作的对象其实就是一个NonfairSync对象。
NonfairSync类的构造函数如下:
NonfairSync(int permits) {
super(permits);
}
其实这里NonfairSync类的构造函数调用了父类Sync中的构造函数:
Sync(int permits) {
setState(permits);
}
Sync中的构造函数调用其父类AbstractQueuedSynchronizer的setState()方法,设置同步状态。
protected final void setState(int newState) {
state = newState;
}
同步状态其实就保存在volatile修饰的int变量state中:
/**
* The synchronization state.
*/
private volatile int state;
当线程尝试获取一个许可证用于执行时,会调用acquire()方法:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
这里acquire()方法调用的sync.acquireSharedInterruptibly(1)方法其实是抽象内部类Sync的父类AQS中的方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
其中首先要判断线程是否中断了。然后调用tryAcquireShared()方法尝试获取许可证。tryAcquireShared()源码如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
这里调用的nonfairTryAcquireShared()方法的实现如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取同步状态值
int available = getState();
//获取指定数量的许可证
int remaining = available - acquires;
//剩余许可证小于0或者CAS成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
//剩余的许可证数量
return remaining;
}
}
如果tryAcquireShared()方法返回大于0的值,线程调用完acuqire()方法后就直接执行了。如果tryAcquireShared()返回的结果小于0,将执行doAcquireSharedInterruptibly()方法。
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
主要做的操作如下:
1.将线程加入阻塞队列中
2.如果当前线程在队列的头结点后的第一个位置,那么直接调用tryAcquireShared()方法尝试获取许可证
3.维护线程在阻塞队列中的状态,尝试将线程挂起
LockSupport.park(this);
当线程执行完成后,需要释放许可证。
public void release() {
sync.releaseShared(1);
}
realase()方法其实调用的是AQS中的releaseShared()方法。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中tryReleaseShared()方法是在Semaphore中实现的。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared()方法释放指定数量的许可证。然后唤醒其他阻塞的线程。
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}