前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS之同步器

AQS之同步器

作者头像
spilledyear
发布2020-02-10 11:46:57
4220
发布2020-02-10 11:46:57
举报
文章被收录于专栏:小白鼠小白鼠

ReentrantLock

代码语言:javascript
复制
Lock lock = new ReentrantLock();
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        lock.lock();
        System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now());
        sleep(TimeUnit.SECONDS, 2);
        lock.unlock();
    }).start();
}

---------------------------------执行结果------------------------------------
Thread-0 2020-01-01T17:12:24.678
Thread-1 2020-01-01T17:12:26.681
Thread-2 2020-01-01T17:12:28.681
  1. 实现原理基于独占锁
  2. 公平锁tryAcquire方法中会先判断队列中有没有阻塞节点,有就加入队列,没有就通过CAS尝试获取锁
  3. 非公平锁tryAcquire中不管队列中有没有阻塞节点,直接先通过CAS尝试获取锁,获取成功就返回,获取失败就加入阻塞队列

CountDownLatch

代码语言:javascript
复制
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 80);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 50);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 60);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 60);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
countDownLatch.await();
System.out.println("All Finished");


---------------------------------执行结果------------------------------------
Thread-1 Finished
Thread-2 Finished
Thread-3 Finished
All Finished
Thread-0 Finished
  1. 实现原理基于共享锁
  2. 在初始化的时候传入一个变量,该变量即代表同步器中state的值
  3. 调用CountDownLatch#await方法的时候,会判断state的值是否等于0,不等于0就添加到阻塞队列,等于0就直接返回
  4. 调用CountDownLatch#countDown方法的时候,state减1,判断state的值是否等于0,等于0就释放因调用CountDownLatch#await方法而阻塞的线程,不等于0就直接返回

CyclicBarrier

代码语言:javascript
复制
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("阻塞解除之后执行一些逻辑 " + LocalDateTime.now()));
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + " doXX");
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + " doXX");
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();

    sleep(TimeUnit.SECONDS, 2);
}

---------------------------------执行结果------------------------------------
Thread-0 doXX
Thread-1 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:15.735
Thread-2 doXX
Thread-3 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:17.706
Thread-4 doXX
Thread-5 doXX
阻塞解除之后执行一些逻辑 2020-01-01T14:19:19.706

字面意思就是循环壁垒,使用上与CountDownLatch类似,不过实现上完全不一样,CyclicBarrier统计的的是调用了CyclicBarrier#await方法的线程数,当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程将被唤醒然后进入下一轮,可以重复使用

  1. 实现原理是基于ReentrantLockCondition
  2. 执行CyclicBarrier#await之前,需要先执行ReentrantLock#lock方法,完成之后执行ReentrantLock#unlock方法,即通过ReentrantLock保证执行CyclicBarrier#await方法是安全的
  3. 在执行CyclicBarrier#await方法的时候,不满足释放条件(调用CyclicBarrier#await方法的线程数不等于初始值)时,会调用Condition#await方法是当前线程阻塞,满足释放条件时会调用Condition#signalAll唤醒所有阻塞的线程然后进入下一轮
  4. CyclicBarrier中有一个内部类Generation,该内部类就表示一轮一轮循环的意思,当满足释放条件时,除了唤醒所有因调用CyclicBarrier#await方法而阻塞的线程,还会生成一个新的Generation对象,代表下一轮开始

Semaphore

信号量,可以用来控制同时访问资源的线程个数,比如可以用在对线程数的限流,在初始化的时候需要用户传入许可的数量,通过Semaphore#acquire方法获取一个许可,如果Semaphore还有许可可获取就直接返回,否则阻塞当前线程

代码语言:javascript
复制
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 4; i++) {
    new Thread(() -> {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "访问资源");
            sleep(TimeUnit.MILLISECONDS, 10 + new Random().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "释放资源");
            semaphore.release();
        }

    }).start();
}

