前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java - LinkedBlockingQueue的阻塞实现

Java - LinkedBlockingQueue的阻塞实现

作者头像
夹胡碰
发布2021-01-06 10:56:43
8930
发布2021-01-06 10:56:43
举报
文章被收录于专栏:程序猿~

LinkedBlockingQueueBlockingQueue的链表实现,他的阻塞体现在puttake方法上,下面将通过源码介绍如何LinkedBlockingQueue是如何实现的阻塞队列。

1. ReentrantLock+Condition

通过AQS构建的ReentrantLockCondition实现了puttake的锁与线程挂起/唤醒模型

代码语言:javascript
复制
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
2. put阻塞

1)首先加put锁 2)当容量满时(count.get() == capacity),执行notFull.await();挂起当前线程并释放锁,相当于Object.wait。 3)当容量从0变成1时(if c == 0),执行signalNotEmpty();唤醒一个take挂起线程,相当于Object.notify

代码语言:javascript
复制
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // put锁
    try {
        while (count.get() == capacity) {
            notFull.await(); // 容量满时,挂起线程释放锁,并加入等待队列
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty(); // 当容量从0变成1时,唤醒一个take挂起的线程
}
代码语言:javascript
复制
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();// 唤醒一个take挂起的线程
    } finally {
        takeLock.unlock();
    }
}
3. take阻塞

1)首先加take锁 2)当容量为0时(count.get() == 0),执行notEmpty.await();挂起当前线程并释放锁,相当于Object.wait。 3)当容量从满容量减少1时(if c == capacity),执行signalNotFull();唤醒一个put挂起线程,相当于Object.notify

代码语言:javascript
复制
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // take锁
    try {
        while (count.get() == 0) {
            notEmpty.await(); // 当队列中没有对象时,挂起线程释放锁,并加入等待队列
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull(); // 当容量从满容量减少1时,唤醒一个put挂起的线程
    return x;
}
代码语言:javascript
复制
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal(); // 唤醒一个put挂起的线程
    } finally {
        putLock.unlock();
    }
}
4. 拓展: offer(timeout) 与 poll(timeout)实现原理

实现与put/take基本一样,只不过底层调用了LockSupport.parkNanos/LockSupport.unparkNanos

  • wait
代码语言:javascript
复制
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 基础park
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • wait timeout
代码语言:javascript
复制
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout); // timeout park
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. ReentrantLock+Condition
  • 2. put阻塞
  • 3. take阻塞
  • 4. 拓展: offer(timeout) 与 poll(timeout)实现原理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档