前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码分析之Semaphore

AQS源码分析之Semaphore

作者头像
山行AI
发布2020-03-25 09:25:23
3570
发布2020-03-25 09:25:23
举报
文章被收录于专栏:山行AI山行AI

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。比如控制用户的访问量,同一时刻只允许1000个用户同时使用系统,如果超过1000个并发,则需要等待。本篇来分析java.util.concurrent.Semaphore的源码。

内部类

java.util.concurrent.Semaphore.Sync

代码语言:javascript
复制
     // 用AQS的state来代表permits值    abstract static class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {            // 设置AQS的状态,即设置许可            setState(permits);        }
        final int getPermits() {            // 获取许可置            return getState();        }
        // 非公平模式获取共享锁的方法,入参为准备获取的许可数        final int nonfairTryAcquireShared(int acquires) {            for (;;) {// 无限循环                // 可用的许可数                int available = getState();                // 剩余的许可数                int remaining = available - acquires;                // 剩余许可数小于0,然后通过cas设置剩余许可数                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }
        // 尝试释放共享锁的方法        protected final boolean tryReleaseShared(int releases) {            for (;;) {// 无限循环                // 当前许可                int current = getState();                // 当前的与准备释放的之和就是之后将会有的许可数                int next = current + releases;                // 溢出的情况                if (next < current) // overflow                    throw new Error("Maximum permit count exceeded");                // cas设置将要拥有的许可数                if (compareAndSetState(current, next))                    return true;            }        }
        // 减少许可数        final void reducePermits(int reductions) {            for (;;) {// 无限循环                // 当前许可数                int current = getState();                // 剩下拥有的许可数为当前许可数减去要减少的许可数                int next = current - reductions;                // 许可数使用溢出                if (next > current) // underflow                    throw new Error("Permit count underflow");                // cas设置将要拥有的许可数                if (compareAndSetState(current, next))                    return;            }        }        // 释放全部许可        final int drainPermits() {            for (;;) {// 无限循环                // 当前许可                int current = getState();                // 如果当前许可为初始值0或者通过cas设置状态值为0                if (current == 0 || compareAndSetState(current, 0))                    return current;            }        }    }

Sync也是基于AQS来实现的,Sync有两个子类,公平版本和非公平版本。

非公平版本:java.util.concurrent.Semaphore.NonfairSync

代码语言:javascript
复制
 static final class NonfairSync extends Sync {        private static final long serialVersionUID = -2694183684443567898L;
        NonfairSync(int permits) {            // 构造时传入初始的许可数            super(permits);        }        // 尝试获取共享许可        protected int tryAcquireShared(int acquires) {            // 非公平版本获取共享许可            return nonfairTryAcquireShared(acquires);        }    }

公平版本:java.util.concurrent.Semaphore.FairSync

代码语言:javascript
复制
static final class FairSync extends Sync {        private static final long serialVersionUID = 2014338818796000944L;
        FairSync(int permits) {            // 传入初始化的许可            super(permits);        }        // 尝试获取共享许可        protected int tryAcquireShared(int acquires) {// 传入需要获取的许可数            for (;;) {// 无限循环                // 公平版本需要先判断当前AQS队列中是否有头节点,也就是是否有比当前节点等待更久的节点                if (hasQueuedPredecessors())                    return -1;                // 可用的许可数                int available = getState();                // 剩余可用许可数                int remaining = available - acquires;                // 如果剩余可用小于0或者cas设置剩余可用许可成功,直接返回                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }    }

公平版本与非公平版本的区别

公平版本在获取许可时需要先通过hasQueuedPredecessors方法判断是否有比当前节点等待更久的节点。

Semaphore

构造方法

代码语言:javascript
复制
 public Semaphore(int permits) {        sync = new NonfairSync(permits);    }
public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }

默认是非公平版本的Sync。

获取许可

java.util.concurrent.Semaphore#acquire()
代码语言:javascript
复制
 public void acquire() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }
java.util.concurrent.Semaphore#acquire(int)
代码语言:javascript
复制
 public void acquire(int permits) throws InterruptedException {        if (permits < 0) throw new IllegalArgumentException();        sync.acquireSharedInterruptibly(permits);    }

关于java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly,实际上调用的是AQS的实现,代码如下:

代码语言:javascript
复制
 public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0)//获取不到许可时,调用doAcquireSharedInterruptibly方法            doAcquireSharedInterruptibly(arg);    }

tryAcquireShared方法根据当前的公平与非公平版本Sync来进行不同的处理,当获取不到许可时,调用doAcquireSharedInterruptibly方法。

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly代码:

代码语言:javascript
复制
private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {        // 向AQS队列添加一个SHARED状态的节点        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {// 无限循环                // 获取前置节点                final Node p = node.predecessor();                // 如果前置节点是头节点                if (p == head) {                    // 尝试获取共享许可                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        // 成功了则设置头节点并进行传播                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }                // 判断是否需要在失败时进行park(即等待)                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                // 取消获取许可                cancelAcquire(node);        }    }

这个方法会在新节点的前置节点是头节点时去尝试获取许可,并在获取成功时进行头节点的设置并进行传播。否则可能会改变waitStatus进行重试,在SIGNAL状态且重试失败时挂起(park)当前节点线程。

java.util.concurrent.Semaphore#tryAcquire()
代码语言:javascript
复制
public boolean tryAcquire() {        return sync.nonfairTryAcquireShared(1) >= 0;    }
   public boolean tryAcquire(int permits) {        if (permits < 0) throw new IllegalArgumentException();        return sync.nonfairTryAcquireShared(permits) >= 0;    }

可以看到tryAcquire方法调用的是非公平版本的acquireShared方法,即nonfairTryAcquireShared,入参为1,也就是默认尝试获取许可,且该方法不会阻塞。tryAcquire(int permits)只是将许可数作为一个入参。

释放许可

方法java.util.concurrent.Semaphore#release():
代码语言:javascript
复制
public void release() {        sync.releaseShared(1);    }

释放许可的方法,默认是释放一个许可。

java.util.concurrent.Semaphore#release(int):
代码语言:javascript
复制
   public void release(int permits) {        if (permits < 0) throw new IllegalArgumentException();        sync.releaseShared(permits);    }

调用的是Sync类中的releaseShared方法来释放指定数量的许可。

总结

Semaphore主要是用AQS中的state来代表许可,然后锁的获取和释放都是基于state变量和AQS队列的cas操作来实现。源码和功能也都相对简单。使用场景主要是对一个共享资源提供n个可以访问的许可,也就是说只有获取到许可的线程可以进行对这个被保护的资源的访问。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 内部类
    • java.util.concurrent.Semaphore.Sync
      • 非公平版本:java.util.concurrent.Semaphore.NonfairSync
        • 公平版本:java.util.concurrent.Semaphore.FairSync
          • 公平版本与非公平版本的区别
          • Semaphore
            • 构造方法
              • 获取许可
                • java.util.concurrent.Semaphore#acquire()
                • java.util.concurrent.Semaphore#acquire(int)
                • java.util.concurrent.Semaphore#tryAcquire()
              • 释放许可
                • 方法java.util.concurrent.Semaphore#release():
                • java.util.concurrent.Semaphore#release(int):
            • 总结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档