前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发中的Condition是如何实现的

Java并发中的Condition是如何实现的

作者头像
Bug开发工程师
发布2018-12-12 15:30:07
5510
发布2018-12-12 15:30:07
举报
文章被收录于专栏:码农沉思录码农沉思录
代码语言:javascript
复制
原文:http://youngforzy.top/2017/12/01/Java%E5%B9%B6%E5%8F%91%E4%B9%8BCondition%E7%9A%84%E5%AE%9E%E7%8E%B0%E5%88%86%E6%9E%90/

一、Condition 的概念

介绍

回忆 synchronized 关键字,它配合 Object 的 wait()、notify() 系列方法可以实现等待/通知模式。对于 Lock,通过 Condition 也可以实现等待/通知模式。Condition 是一个接口。Condition 接口的实现类是 Lock(AQS)中的 ConditionObject。Lock 接口中有个 newCondition() 方法,通过这个方法可以获得 Condition 对象(其实就是 ConditionObject )。因此,通过 Lock 对象可以获得 Condition 对象。

代码语言:javascript
复制
Lock lock  = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();

二、Condition 的实现分析

实现

ConditionObject 类是 AQS 的内部类,实现了 Condition 接口。

代码语言:javascript
复制
public class ConditionObject implements Condition, java.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;
        ...

可以看到,等待队列和同步队列一样,使用的都是同步器 AQS 中的节点类 Node。同样拥有首节点和尾节点,每个 Condition 对象都包含着一个 FIFO 队列。结构图如下:

等待

调用 Condition 的 await() 方法会使线程进入等待队列,并释放锁,线程状态变为等待状态。

代码语言:javascript
复制
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            //释放同步状态(锁)
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断节点是否放入同步对列
            while (!isOnSyncQueue(node)) {
                //阻塞
                LockSupport.park(this);
                //如果已经中断了,则退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

分析上述方法的大概过程:

  1. 将当前线程创建为节点,加入等待队列。
  2. 释放锁,唤醒同步队列中的后继节点。
  3. while 循环判断节点是否放入同步队列;如果没有放入则阻塞继续 while 循环(如果已经中断则退出);如果放入则退出 while 循环执行后面的判断。
  4. 退出 while 说明节点已经在同步队列中,调用 acquireQueued() 方法加入同步状态竞争。
  5. 竞争到锁后从 await() 方法返回,即退出该方法。

addConditionWaiter() 方法:

代码语言:javascript
复制
private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                //清除条件队列中所有状态不为Condition的节点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //将该线程创建节点,放入等待队列
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

过程分析:同步队列的首节点移动到等待队列。加入尾节点之前会清除所有状态不为 Condition 的节点。

通知

调用 Condition 的 signal() 方法可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。

代码语言:javascript
复制
public final void signal() {
            //判断是否获取了锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

过程:

  1. 先判断当前线程是否获取了锁。
  2. 然后对首节点调用 doSignal() 方法。
代码语言:javascript
复制
private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

过程:

  1. 修改首节点。
  2. 调用 transferForSignal() 方法将节点移动到同步队列。
代码语言:javascript
复制
final boolean transferForSignal(Node node) {
        //将节点状态变为0   
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //将该节点加入同步队列
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

调用同步器的 enq 方法,将节点移动到同步队列,满足条件后使用 LockSupport 唤醒该线程。

当 Condition 调用 signalAll() 方法:

代码语言:javascript
复制
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

可以看到 doSignalAll() 方法使用了 do-while 循环来唤醒每一个等待队列中的节点,直到 first 为 null 时停止循环。

一句话总结 signalAll() 的作用:将等待队列中的全部节点移动到同步队列中,并唤醒每个节点的线程。

总结

整个过程可以分为三步:

第一步:一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到等待队列中,并释放锁。然后就在 await() 中的一个 while 循环中判断节点是否已经在同步队列,是则尝试获取锁,否则一直阻塞。

第二步:当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法将节点移动到同步队列,并唤醒节点中的线程。

第三步:被唤醒的线程,将从 await() 中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。竞争成功则退出 await() 方法继续执行。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-11-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 实现
  • 等待
  • 通知
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档