信号量(Semaphore)控制同时访问资源的线程数量,支持公平和非公平两种方式获取许可。
提供的方法
1 public Semaphore(int permits) //permits为许可数,默认非公平方式
2 public Semaphore(int permits, boolean fair)
3
4 //获取一个许可。若获取成功,permits-1,直接返回;否则当前线程阻塞直到有permits被释放,除非线程被中断
5 //如果线程被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。
6 public void acquire() throws InterruptedException
7 //忽略中断
8 public void acquireUninterruptibly()
9 //尝试获取一个许可,成功返回true,否则返回false。
10 //即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire() 也将 立即获取许可(如果有一个可用),而不管当前是否有正在等待的线程。
11 public boolean tryAcquire()
12 //超时尝试获取一个许可,该方法遵循公平设置
13 public boolean tryAcquire(long timeout, TimeUnit unit)
14 //释放一个许可
15 public void release()
16
17 //以上方法都是获取或释放一个许可,每个方法都存在对应的获取或释放指定个数许可的方法。例如public boolean tryAcquire(int permits)
使用示例:
使用信号量实现对内容池(例如线程池)的访问。
1 class Pool {
2 private static final int MAX_AVAILABLE = 100; //许可数为100,在本例中也是内容池的item的个数。
3 private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
4
5 //获取池中的一个item,首先从Semaphore获取许可;获取许可成功后,从池中获取一个可用的item,并把该item标记为已使用。
6 public Object getItem() throws InterruptedException {
7 available.acquire();
8 return getNextAvailableItem();
9 }
10
11 //将指定的item释放到池中,如果markAsUnused返回true,释放Semaphore的一个许可。
12 public void putItem(Object x) {
13 if (markAsUnused(x))
14 available.release();
15 }
16
17 // Not a particularly efficient data structure; just for demo
18
19 protected Object[] items = ... whatever kinds of items being managed //内容池,例如:连接池,每个item代表一个连接。
20 protected boolean[] used = new boolean[MAX_AVAILABLE]; //标记池中的每个item是否已经被占用
21
22 protected synchronized Object getNextAvailableItem() {
23 for (int i = 0; i < MAX_AVAILABLE; ++i) {
24 if (!used[i]) {
25 used[i] = true;
26 return items[i];
27 }
28 }
29 return null; // not reached
30 }
31
32 protected synchronized boolean markAsUnused(Object item) {
33 for (int i = 0; i < MAX_AVAILABLE; ++i) {
34 if (item == items[i]) {
35 if (used[i]) {
36 used[i] = false;
37 return true;
38 } else
39 return false;
40 }
41 }
42 return false;
43 }
44 }
基于AQS实现,用同步状态(state)表示许可数(permits),使用AQS的共享式获取和释放同步状态来实现permits的获取和释放。
1 private final Sync sync;
Sync是Semaphore的抽象内部类,继承了AQS。它有两个子类NonfairSync和FairSync,分别是非公平同步器和公平同步器。
Sync的源码:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
//非公平共享式获取同步状态。自旋CAS获取同步状态直到成功或许可不足。
//返回值语义:负数代表获取失败、0代表获取成功但没有剩余资源、正数代表获取成功,还有剩余资源。
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) //此处CAS时有可能同步队列中已有等待的线程,就导致的不公平性
return remaining;
}
}
//自定义共享式释放同步状态。自旋CAS释放同步状态直到成功,除非overflow
//返回值语义:true表示成功,不可能释放失败,除非overflow
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
NonfairSync(非公平)源码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
FairSync(公平)源码:
1 static final class FairSync extends Sync {
2 private static final long serialVersionUID = 2014338818796000944L;
3
4 FairSync(int permits) {
5 super(permits);
6 }
7 //自定义共享式释放同步状态。
8 protected int tryAcquireShared(int acquires) {
9 for (;;) {
10 //首先检查同步队列中是否有前驱。如果有则返回失败,将当前线程加入到同步队列的尾部,保证先尝试获取同步状态的线程先成功。
11 if (hasQueuedPredecessors())
12 return -1;
13 int available = getState();
14 int remaining = available - acquires;
15 if (remaining < 0 ||
16 compareAndSetState(available, remaining))
17 return remaining;
18 }
19 }
20 }
除tryAcquire外,都是通过调用AQS提供的方法实现获取失败时的阻塞和唤醒机制,具体策略建AQS的源码。
1 //阻塞式获取一个许可,响应中断
2 public void acquire() throws InterruptedException {
3 sync.acquireSharedInterruptibly(1); //调用AQS提供的可响应中断共享式获取同步状态方法
4 }
5
6 //阻塞式获取一个许可,忽略中断
7 public void acquireUninterruptibly() {
8 sync.acquireShared(1);
9 }
10
11 //非阻塞式获取一个许可
12 public boolean tryAcquire() {
13 return sync.nonfairTryAcquireShared(1) >= 0;
14 }
15
16 //释放一个许可
17 public void release() {
18 sync.releaseShared(1);
19 }
JDK DOC
《Java并发编程的艺术》