前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Semaphore 源码分析

Semaphore 源码分析

作者头像
java404
发布2018-05-18 11:54:05
5570
发布2018-05-18 11:54:05
举报

需要提前了解的知识点: AbstractQueuedSynchronizer 实现原理

类介绍

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。比如控制用户的访问量,同一时刻只允许1000个用户同时使用系统,如果超过1000个并发,则需要等待。

使用场景

比如模拟一个停车场停车信号,假设停车场只有两个车位,一开始两个车位都是空的。这时如果同时来了两辆车,看门人允许它们进入停车场,然后放下车拦。以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。

public class SemaphoreDemo {
    private static Semaphore s = new Semaphore(2);
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.submit(new ParkTask("1"));
        pool.submit(new ParkTask("2"));
        pool.submit(new ParkTask("3"));
        pool.submit(new ParkTask("4"));
        pool.submit(new ParkTask("5"));
        pool.submit(new ParkTask("6"));
        pool.shutdown();
    }

    static class ParkTask implements Runnable {
        private String name;
        public ParkTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                s.acquire();
                System.out.println("Thread "+this.name+" start...");
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }
    }
}

Semaphore 源码分析

Semaphore 通过使用内部类Sync继承AQS来实现。

支持公平锁和非公平锁。内部使用的AQS的共享锁。

具体实现可参考 AbstractQueuedSynchronizer 源码分析

Semaphore 的结构如下:

Semaphore 类结构

Semaphore构造
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

构造方法指定信号量的许可数量,默认采用的是非公平锁,也只可以指定为公平锁。

permits赋值给AQS中的state变量。

acquire:可响应中断的获得信号量
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

获得信号量方法,这两个方法支持 Interrupt中断机制,可使用acquire() 方法每次获取一个信号量,也可以使用acquire(int permits) 方法获取指定数量的信号量 。

acquire:不可响应中断的获取信号量
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

这两个方法不响应Interrupt中断机制,其它功能同acquire方法机制。

tryAcquire 方法,尝试获得信号量
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

尝试获得信号量有三个方法。

  1. 尝试获取信号量,如果获取成功则返回true,否则马上返回false,不会阻塞当前线程。
  2. 尝试获取信号量,如果在指定的时间内获得信号量,则返回true,否则返回false
  3. 尝试获取指定数量的信号量,如果在指定的时间内获得信号量,则返回true,否则返回false。
release 释放信号量
public void release() {
    sync.releaseShared(1);
}

调用AQS中的releaseShared方法,使得state每次减一来控制信号量。

availablePermits方法,获取当前剩余的信号量数量
public int availablePermits() {
    return sync.getPermits();
}

//=========Sync类========
final int getPermits() {
    return getState();
 }

该方法返回AQS中state变量的值,当前剩余的信号量个数

drainPermits方法
public int drainPermits() {
    return sync.drainPermits();
}

//=========Sync类========
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

获取并返回立即可用的所有许可。Sync类的drainPermits方法,获取1个信号量后将可用的信号量个数置为0。例如总共有10个信号量,已经使用了5个,再调用drainPermits方法后,可以获得一个信号量,剩余4个信号量就消失了,总共可用的信号量就变成6个了。

reducePermits 方法
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

//=========Sync类========
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

该方法是protected 方法,减少信号量个数

判断AQS等待队列中是否还有Node
public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

//=========AbstractQueuedSynchronizer类========
public final boolean hasQueuedThreads() {
   //头结点不等于尾节点就说明链表中还有元素
   return head != tail;
}
getQueuedThreads方法
protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}

//=========AbstractQueuedSynchronizer类========
public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
        Thread t = p.thread;
        if (t != null)
            list.add(t);
    }
    return list;
}

该方法获取AQS中等待队列中所有未获取信号量的线程相关的信息(等待获取信号量的线程相关信息)。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.06.25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 类介绍
  • 使用场景
  • Semaphore 源码分析
    • Semaphore构造
      • acquire:可响应中断的获得信号量
        • acquire:不可响应中断的获取信号量
          • tryAcquire 方法,尝试获得信号量
            • release 释放信号量
              • availablePermits方法,获取当前剩余的信号量数量
                • drainPermits方法
                  • reducePermits 方法
                    • 判断AQS等待队列中是否还有Node
                      • getQueuedThreads方法
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档