---------------------------------执行结果------------------------------------
Thread-1访问资源
Thread-0访问资源
Thread-0释放资源
Thread-2访问资源
Thread-1释放资源
Thread-3访问资源
Thread-2释放资源
Thread-3释放资源
  1. 实现原理基于共享锁
  2. 内部类Sync实现了AbstractQueuedSynchronizer,Sync实现类有两个,对应公平锁和非公平锁
  3. 简单来讲, Semaphore其实就是一个共享锁,根据我们对共享锁的了解,共享锁表示同一时刻最多有多少个线程持有锁,这和Semaphore的特性是一致的,所以Semaphore只需要再封装一层API,调用Semaphore#acquire方法时候获取锁或阻塞,调用,调用Semaphore#release`通过自旋释放锁就可以了

DelayQueue

  1. 实现原理基于ReentrantLockCondition
  2. 优先级 无界 阻塞队列
  3. 元素必须实现Delayed接口,Delayed#getDelay方法返回值<=0代表该元素延迟时间已到,可以出队了
  4. 基于 优先级 保证元素插入的顺序,基于Delayed#getDelay来控制元素延迟的时间,基于Condition控制在获取元素时是阻塞还是返回(队首元素)

LinkedBlockingQueue

  1. 实现原理基于2个ReentrantLock和2个Condition,分别为ReentrantLock存取锁Condition存取锁
  2. 有界 阻塞队列,如果不指定,容量默认为Integer.MAX_VALUE
  3. 链表结构,每个元素会被封装成一个Node节点,每个Node节点有一个next指针
  4. 尾进头出,添加元素的时候放到尾部,获取元素的时候放到头部

添加元素

涉及到的方法:LinkedBlockingQueue#put,LinkedBlockingQueue#offer,如果队列满了,前者会阻塞,后者不会阻塞

  1. 先获取ReentrantLock存锁
  2. 如果已经达到容量上限,就通过Condition存锁阻塞当前线程,否则添加元素
  3. 入队,将当前元素添加到队尾,将上次队位元素的next指针指向该元素,更新last指向
  4. 容量加1,如果容量==0,唤醒Condition写锁

获取元素

涉及到的方法:LinkedBlockingQueue#take,LinkedBlockingQueue#poll,如果队列空了,前者会阻塞,后者不会阻塞

  1. 先获取ReentrantLock写锁
  2. 如果队列里面没有元素,就通过Condition写锁阻塞当前线程,否则获取元素
  3. 出队,从队首去取出一个元素,更新head指向
  4. 容量减1,如果容量满了,唤醒Condition读锁

ArrayBlockingQueue

  1. 实现原理基于1个ReentrantLock和2个Condition,两个Condition分别用于判断是否为空和是否已满
  2. 有界 阻塞队列,基于final数组,自然是有界的
  3. 数组结构,基于final数组,,在初始化ArrayBlockingQueue的时候需要指定容量

添加元素

涉及到的方法:ArrayBlockingQueue#put,ArrayBlockingQueue#offer,如果队列满了,前者会阻塞,后者不会阻塞

  1. 获取ReentrantLock
  2. 如果队列满了,通过判断是否已满的Condition阻塞当前线程
  3. 有一个变量putIndex用于记录下次添加元素时对应的数组下标,当takeIndex==队列.length的时候,重置该变量为0
  4. 唤醒判断是否为空的Condition

获取元素

涉及到的方法:ArrayBlockingQueue#take,ArrayBlockingQueue#poll,如果队列空了,前者会阻塞,后者不会阻塞

  1. 获取ReentrantLock
  2. 如果队列为空了,通过判断是否为空的Condition阻塞当前线程
  3. 有一个变量takeIndex用于记录下次获取元素时对应的数组下标,当takeIndex==队列.length的时候,重置该变量为0
  4. 唤醒判断是否已满的Condition

LinkedTransferQueue

进行线程间数据交换的利器 todo

SynchronousQueue

