首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Semaphore加锁与释放流程

Semaphore加锁与释放流程

作者头像
None_Ling
发布2020-09-02 16:32:18
6710
发布2020-09-02 16:32:18
举报
文章被收录于专栏:Android相关Android相关Android相关

背景

ReentrantLock的独占(Exclusive)不同的是 , Semaphore是共享类型的(Share). 也就是当资源充足的时候 , 允许多个线程获取相应资源同时执行. 可以理解用来解决哲学家用餐问题.

其原理也就是通过初始化state变量来控制资源数量 , 如果当前state<0则代表资源用尽 , 需要阻塞等待 , 直到被其他线程释放资源后唤醒.

构造函数

与ReentrantLock相同 , 同样可以有公平锁(FairSync)与非公平锁(NonfairSync) , 同时 , permits代表当前拥有的资源总数 , Sync会将permits赋值给state变量.

public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

acquire申请资源

  1. 调用acquire申请Shared类型的资源
public void acquire(int permits) throws InterruptedException {
        // 如果permits小于0的话 , 则认为参数不合法
        if (permits < 0) throw new IllegalArgumentException();
        // 开始申请资源
        sync.acquireSharedInterruptibly(permits);
    }
  1. 尝试调用tryAcquireShared来申请资源 , 如果小于0 , 则调用``
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取Shared资源
        if (tryAcquireShared(arg) < 0)
            // 如果当前剩余的可用资源小于0的话 , 则开始等待
            doAcquireSharedInterruptibly(arg);
    }
  1. 尝试获取共享数据 , 即获得当前剩余资源总数
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // 获取当前可用资源数量
                int available = getState();
                // 得到剩余资源总数
                int remaining = available - acquires;
                // 如果剩余资源小于0 , 或者CAS设置失败的话 , 则返回剩余资源数量
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
  1. 如果剩余资源大于等于0的话 , 则认为申请成功 , 可以继续执行
  2. 如果剩余资源小于0的话, 则会调用doAcquireSharedInterruptibly插入队列中
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加Shared类型的节点到队列中
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                // 从队列尾部往前头部遍历
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 如果遍历到了头节点
                if (p == head) {
                    //  当前为头节点, 尝试获取相关资源 
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 如果获取大于0的话 , 则将头节点设置成Propagate状态
                        setHeadAndPropagate(node, r);
                        // 设置null
                        p.next = null; // help GC
                        return;
                    }
                }
                //  shouldParkAfterFailedAcquire , 找到前驱节点为SINGAL的节点
                //  如果找到的话 , 则park当前线程 , 并且抛出中断异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            // 捕捉到异常后 , 则将当前Node设置成Cancel状态
            cancelAcquire(node);
            throw t;
        }
    }
  1. 处理前驱节点的信号
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 如果前驱节点为SIGNAL的话 , 允许执行 , 返回true
            return true;
        if (ws > 0) {
            // 如果waitStatus>0 , 代表为Cancel节点 ,则直接删除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果是waitStatus是Propagate , 则设置成SIGNAL
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        // 返回false , 不允许park
        return false;
    }
  1. 在本函数中 , 主要处理头节点的唤醒
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 将Node设置成Head节点
        setHead(node);
        // 如果当前剩余资源>0 , 或者head的waitStatus<0 , 或者Node的waitStatus<0
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 则得到Node.next
            Node s = node.next;
            if (s == null || s.isShared())
                // 如果Node.next为空 , 或者为Share的话 , 则开始选择线程唤醒
                doReleaseShared();
        }
    }
  1. doReleaseShared函数中 , 会unpark头节点的线程 , 让其唤醒开始执行
private void doReleaseShared() {
         
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 自旋设置头节点Node的waitStatus
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                  //  如果waitStatus为0 , 则设置成为PROPAGATE
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

release过程

  1. 调用release方法释放资源
 public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
  1. 释放资源成功返回true , 否则返回false
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  1. 尝试释放资源 , 如果释放成功 , 则会开始
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                // 获取当前资源
                int current = getState();
                // 如果release是负数的话 , 则返回错误
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // 通过CAS操作设置state变量
                if (compareAndSetState(current, next))
                    return true;
            }
        }
  1. 尝试寻找waitStatus为Singal的前驱节点唤醒
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 如果头节点不为空 , 并且头节点不是尾节点 , 即队列中有节点
            if (h != null && h != tail) {
                // 判断当前waitStatus
                int ws = h.waitStatus;
                // 如果当前节点为SIGNAL代表可以执行
                if (ws == Node.SIGNAL) {
                    // 将h节点的waitStatus设置成0失败 , 则自旋设置直到成功
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 开始unpark前驱节点
                    unparkSuccessor(h);
                }
                // 如果waitStatus为0的话 , 则设置PROPAGATE
                else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;               
            }
            // 如果头节点相同则break , 否则一直循环
            if (h == head)                   
                break;
        }
    }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 构造函数
  • acquire申请资源
  • release过程
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档