前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Semaphore,ReadWriteLock,StampedLock

Semaphore,ReadWriteLock,StampedLock

作者头像
小土豆Yuki
发布2021-01-04 09:59:33
4530
发布2021-01-04 09:59:33
举报
文章被收录于专栏:洁癖是一只狗

如何使用Semaphore实现一个限流器

信号量模型的模型是很简单的,一个计数器,一个等待队列以及三个方法,如下图显示

  • init()设置计数器的初始值
  • up()计数器器值加一,如果计数器的值小于或等于0,则唤醒等待队列中的一个线程,并且从等待队列移除
  • down()计数器减一,如果计数器的值小于0,则当前线程阻塞,否则当前线程继续执行。

上面三个方法都是原子性的,并且这个原子性是由信号量模型实现放保证的,在java中信号量的实现是有类Semaphore实现的,下面看看下面代码,

代码语言:javascript
复制

class Semaphore{
// 计数器
int count;
// 等待队列
  Queue queue;
// 初始化操作
  Semaphore(int c){
this.count=c;
  }
// 
void down(){
this.count--;
if(this.count<0){
//将当前线程插入等待队列
//阻塞当前线程
    }
  }
void up(){
this.count++;
if(this.count<=0) {
//移除等待队列中的某个线程T
//唤醒线程T
    }
  }
}

信号量模型中的down和up其实就是历史上最早成为P操作和V操作,信号量模型也被称为PV原语,而在java中down和up对应的就是acquire和release.

我们在看一下如何使用Semaphore,其实我们可以把信号量当做我们现实生活中的红绿灯,车辆通过必须检查是否是绿灯,只有绿灯才能通过,比如下面代码,我们使用Semaphore实现一个累加器,实现互斥锁保证线程安全

代码语言:javascript
复制

static int count;
//初始化信号量
static final Semaphore s 
    = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
try {
    count+=1;
  } finally {
    s.release();
  }
}

比如我们假设有两个线程T1,T2,当他们同时调用addOne方法,此时都会执行到了acquire,但是只有一个线程会把限号量的计数器减为0(假设是T1),另外一个线程T2把计数器减为-1,此时对于T1,信号量的计数器为0,大于或等于0,则执行操作,而线程T2的信号量的计数器是-1,小于1,按照信号量模型就会阻塞,所以此时只有一个线程T1进入到了count+=1;

然后,当T1执行完之后调用release(相当于调用up),就会把计数器的值+1,此时计数器的值为0,小于或等于0,就会唤醒等待队列中的T2线程,这样就实现了互斥性,保证了线程安全

有的人会有疑问,JDK不是已经实现了Lock,为什么还要实现Semaphore,那是因为Semaphore可以允许多个线程访问一个临界区,正如我经常使用的线程池,连接池,对象池等等

比如我们要是实现这样的一个场景,实现一个对象池,我们的线程要重复利用N个对象,这个对象释放前,不允许其他线程使用,我们使用list保存对象,且不允许超过N个线程同时进入临界区,我们看看下面代码实现的限流器

代码语言:javascript
复制

class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }
// 利用对象池的对象,调用func
R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
try {
      t = pool.remove(0);
return func.apply(t);
    } finally {
      pool.add(t);
      sem.release();
    }
  }
}
// 创建对象池
ObjPool<Long, String> pool = 
new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行  
pool.exec(t -> {
    System.out.println(t);
return t.toString();
});

关键的代码就是ObjPool里面的exec()方法,这个方法里面实现了限流的功能,

首先我们调用acquire方法,此时设置的对象池代销是10,当前面的10个线程可以继续执行,每一个线程分配了一个对象,但是当地11个线程调用的时候就会阻塞,而使用完之后就会释放对象,同时调用release方法来更新信号量的计数器,如果此时计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程.

如实使用ReadWriteLock实现一个缓存

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,对写锁在读锁写少的场景下性能优于互斥锁的,但读写锁的写操作是互斥的,当一个线程在写共享标量的时候,是不允许其他线程执行写操作和读操作的,

现在我们就使用读写锁实现一个缓存,我们声明一个类Cache<k,v>,k即使缓存key的类型,v就是value的类型,我们会把缓存的数据放到HashMap中,由于hashmap是线程不安全,我们可以使用读写锁实现保证线程安全,而读写锁readWriteLock是一个接口,他的实现类可以是ReentrantReadWriteLock,

我们实现的Cache实现两个方法,get和put,如下代码

代码语言:javascript
复制

class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
    r.lock();
try { return m.get(key); }
finally { r.unlock(); }
  }
// 写缓存
V put(K key, V value) {
    w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
  }
}

我们知道缓存的初始化一般分为两种,一种是全量加载,一种是按需加载,全量加载就是把数据一次性放入到缓存,而当数据量特别大的时候,我们可以使用按需加载,也就是懒加载,

我们假设我们的数据源就是数据库,如果缓存中没有缓存的对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要加锁,所以在代码中我们就是用w.lock获取写锁.

但是我们要注意的是,如果在获取到写锁之后,我们直接去查询数据库而是再一次验证缓存是否存在,如果验证还是不存在,才回去数据库查询并更新本地缓存,这样多的目的很简单,在高并发的场景,存在锁竞争,有可能在存在三个线程T1,T2,T3,此时T1线程获取到了写锁,并更新了缓存,最终释放了写锁,而假设T2此时也获取到了写锁,但是没有再次验证的话,就会重新查询数据库,所以这里的查询数据库是没有必要的,所以再次验证,能够避免高并发下重复查询数据的问题。具体代码如下

