首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Semaphore流量控制和源码分析

Semaphore流量控制和源码分析

原创
作者头像
用户6391658
修改2019-10-08 11:21:39
3950
修改2019-10-08 11:21:39
举报
文章被收录于专栏:Java源码进阶Java源码进阶

原文链接:https://www.jhonrain.org/2019/09/18/高并发-高并发-Semaphore源码解析和使用场景/

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {@link #acquire} blocks if necessary until a permit is available, and then takes it. Each {@link #release} adds a permit,potentially releasing a blocking acquirer.However, no actual permit objects are used; the {@code Semaphore} just keeps a count of the number available and acts accordingly. 一个计数信号。通常来说,一个信号量包含一系列凭证。如果凭证不足每次调用acquire方法都会阻塞,直到有可用的凭证为止。每次执行release方法后都会唤醒一个阻塞的线程。然而,Semaphore只对可用许可的号码进行计数,并采取相应的行动,不会使用实际的许可对象。 Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource. Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

一、源码解析

  Semaphore内部也是基于AQS并发组件来实现的,提供了内部类Sync和FairSync(公平锁)、NofairSync(非公平锁)。在J.U.C中AQS是基础组件,负责核心并发操作:控制同步状态,管理同步队列,具体的加锁和释放锁都是由子类去实现。从源码来看加锁sync.acquireShared(permits)和释放锁sync.releaseShared.

1.1 构造函数

内部提供两个构造函数,默认是使用非公平锁的方式,

// 默认是非公平的
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 重载的构造器,提供指定是否为公平的,如果为true可以保证FIFO
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
1.2 基于AQS内部的Sync
abstract static class Sync extends AbstractQueuedSynchronizer {

    private static final long serialVersionUID = 1192457210091910933L;
    // 将permits映射为AQS中的state
    Sync(int permits) {
        setState(permits);
    }
    // 获取AQS中的共享遍历state的值
    final int getPermits() {
        return getState();
    }
    // 非公平的尝试获取凭证,内部使用CAS操作
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                // CAS操作
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 尝试释放凭证,内部使用CAS操作+自旋锁
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow 溢出下来
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // 减少凭证
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 重置凭证
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
1.3 非公平锁的方式
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
1.4 公平锁的方式
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 判断队列中是否还有任务阻塞
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
1.5 信号量获取

通过acquire来获取一个凭证

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

内部调用的是AQS的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
        // 尝试获取,如果是非公平方式就是用NofairSync#tryAcquireShared,否则是用
        // FairSync#tryAcquireShared,详细请查看1.3、1.4
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
1.6 信号量释放
public void release() {
    sync.releaseShared(1);
}

内部调用AQS的releaseShared

public final boolean releaseShared(int arg) {
    // 调用子类Sync的tryReleaseShared方法
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Sync#tryReleaseShared方法实现

protected final boolean tryReleaseShared(int releases) {
    // 自旋
    for (;;) {
        int current = getState();
        // 回收releaseS个凭证
        int next = current + releases;
        // 向上溢出
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS操作
        if (compareAndSetState(current, next))
            return true;
    }
}
1.7 获取阻塞队列的长度
public final int getQueueLength() {
    return sync.getQueueLength();
}
1.8 获取阻塞的线程
protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}
1.9 是否有阻塞线程
public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}
1.10 判断是否是公平
public boolean isFair() {
    return sync instanceof FairSync;
}
二、Semaphore限流案例
2.0 背景

模拟场景:有3本书(临界区资源),10个人一起过来借书,每次只允许有3个人借到书籍,其他人等待。

2.1 代码实现
public class BookPool {
    // 使用阻塞队列保存对象,最为对象池
    private final ArrayBlockingQueue<BookInfo> POOL;
    // 信号量
    private final Semaphore PERMITS;
    // 初始化构造器
    public BookPool(int permits) {
        POOL = new ArrayBlockingQueue<>(permits);
        PERMITS = new Semaphore(permits);
        BookInfo bookInfo = null;
        for(int i=1;i<=permits;i++){
            bookInfo = new BookInfo();
            bookInfo.setId(Long.valueOf(i));
            POOL.add(bookInfo);
        }
    }
    // 借书
    public Long book(Function<BookInfo,Long> function) {
        BookInfo book = null;
        try{
            PERMITS.acquire();
            book = POOL.poll();
            System.out.println(Thread.currentThread().getName()+" 需要借用 "+book.getId()+"s !");
            return function.apply(book);
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            POOL.add(book);
            PERMITS.release();
            System.out.println(Thread.currentThread().getName()+"使用"+book.getId()+"s后,归还!")
        }
        return 0L;
    }

    public static void main(String[] args){
        BookPool bookPool = new BookPool(3);
        CountDownLatch latch = new CountDownLatch(1);
        for(int i=0;i<10;i++){
            new Thread(() -> {
                bookPool.book(b -> {
                    TimeUnit.SECONDS.sleep(b.getId());
                    return b.getId();
                });
                // 模拟所有线程都准备好了
                latch.await();
            },"Thread"+i).start();
        }
        // 线程创建完成之后,统一开始争夺资源
        latch.countDown();
        TimeUnit.SECONDS.sleep(20);
    }
}

class BookInfo {
    private Long id;

    public void setId(Long id){
        this.id = id;
    }

    public Long getId(){
        return id;
    }
}
2.2 结果
Thread2需要使用2s
Thread0需要使用3s
Thread5需要使用1s
Thread5使用1s后,归还!
Thread4需要使用1s
Thread2使用2s后,归还!
Thread3需要使用2s
Thread4使用1s后,归还!
Thread6需要使用1s
Thread0使用3s后,归还!
Thread7需要使用3s
Thread6使用1s后,归还!
Thread9需要使用1s
Thread3使用2s后,归还!
Thread8需要使用2s
Thread9使用1s后,归还!
Thread1需要使用1s
Thread1使用1s后,归还!
Thread8使用2s后,归还!
Thread7使用3s后,归还!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、源码解析
    • 1.1 构造函数
      • 1.2 基于AQS内部的Sync
        • 1.3 非公平锁的方式
          • 1.4 公平锁的方式
            • 1.5 信号量获取
              • 1.6 信号量释放
                • 1.7 获取阻塞队列的长度
                  • 1.8 获取阻塞的线程
                    • 1.9 是否有阻塞线程
                      • 1.10 判断是否是公平
                      • 二、Semaphore限流案例
                        • 2.0 背景
                          • 2.1 代码实现
                            • 2.2 结果
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档