前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Semaphore信号量探秘

Semaphore信号量探秘

作者头像
黑洞代码
发布2021-01-14 15:40:32
5760
发布2021-01-14 15:40:32
举报

Semaphore信号量探秘

同步控制是开发功能强大的并发程序的基础;如果把高性能的线程比作跑车,那么同步控制类就好比是跑车上的刹车和方向盘。之前,我们介绍的synchronized + wait( ) + notify()“组合拳”就是一种最简单的线程同步控制机制。其中,synchronized用于决定一个线程是否可以访问临界区资源;wait + notify方法起到了阻塞线程和唤醒线程的作用。除此之外JDK还提供了用于控制并发线程数的工具类——Semaphore(信号量)

Semaphore是计数信号量。Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。

01

初识Semaphore

Semaphore主要用于控制同时访问特定资源的线程数量,它通过协调各个线程,始终保持一定数量内的线程去使用公共资源。

我们可以举一个实际生活中的例子,以便更加深入的去了解Semaphore的作用。公司里总共有10台打印机,在公司业务繁忙时期会有很多文件(比如有100份)需要打印;由于打印机公共资源有限,即只允许同时打印10份文件。另有一个指示器用于标识当前正在工作的打印机的数量为10,其余90份文件只能等待;由于每份文件页数各不相同,有些文件会早一些完成打印,而有些文件会晚点才能完成打印。但是,如果当前10份文件中有N份文件已经打印结束,指示牌就会更新为10-N个打印机正在工作,此时允许处于等待状态的文件中选择N份文件进行打印。

上例中的待打印的文件就好比是待执行的线程;文件打印过程就表示线程执行;文件打印结束就表示线程执行完成;指示器显示当前工作中的打印机数量为10意味着其他的线程会被阻塞,不能执行;指示器显示为N,意味着可以唤醒10-N个线程继续执行。

02

API简介

从概念上来讲,Semaphore维护了一个许可证集合,集合的数量是有限的,集合中的一个元素就是一个许可证,线程拿到许可证,就好比是拿到了线程执行的“通行证”,从而可以立即被执行。如果某个线程试图去从空集合中去取许可证,那么就会自动阻塞;而某个线程执行完毕后,需要主动把许可归还入集合,唤醒被阻塞的线程,同时保证了线程的最大并发执行数始终控制在许可证集合的数目内。

下面我们一起来认识Semaphore工具类的几个重要方法:

  • public Semaphore(int permits, boolean fair) Semaphore的构造器,创建一定数量的许可证集合。其中,permits用于指定许可证的数量,即同时能申请多少个许可证,当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程同时并发执行;fair用于指定信号量控制策略:true为公平,false为非公平。 在Java语言中,只要一个类可能会出现多个线程被阻塞并且等待同步资源的释放,就会涉及公平性的概念。所谓的非公平模式是指被同步的资源被释放后,所有等待的线程中都去竞争共享资源,而这个竞争的过程是没有任何条件的。公平模式则不然,它根据先来后到的原则分配共享资源。
  • public Semaphore(int permits) 此构造器等价于调用Semaphore(permits,false);即默认执行非公平信号量控制策略。
  • public void acquire() throws InterruptedException 调用该方法的线程尝试获得一个执行的许可证。如果获得不到许可证,则线程会等待,直到有线程释放一个许可证或当前线程被中断。
  • public void acquire(int permits) throws InterruptedException 获取指定数量的许可证,如果没有足够的许可证则等待,或者响应中断。
  • public void acquireUninterruptibly() 该方法与调用acquire()方法类似,但是不响应中断。
  • public boolean tryAcquire() 调用该方法的线程尝试获得一个执行的许可证。如果成功返回true,失败则返回false,它不会进行等待,立即返回。
  • public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException 调用该方法的线程尝试在指定的时间内获得一个执行的许可证。如果成功返回true,失败则返回false,此方法响应中断。
  • public void release() 用于在线程访问资源结束后,释放一个许可证,以便其他等待许可的被阻塞的线程可以进行资源访问。
  • public void release(int permits) 用于在线程访问资源结束后,释放执行数量的个许可证,以便其他等待许可的被阻塞的线程可以进行资源访问。
  • public int availablePermits() 返回当前可用的许可证数量
  • public int drainPermits() 获取并返回所有立即可用的许可证
  • protected void reducePermits(int reduction) 通过指示来减少可用许可证的数量。此方法适合子类使用信号量追踪不可用的资源。
  • public final boolean hasQueuedThreads() 查询当前是否有线程等待获取许可证。需要注意的是:任何时刻都存在取消获取许可证的情况,因此当此方法返回true的时候并不一定确保有线程在
  • protected Collection getQueuedThreads() 返回等待许可证的线程的集合

03

实战

假设有10个人在银行办理业务,只有2个工作窗口,代码实现逻辑如下:

代码语言:javascript
复制
import java.util.Random;
import java.util.concurrent.Semaphore;

/**
 * 如厕管理系统
 */
class Ruce implements Runnable {
    private String userName;
    private Semaphore pots;

    public Ruce(String userName, Semaphore pots) {
        this.userName = userName;
        this.pots = pots;
    }

    @Override
    public void run() {
        try {
            // 查询剩下的资源(剩下的茅坑)
            int availablePermits = pots.availablePermits();
            if (availablePermits > 0) {
                System.out.println(userName + "有茅坑...");
            } else {
                System.out.println(userName + "没有茅坑了...");
            }
            //申请茅坑 如果资源达到规定的次数,就等待
            pots.acquire();
            System.out.println(userName + "我开始上厕所了..");
            Thread.sleep(new Random().nextInt(3000)); // 模拟上厕所时间。
            System.out.println(userName + "厕所上完了...爽啊!");
            pots.release();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

import java.util.concurrent.Semaphore;

public class TestSemaphore {
    public static void main(String[] args) {
        // 一个厕所只有3个坑位,但是有10个人来上厕所,那怎么办?假设10的人的编号分别为1-10,并且1号先到厕所,10号最后到厕所。那么1-3号来的时候必然有可用坑位,顺利如厕,4号来的时候需要看看前面3人是否有人出来了,如果有人出来,进去,否则等待。同样的道理,4-10号也需要等待正在上厕所的人出来后才能进去,并且谁先进去这得看等待的人是否有素质,是否能遵守先来先上的规则。
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 10; i++) {
            Ruce parent = new Ruce("第" + i + "个人,", semaphore);
            new Thread(parent).start();
        }
    }
}

04

工作原理

Semaphore工作原理:

1.内部类Sync

abstract static class Sync extends AbstractQueuedSynchronizer

抽象内部类Sync继承来了AbstractQueuedSynchronizer(简称AQS)

2.Sync的子类

static final class NonfairSync extends Sync

公平的方式实现信号量

static final class NonfairSync extends Sync

非公平的方式实现信号量

下面以Semaphore的默认实现NonfairSync为例,当调用以下构造函数:

代码语言:javascript
复制
/**
 * Creates a {@code Semaphore} with the given number of
 * permits and nonfair fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

此时操作的对象其实就是一个NonfairSync对象。

NonfairSync类的构造函数如下:

代码语言:javascript
复制
NonfairSync(int permits) {
    super(permits);
}

其实这里NonfairSync类的构造函数调用了父类Sync中的构造函数:

代码语言:javascript
复制
Sync(int permits) {
    setState(permits);
}

Sync中的构造函数调用其父类AbstractQueuedSynchronizer的setState()方法,设置同步状态。

代码语言:javascript
复制
protected final void setState(int newState) {
    state = newState;
}

同步状态其实就保存在volatile修饰的int变量state中:

代码语言:javascript
复制
/**
 * The synchronization state.
 */
private volatile int state;

当线程尝试获取一个许可证用于执行时,会调用acquire()方法:

代码语言:javascript
复制
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

这里acquire()方法调用的sync.acquireSharedInterruptibly(1)方法其实是抽象内部类Sync的父类AQS中的方法。

代码语言:javascript
复制
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

其中首先要判断线程是否中断了。然后调用tryAcquireShared()方法尝试获取许可证。tryAcquireShared()源码如下:

代码语言:javascript
复制
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

这里调用的nonfairTryAcquireShared()方法的实现如下:

代码语言:javascript
复制
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
//获取同步状态值
        int available = getState();
//获取指定数量的许可证
        int remaining = available - acquires;
//剩余许可证小于0或者CAS成功
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
//剩余的许可证数量
            return remaining;
    }
}

如果tryAcquireShared()方法返回大于0的值,线程调用完acuqire()方法后就直接执行了。如果tryAcquireShared()返回的结果小于0,将执行doAcquireSharedInterruptibly()方法。

代码语言:javascript
复制
/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

主要做的操作如下:

1.将线程加入阻塞队列中

2.如果当前线程在队列的头结点后的第一个位置,那么直接调用tryAcquireShared()方法尝试获取许可证

3.维护线程在阻塞队列中的状态,尝试将线程挂起

代码语言:javascript
复制
LockSupport.park(this);

当线程执行完成后,需要释放许可证。

代码语言:javascript
复制
public void release() {
    sync.releaseShared(1);
}

realase()方法其实调用的是AQS中的releaseShared()方法。

代码语言:javascript
复制
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

其中tryReleaseShared()方法是在Semaphore中实现的。

代码语言:javascript
复制
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared()方法释放指定数量的许可证。然后唤醒其他阻塞的线程。

代码语言:javascript
复制
/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-03-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 落叶飞翔的蜗牛 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档