最早用来解决进程同步与互斥问题的机制: 包括一个称为信号量的变量及对它进行的两个原语操作(PV操作)
什么是信号量?
信号量(semaphore)的数据结构为一个值和一个指针,指针指向等待该信号量的下一个进程。信号量的值与相应资源的使用情况有关。
PV操作由P操作原语和V操作原语组成(原语是不可中断的过程)
(注,P是荷兰语的Passeren,相当于英文的pass,V是荷兰语的Verhoog,相当于英文中的incremnet)
对信号量进行操作,具体定义如下:
PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信
使用PV操作实现进程互斥时应该注意的是:
//许可数量
private int permits = 1;
public synchronized void P() {
permits--;
if(permits < 0 ){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void V(){
permits++;
if(permits <=0){
notifyAll();
}
}
JUC提供了工具类之一就是Semaphore,提供了丰富的API,不再需要自己实现
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)
// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)
对于JUC的Semaphore源码,此篇不阐述了,另开新篇;但对分布式的Semaphore倒是可以研究下
Redission中有对应的RSemaphore
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
可过期信号量
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
直接上最本质的源码片段,lua脚本很简单,对信号量进行计数,acquire时,信号量减1,release时,信号量加1;主要是保证操作的原子性
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), permits);
}
@Override
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
}
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], value); ",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
在最本质的基础上,再深入看一下还做了哪些事,能真正达到一个工业生产标准
非阻塞式,有信息量就正常获取,没有刚快速返回,就是lua本质,没有做额外的事情
@Override
public void acquire(int permits) throws InterruptedException {
if (tryAcquire(permits)) {
return;
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {
return;
}
getEntry().getLatch().acquire(permits);
}
} finally {
unsubscribe(future);
}
}
阻塞式,相对非阻塞式就多了一些事
订阅事件内部细节,另开篇再说了,他的目的其实就是释放Semaphore
想像一下,同一个client的两个线程A,B 同时需要获取信号量,如果A成功获取,那么B将被Semaphore阻塞住了,何时退出阻塞呢?
就在线程A进行release()之后,会publish,细节可查看上面的release()中的lua脚本,当B监听到事件时,就会调用Semaphore.release(),再次进行tryAcquire()
如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可
@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
if (tryAcquire(permits)) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> future = subscribe();
if (!await(future, time, TimeUnit.MILLISECONDS)) {
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
while (true) {
current = System.currentTimeMillis();
if (tryAcquire(permits)) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
// waiting for message
current = System.currentTimeMillis();
getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
}
} finally {
unsubscribe(future);
}
// return get(tryAcquireAsync(permits, waitTime, unit));
}
其实await(future, time, TimeUnit.MILLISECONDS)是使用的CountDownLatch
如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false
当前是第一个请求,或者别的释放,那就再往下进入循环
CountDownLatch.await()+Semaphore.tryAcquire()配合使用
每一次等待时间后,都需要检查是否超过等待时间
为什么需要引入CountDownLatch.await()呢? 都使用Semaphore.tryAcquire()不行吗?这个需要再次深入挖掘了
分布式信号量,原理很明了,主要还是通过lua保障redis操作的原子性
阅读redisson源码,发现里面的操作基本都是异步化,底层又是基于netty,大量使用了future模式,如果不知道future模式,会很绕,debug都会晕掉,所以在深入redisson之前,需要再对future模式温习一下