PriorityBlockingQueue

  1. 实现原理基于ReentrantLockCondition
  2. 优先级 无界 阻塞队列,优先级的实现基于二叉堆
  3. 值不允许null,且需要实现Comparable接口

优先级的 无界阻塞队列,优先级可以基于自然排序,也可以基于Comparable接口,取决于你使用哪个构造函数

添加元素

涉及到的方法: PriorityBlockingQueue#add PriorityBlockingQueue#offer

  1. 先通过ReentrantLock获取锁
  2. 判断size >= queue.length,如果条件成立就扩容 3.通过Comparable#二叉堆便利找到合适的位置插入元素
  3. size加1
  4. 唤醒因执行了PriorityBlockingQueue#take方法而阻塞的线程(即队列中没有元素的时候),这个通过Condition实现
  5. 释放锁

获取元素

涉及到的方法: PriorityBlockingQueue#poll PriorityBlockingQueue#take

  1. 在调用这些方法之前都需要先通过ReentrantLock获取锁
  2. PriorityBlockingQueue#poll方法,在队列中没有元素的时候直接返回null,不会阻塞当前线程
  3. PriorityBlockingQueue#take方法,在队列中没有元素的时候会阻塞当前线程,知道队列中有元素然后再被唤醒返回,基于Condition实现

队列扩容

涉及到的方法: PriorityBlockingQueue#tryGrow

  1. 执行ReentrantLock#unlock方法,释放ReentrantLock锁,为什么要释放这个锁呢?我猜这里是为了提高性能,在扩容之前先释放锁,然后通过一个CAS变量来控制扩容的并发问题,这样在扩容期间就不会接阻塞其他调用线程,比如take操作,很妙
  2. 当前是否正在扩容通过一个volatile变量表示,0表示目前不在扩容,1表示正在扩容,每次扩容之前通过CAS将其设置为1,如果CAS失败说明目前有其他线程正在扩容,此时不做处理
  3. 扩容的时候先判断当前容量是否小于64,如果小于64就对容量*2+2;如果不小于64,则对容量*1.5
  4. 重新基于该容量创建一个新的Object[]
  5. 如果有并发问题,就通过Thread#yield让出当前CPU
  6. 通过System#arraycopy对行队列赋值,在赋值之前需要先通过ReentrantLock#lock再次获取锁

CopyOnWriteArrayList

可以将它看成是一个线程安全的ArrayList,在涉及到修改操作时,通过ReentrantLock获取锁,然后复制一个新的数组去修改,基于volatile语义可以读数据时不会有问题,适用于读多写少的场景,如果写比较多的比较影响性能

  1. 实现原理基于ReentrantLockvolatile
  2. 元素可以为null

获取元素

例如CopyOnWriteArrayList#get方法,直接返回数组下标对应的元素即可

修改元素

例如CopyOnWriteArrayList#set CopyOnWriteArrayList#add`方法

  1. 先获取ReentrantLock
  2. 基于原数组创建一个新的数组,然后使引用指向新数组
  3. 释放锁

CopyOnWriteArraySet

内部持有一个CopyOnWriteArrayList引用,也就是它的实现完全是基于CopyOnWriteArrayList,那它是如何保证元素不唯一呢?在CopyOnWriteArrayList中有一个addIfAbsent方法,该方法会通过遍历的方式去判断你要添加的元素是否存在.

适合读多写少的场景

ReentrantReadWriteLock

  1. 持有写锁的时候不能申请读锁,持有读锁的时候不能申请写锁,但锁降级的时候是个例外(持有写锁的情况下降级成读锁,实际上是持有写锁的时候再去申请读锁,因为都是被同一个线程占有,所以不会有问题)
  2. 写锁,独占锁,int类型低16位表示,当一个线程持有写锁的时候,其它线程不能获取读锁,只能在阻塞队列排队,写锁可以降级成读锁
  3. 读锁,共享锁,int类型高16位表示,当一个线程持有读锁的时候,其他线程还可以申请读锁,但不能申请写锁,读锁不可以降级成写锁