代码语言:javascript
复制
class Cache<K,V> {
final Map<K, V> m =
    new HashMap<>();
final ReadWriteLock rwl = 
    new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();

  V get(K key) {
    V v = null;
//读缓存
    r.lock();         ①
try {
      v = m.get(key); ②
    } finally{
      r.unlock();     ③
    }
//缓存中存在,返回
if(v != null) {   ④
return v;
    }  
//缓存中不存在,查询数据库
    w.lock();         ⑤
try {
//再次验证
//其他线程可能已经查询过数据库
      v = m.get(key); ⑥
if(v == null){  ⑦
//查询数据库
        v=省略代码无数
        m.put(key, v);
      }
    } finally{
      w.unlock();
    }
return v; 
  }
}

但是我们发现是否可以在②处 下面实现缓存验证的逻辑呢,如下代码

代码语言:javascript
复制

//读缓存
r.lock();         ①
try {
  v = m.get(key); ②
if (v == null) {
    w.lock();
try {
//再次验证并更新缓存
//省略详细代码
    } finally{
      w.unlock();
    }
  }
} finally{
  r.unlock();     ③
}

看上去没有说明问题,但是我们忽略了当我们获取写锁的时候,读锁还没有释放,因此此时获取写锁的操作会一直等待,最后导致相关线程都被阻塞,永远没有机会呗唤醒,从读锁到写锁的过程叫做锁的升级,然而这里是不允许的锁升级的,但是支持锁降级,

下面代码就是ReentrantReadWriteLock官方实例,你会发现获取读锁的时候,线程还是持有写锁的,

代码语言:javascript
复制

class CachedData {
  Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁  
final Lock r = rwl.readLock();
//写锁
final Lock w = rwl.writeLock();

void processCachedData() {
// 获取读锁
    r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
      r.unlock();
// 获取写锁
      w.lock();
try {
// 再次检查状态  
if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
// 释放写锁前,降级为读锁
// 降级是可以的
        r.lock(); ①
      } finally {
// 释放写锁
        w.unlock(); 
      }
    }
// 此处仍然持有读锁
try {use(data);} 
finally {r.unlock();}
  }
}

有没有比读写锁更块的锁

在java jdk1.8提供了一种锁叫做StampedLock的锁,他的性能比读写锁更好.

ReadWriteLock支持两种模式一种读锁,一种写锁,而StampedLock支持三个模式,写锁,悲观读锁,乐观读锁,其中写锁和悲观读锁和读写锁的写锁,读锁语义基本一致,允许多个线程同时获取悲观读锁,但是只有一个线程获取写锁,且写锁和读锁互斥,不同的是StampedLock里面的写锁和悲观读锁加锁成功之后,会返回一个stamp,当解锁的时候需要传入这个stamp

代码语言:javascript
复制

final StampedLock sl = 
new StampedLock();

// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
//省略业务相关代码
} finally {
  sl.unlockRead(stamp);
}

// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
//省略业务相关代码
} finally {
  sl.unlockWrite(stamp);
}

StampedLock的性能之所以比ReadWriteLock还要好,关键是因为支持乐观读的方式,ReadWriteLock支持多个线程同时读,但是当多个线程同时读的时候,此时的写线程就会阻塞,但是Stamplock是允许一个写线程获取写锁的,也就是不是多有写操作都会阻塞

这里我们要注意的是乐观读和乐观读锁是不一样的,客观读是一个无锁操作,因此相比ReadWriteLock,乐观读的性能更好一些。

我们可以看一下官方给出的代码,如下

代码语言:javascript
复制

class Point {
private int x, y;
final StampedLock sl = 
new StampedLock();
//计算到原点的距离  
int distanceFromOrigin() {
// 乐观读
long stamp = 
      sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
//判断执行读操作期间,
//是否存在写操作,如果存在,
//则sl.validate返回false
if (!sl.validate(stamp)){
// 升级为悲观读锁
      stamp = sl.readLock();
try {
        curX = x;
        curY = y;
      } finally {
//释放悲观读锁
        sl.unlockRead(stamp);
      }
    }
return Math.sqrt(
      curX * curX + curY * curY);
  }
}

上面方法我们现实使用s1.tryOptimisticRead方法,这个方法就是乐观读,之后把共享变量x,y放到方法的局部变量,但是乐观读是无锁的操作,在共享变量放入局部变量的时候,x,y的值可能被其他线程修改,因此最后读完之后,还需要验证一下是否存在写操作,这个验证操作是通过validate(stamp)实现的.

这里乐观读升级为悲观读锁,这个是非常好的做法, 因为要保证共享变量x,y的正确性和一致性,如果我们这里循环反复执行乐观读,直到执行乐观读操作期间没有写操作,而循环读会浪费大量的CPU,升级为悲观读锁,代码简单不易出错.

我们使用StampedLock虽然性能很好,但是他并不支持重入,且StampedLock的悲观读锁,写锁都不支持条件变量,最后一点最为重要,如果线程阻塞在StampedLock的readLock或者writeLock的时候,此时调用线程的中断方法interrupt,会导致CPU飙升,如果需要支持中断功能,可以使用悲观读锁readLockInterruptibly或writeLockInterruptibly(),如下代码就可能会导致CPU飙升

代码语言:javascript
复制

final StampedLock lock
  = new StampedLock();
Thread T1 = new Thread(()->{
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
  LockSupport.park();
});
T1.start();
// 保证T1获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
//阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证T2阻塞在读锁
Thread.sleep(100);
//中断线程T2
//会导致线程T2所在CPU飙升
T2.interrupt();
T2.join();

如果对您有一丝丝帮助,麻烦点个关注,也欢迎转发,谢谢

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 洁癖是一只狗 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档