专栏首页Java源码进阶Semaphore流量控制和源码分析
原创

Semaphore流量控制和源码分析

原文链接:https://www.jhonrain.org/2019/09/18/高并发-高并发-Semaphore源码解析和使用场景/

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {@link #acquire} blocks if necessary until a permit is available, and then takes it. Each {@link #release} adds a permit,potentially releasing a blocking acquirer.However, no actual permit objects are used; the {@code Semaphore} just keeps a count of the number available and acts accordingly. 一个计数信号。通常来说,一个信号量包含一系列凭证。如果凭证不足每次调用acquire方法都会阻塞,直到有可用的凭证为止。每次执行release方法后都会唤醒一个阻塞的线程。然而,Semaphore只对可用许可的号码进行计数,并采取相应的行动,不会使用实际的许可对象。 Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource. Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

一、源码解析

  Semaphore内部也是基于AQS并发组件来实现的,提供了内部类Sync和FairSync(公平锁)、NofairSync(非公平锁)。在J.U.C中AQS是基础组件,负责核心并发操作:控制同步状态,管理同步队列,具体的加锁和释放锁都是由子类去实现。从源码来看加锁sync.acquireShared(permits)和释放锁sync.releaseShared.

1.1 构造函数

内部提供两个构造函数,默认是使用非公平锁的方式,

// 默认是非公平的
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 重载的构造器,提供指定是否为公平的,如果为true可以保证FIFO
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
1.2 基于AQS内部的Sync
abstract static class Sync extends AbstractQueuedSynchronizer {

    private static final long serialVersionUID = 1192457210091910933L;
    // 将permits映射为AQS中的state
    Sync(int permits) {
        setState(permits);
    }
    // 获取AQS中的共享遍历state的值
    final int getPermits() {
        return getState();
    }
    // 非公平的尝试获取凭证,内部使用CAS操作
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                // CAS操作
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 尝试释放凭证,内部使用CAS操作+自旋锁
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow 溢出下来
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // 减少凭证
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 重置凭证
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
1.3 非公平锁的方式
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
1.4 公平锁的方式
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 判断队列中是否还有任务阻塞
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
1.5 信号量获取

通过acquire来获取一个凭证

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

内部调用的是AQS的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
        // 尝试获取,如果是非公平方式就是用NofairSync#tryAcquireShared,否则是用
        // FairSync#tryAcquireShared,详细请查看1.3、1.4
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
1.6 信号量释放
public void release() {
    sync.releaseShared(1);
}

内部调用AQS的releaseShared

public final boolean releaseShared(int arg) {
    // 调用子类Sync的tryReleaseShared方法
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Sync#tryReleaseShared方法实现

protected final boolean tryReleaseShared(int releases) {
    // 自旋
    for (;;) {
        int current = getState();
        // 回收releaseS个凭证
        int next = current + releases;
        // 向上溢出
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS操作
        if (compareAndSetState(current, next))
            return true;
    }
}
1.7 获取阻塞队列的长度
public final int getQueueLength() {
    return sync.getQueueLength();
}
1.8 获取阻塞的线程
protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}
1.9 是否有阻塞线程
public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}
1.10 判断是否是公平
public boolean isFair() {
    return sync instanceof FairSync;
}

二、Semaphore限流案例

2.0 背景

模拟场景:有3本书(临界区资源),10个人一起过来借书,每次只允许有3个人借到书籍,其他人等待。

2.1 代码实现
public class BookPool {
    // 使用阻塞队列保存对象,最为对象池
    private final ArrayBlockingQueue<BookInfo> POOL;
    // 信号量
    private final Semaphore PERMITS;
    // 初始化构造器
    public BookPool(int permits) {
        POOL = new ArrayBlockingQueue<>(permits);
        PERMITS = new Semaphore(permits);
        BookInfo bookInfo = null;
        for(int i=1;i<=permits;i++){
            bookInfo = new BookInfo();
            bookInfo.setId(Long.valueOf(i));
            POOL.add(bookInfo);
        }
    }
    // 借书
    public Long book(Function<BookInfo,Long> function) {
        BookInfo book = null;
        try{
            PERMITS.acquire();
            book = POOL.poll();
            System.out.println(Thread.currentThread().getName()+" 需要借用 "+book.getId()+"s !");
            return function.apply(book);
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            POOL.add(book);
            PERMITS.release();
            System.out.println(Thread.currentThread().getName()+"使用"+book.getId()+"s后,归还!")
        }
        return 0L;
    }

    public static void main(String[] args){
        BookPool bookPool = new BookPool(3);
        CountDownLatch latch = new CountDownLatch(1);
        for(int i=0;i<10;i++){
            new Thread(() -> {
                bookPool.book(b -> {
                    TimeUnit.SECONDS.sleep(b.getId());
                    return b.getId();
                });
                // 模拟所有线程都准备好了
                latch.await();
            },"Thread"+i).start();
        }
        // 线程创建完成之后,统一开始争夺资源
        latch.countDown();
        TimeUnit.SECONDS.sleep(20);
    }
}

class BookInfo {
    private Long id;

    public void setId(Long id){
        this.id = id;
    }

    public Long getId(){
        return id;
    }
}
2.2 结果
Thread2需要使用2s
Thread0需要使用3s
Thread5需要使用1s
Thread5使用1s后,归还!
Thread4需要使用1s
Thread2使用2s后,归还!
Thread3需要使用2s
Thread4使用1s后,归还!
Thread6需要使用1s
Thread0使用3s后,归还!
Thread7需要使用3s
Thread6使用1s后,归还!
Thread9需要使用1s
Thread3使用2s后,归还!
Thread8需要使用2s
Thread9使用1s后,归还!
Thread1需要使用1s
Thread1使用1s后,归还!
Thread8使用2s后,归还!
Thread7使用3s后,归还!

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 迷宫问题的简单栈实现

    以一个n的长方阵表示迷宫,0和1分别表示迷宫中的通路和障碍,设计一个程序,对任意设定的迷宫,求出一条从入口到出口的通路,或得出没有通路的结论。

    glm233
  • 剑指offer第10题:矩阵中的路径

    根据题目要求,需要我们从一个已知矩阵中找到一个可以挨个形成给定字符串的路径。如果有这条路径的话,我们需要返回true,如果没有的话,我们返回false,并且相同...

    鹏-程-万-里
  • LWC 51:683. K Empty Slots

    LWC 51:683. K Empty Slots 传送门:683. K Empty Slots Problem: There is a garden wit...

    用户1147447
  • 如何比较?Comparable还是Comparator

    我家开了个小卖店,为了实现数字化管理,我准备写个后台程序来对所有货物进行管理。首先定义了这个实体类,这个类就是“货物”类,num指的是他的编号,s指他的名称或描...

    naget
  • C#版(击败96.64%的提交) - Leetcode 728. 自除数 - 题解

    Leetcode 728 - Self Dividing Numbers 在线提交: https://leetcode-cn.com/problems/se...

    Enjoy233
  • 漫画:贼简单的题目,但百分之99%的人都不会

    今天是小浩算法“365刷题计划”第53天。为大家分享一道本应很简单的题目,但是却因增加了特殊条件,而大幅增加了难度。话不多说,直接看题。

    程序员小浩
  • 策略模式及其与简单工厂模式的区别

    The strategy pattern(also known as the policy pattern) is a behavioural software...

    用户2434869
  • 漫画:贼简单的题目,但百分之99%的人都不会

    为大家分享一道本应很简单的题目,但是却因增加了特殊条件,而大幅增加了难度。话不多说,直接看题。

    五分钟学算法
  • LeetCode 559. N叉树的最大深度

    Michael阿明
  • 763. Partition Labels

    思路: 很暴力,直接找可以Partition的位置,如果不能Partition,继续向后搜索直到找到第一个可以Partition的位置为止,这样剩余问题就是...

    用户1147447

扫码关注云+社区

领取腾讯云代金券