使用示例

代码语言:javascript
复制
StringBuilder data = new StringBuilder("data");
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

new Thread((() -> {
    // sleep一会,让写锁先持有锁
    sleep(TimeUnit.MILLISECONDS, 10);

    readWriteLock.readLock().lock();
    System.out.println(Thread.currentThread().getName() + "获得读锁 " + LocalDateTime.now());
    System.out.println(Thread.currentThread().getName() + data);
    readWriteLock.readLock().unlock();
}), "readLock ").start();

new Thread((() -> {
    readWriteLock.writeLock().lock();
    System.out.println(Thread.currentThread().getName() + "获得写锁 " + LocalDateTime.now());
    sleep(TimeUnit.SECONDS, 3);
    data.append("666");

    // 锁降级 这里这样子使用降级感觉没有什么意思,那锁降级一般用到什么场景
    readWriteLock.readLock().lock();
    System.out.println(Thread.currentThread().getName() + data);

    readWriteLock.writeLock().unlock();
}), "writeLock ").start();

构造函数

代码语言:javascript
复制
public ReentrantReadWriteLock() {
    this(false);
}

// 可以在创建ReentrantReadWriteLock时选择公平模式还是非公平模式
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
  1. 可以在创建ReentrantReadWriteLock时选择使用公平锁还是非公平锁
  2. 内部类Sync继承自AbstractQueuedSynchronizer,它负责实现同步器的模板方法,是实现同步器的关键
  3. ReadLockWriteLock实现了Lock接口,可以将它们看作是API层,具体逻辑委托给Sync实现,面向用户

Sync

代码语言:javascript
复制
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

获取读锁

代码语言:javascript
复制
public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();

    // 其实这里就涉及到锁降级,如果当前已经有写锁,返回-1,将它加入到阻塞队列, 否则继续尝试获取读锁
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;

    int r = sharedCount(c);
    // readerShouldBlock主要是针对公平锁和非公平锁, `c + SHARED_UNIT`是因为共享锁用的是高16位
    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // firstReader是把读锁状态从0变成1的那个线程
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 锁重入
            firstReaderHoldCount++;
        } else {
            // cachedHoldCounter是上一个获取锁成功的线程
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }

    // 上面的if失败了,即通过自旋获取锁
    return fullTryAcquireShared(current);
}
代码语言:javascript
复制
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

释放读锁

代码语言:javascript
复制
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();

    // 如果 firstReader 是当前线程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;

        // 并且firstReaderHoldCount == 1,说明释放锁之后需要重置firstReader
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
  1. 读锁和写锁不能同时使用,所以释放读锁的时候按理来说并不需要去唤醒节点
  2. 但如果state==0,需要唤醒阻塞的写锁

获取写锁

代码语言:javascript
复制
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);

    // state != 0 && w==0 说明有读锁
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;

        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");

        // Reentrant acquire  锁重入
        setState(c + acquires);
        return true;
    }

    // writerShouldBlock用于确定公平和非公平模式
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

判断是又读锁,判断是否锁重入,通过CAS设置状态

释放写锁

代码语言:javascript
复制
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();

    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

和正常的独占锁释放一样

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • DelayQueue
  • LinkedBlockingQueue
    • 添加元素
      • 获取元素
      • ArrayBlockingQueue
        • 添加元素
          • 获取元素
          • LinkedTransferQueue
          • SynchronousQueue
          • PriorityBlockingQueue
            • 添加元素
              • 获取元素
                • 队列扩容
                • CopyOnWriteArrayList
                  • 获取元素
                    • 修改元素
                    • CopyOnWriteArraySet
                    • ReentrantReadWriteLock
                      • 使用示例
                        • 构造函数
                          • Sync
                            • 获取读锁
                              • 释放读锁
                                • 获取写锁
                                  • 释放写锁
                                  领券
                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档