原文链接:https://www.jhonrain.org/2019/09/18/高并发-高并发-Semaphore源码解析和使用场景/
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {@link #acquire} blocks if necessary until a permit is available, and then takes it. Each {@link #release} adds a permit,potentially releasing a blocking acquirer.However, no actual permit objects are used; the {@code Semaphore} just keeps a count of the number available and acts accordingly. 一个计数信号。通常来说,一个信号量包含一系列凭证。如果凭证不足每次调用acquire方法都会阻塞,直到有可用的凭证为止。每次执行release方法后都会唤醒一个阻塞的线程。然而,Semaphore只对可用许可的号码进行计数,并采取相应的行动,不会使用实际的许可对象。 Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource. Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
Semaphore内部也是基于AQS并发组件来实现的,提供了内部类Sync和FairSync(公平锁)、NofairSync(非公平锁)。在J.U.C中AQS是基础组件,负责核心并发操作:控制同步状态,管理同步队列,具体的加锁和释放锁都是由子类去实现。从源码来看加锁sync.acquireShared(permits)和释放锁sync.releaseShared.
内部提供两个构造函数,默认是使用非公平锁的方式,
// 默认是非公平的
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 重载的构造器,提供指定是否为公平的,如果为true可以保证FIFO
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 将permits映射为AQS中的state
Sync(int permits) {
setState(permits);
}
// 获取AQS中的共享遍历state的值
final int getPermits() {
return getState();
}
// 非公平的尝试获取凭证,内部使用CAS操作
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
// CAS操作
compareAndSetState(available, remaining))
return remaining;
}
}
// 尝试释放凭证,内部使用CAS操作+自旋锁
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow 溢出下来
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 减少凭证
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 重置凭证
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断队列中是否还有任务阻塞
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
通过acquire来获取一个凭证
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
内部调用的是AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取,如果是非公平方式就是用NofairSync#tryAcquireShared,否则是用
// FairSync#tryAcquireShared,详细请查看1.3、1.4
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public void release() {
sync.releaseShared(1);
}
内部调用AQS的releaseShared
public final boolean releaseShared(int arg) {
// 调用子类Sync的tryReleaseShared方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Sync#tryReleaseShared方法实现
protected final boolean tryReleaseShared(int releases) {
// 自旋
for (;;) {
int current = getState();
// 回收releaseS个凭证
int next = current + releases;
// 向上溢出
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS操作
if (compareAndSetState(current, next))
return true;
}
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public boolean isFair() {
return sync instanceof FairSync;
}
模拟场景:有3本书(临界区资源),10个人一起过来借书,每次只允许有3个人借到书籍,其他人等待。
public class BookPool {
// 使用阻塞队列保存对象,最为对象池
private final ArrayBlockingQueue<BookInfo> POOL;
// 信号量
private final Semaphore PERMITS;
// 初始化构造器
public BookPool(int permits) {
POOL = new ArrayBlockingQueue<>(permits);
PERMITS = new Semaphore(permits);
BookInfo bookInfo = null;
for(int i=1;i<=permits;i++){
bookInfo = new BookInfo();
bookInfo.setId(Long.valueOf(i));
POOL.add(bookInfo);
}
}
// 借书
public Long book(Function<BookInfo,Long> function) {
BookInfo book = null;
try{
PERMITS.acquire();
book = POOL.poll();
System.out.println(Thread.currentThread().getName()+" 需要借用 "+book.getId()+"s !");
return function.apply(book);
}catch(InterruptedException e){
e.printStackTrace();
}finally{
POOL.add(book);
PERMITS.release();
System.out.println(Thread.currentThread().getName()+"使用"+book.getId()+"s后,归还!")
}
return 0L;
}
public static void main(String[] args){
BookPool bookPool = new BookPool(3);
CountDownLatch latch = new CountDownLatch(1);
for(int i=0;i<10;i++){
new Thread(() -> {
bookPool.book(b -> {
TimeUnit.SECONDS.sleep(b.getId());
return b.getId();
});
// 模拟所有线程都准备好了
latch.await();
},"Thread"+i).start();
}
// 线程创建完成之后,统一开始争夺资源
latch.countDown();
TimeUnit.SECONDS.sleep(20);
}
}
class BookInfo {
private Long id;
public void setId(Long id){
this.id = id;
}
public Long getId(){
return id;
}
}
Thread2需要使用2s
Thread0需要使用3s
Thread5需要使用1s
Thread5使用1s后,归还!
Thread4需要使用1s
Thread2使用2s后,归还!
Thread3需要使用2s
Thread4使用1s后,归还!
Thread6需要使用1s
Thread0使用3s后,归还!
Thread7需要使用3s
Thread6使用1s后,归还!
Thread9需要使用1s
Thread3使用2s后,归还!
Thread8需要使用2s
Thread9使用1s后,归还!
Thread1需要使用1s
Thread1使用1s后,归还!
Thread8使用2s后,归还!
Thread7使用3s后,归还!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。