Java并发之Condition的实现分析

一、Condition的概念

介绍

回忆 synchronized 关键字,它配合 Object 的 wait()、notify() 系列方法可以实现等待/通知模式。

对于 Lock,通过 Condition 也可以实现等待/通知模式。

Condition 是一个接口。

Condition 接口的实现类是 Lock(AQS)中的 ConditionObject。

Lock 接口中有个 newCondition() 方法,通过这个方法可以获得 Condition 对象(其实就是 ConditionObject)。

因此,通过 Lock 对象可以获得 Condition 对象。

Lock lock  = newReentrantLock();

Condition c1 = lock.newCondition();

Condition c2 = lock.newCondition();

二、Condition的实现分析

实现

ConditionObject 类是 AQS 的内部类,实现了 Condition 接口。

publicclassConditionObject implementsCondition, java.io.Serializable {

        privatetransientNode firstWaiter;

        privatetransientNode lastWaiter;

        ...

可以看到,等待队列和同步队列一样,使用的都是同步器 AQS 中的节点类 Node。

同样拥有首节点和尾节点,

每个 Condition 对象都包含着一个 FIFO 队列。

结构图:

等待

调用 Condition 的 await() 方法会使线程进入等待队列,并释放锁,线程状态变为等待状态。

publicfinalvoidawait() throwsInterruptedException {

    if(Thread.interrupted())

        thrownewInterruptedException();

    Node node = addConditionWaiter();

    //释放同步状态(锁)

    intsavedState = fullyRelease(node);

    intinterruptMode = 0;

    //判断节点是否放入同步对列

    while(!isOnSyncQueue(node)) {

        //阻塞

        LockSupport.park(this);

        //如果已经中断了,则退出

        if((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if(acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

            if(node.nextWaiter != null) // clean up if cancelled

            unlinkCancelledWaiters();

            if(interruptMode != 0)

                reportInterruptAfterWait(interruptMode);

}

分析上述方法的大概过程:

将当前线程创建为节点,加入等待队列;

释放锁,唤醒同步队列中的后继节点;

while循环判断节点是否放入同步队列:

没有放入,则阻塞,继续 while 循环(如果已经中断了,则退出)

放入,则退出 while 循环,执行后面的判断

退出 while 说明节点已经在同步队列中,调用 acquireQueued() 方法加入同步状态竞争。

竞争到锁后从 await() 方法返回,即退出该方法。

addConditionWaiter() 方法:

privateNode addConditionWaiter() {

    Node t = lastWaiter;

    if(t != null&& t.waitStatus != Node.CONDITION) {

        //清除条件队列中所有状态不为Condition的节点

        unlinkCancelledWaiters();

        t = lastWaiter;

    }

    //将该线程创建节点,放入等待队列

    Node node = newNode(Thread.currentThread(), Node.CONDITION);

    if(t == null)

        firstWaiter = node;

    else

        t.nextWaiter = node;

    lastWaiter = node;

    returnnode;

}

过程分析:同步队列的首节点移动到等待队列。加入尾节点之前会清除所有状态不为 Condition 的节点。

通知

调用 Condition 的 signal() 方法,可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。

publicfinalvoidsignal() {

    //判断是否获取了锁

    if(!isHeldExclusively())

        thrownewIllegalMonitorStateException();

    Node first = firstWaiter;

    if(first != null)

        doSignal(first);

}

过程:

先判断当前线程是否获取了锁;

然后对首节点调用 doSignal() 方法。

privatevoiddoSignal(Node first) {

    do{

        if( (firstWaiter = first.nextWaiter) == null)

            lastWaiter = null;

        first.nextWaiter = null;

    } while(!transferForSignal(first) &&

       (first = firstWaiter) != null);

}

过程:

修改首节点;

调用 transferForSignal() 方法将节点移动到同步队列。

finalbooleantransferForSignal(Node node) {

    //将节点状态变为0  

    if(!compareAndSetWaitStatus(node, Node.CONDITION, 0))

        returnfalse;

    //将该节点加入同步队列

    Node p = enq(node);

    intws = p.waitStatus;

    //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒

    if(ws > 0|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

        LockSupport.unpark(node.thread);

    returntrue;

}

调用同步器的 enq 方法,将节点移动到同步队列,

满足条件后使用 LockSupport 唤醒该线程。

当 Condition 调用 signalAll() 方法:

publicfinalvoidsignalAll() {

    if(!isHeldExclusively())

        thrownewIllegalMonitorStateException();

    Node first = firstWaiter;

    if(first != null)

        doSignalAll(first);

}

privatevoiddoSignalAll(Node first) {

    lastWaiter = firstWaiter = null;

    do{

        Node next = first.nextWaiter;

        first.nextWaiter = null;

        transferForSignal(first);

        first = next;

    } while(first != null);

}

可以看到 doSignalAll() 方法使用了 do-while 循环来唤醒每一个等待队列中的节点,直到 first 为 null 时,停止循环。

一句话总结 signalAll() 的作用:将等待队列中的全部节点移动到同步队列中,并唤醒每个节点的线程。

总结

整个过程可以分为三步:

第一步:一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到等待队列中,并释放锁。然后就在 await() 中的一个 while 循环中判断节点是否已经在同步队列,是则尝试获取锁,否则一直阻塞。

第二步:当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法将节点移动到同步队列,并唤醒节点中的线程。

第三步:被唤醒的线程,将从 await() 中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。竞争成功则退出 await() 方法,继续执行。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏闪电gogogo的专栏

Python初学——多线程Threading

接着上篇继续跟着沫凡小哥学Python啦 1.1 什么是多线程 Threading 多线程可简单理解为同时执行多个任务。 多进程和多线程都可以执行多个任务,线程...

22850
来自专栏coolblog.xyz技术专栏

AbstractQueuedSynchronizer 原理分析 - Condition 实现原理

Condition是一个接口,AbstractQueuedSynchronizer 中的ConditionObject内部类实现了这个接口。Condition声...

481100
来自专栏黑泽君的专栏

Java程序的运行原理及JVM的启动是多线程的吗?

A:Java程序的运行原理     Java通过java命令会启动java虚拟机。启动JVM,等于启动了一个应用程序,也就是启动了一个进程。   ...

40720
来自专栏炸天帮5

win32进程概念之句柄表,以及内核对象.

我们知道.我们使用CreateProcess 的时候会返回一个进程句柄.以及线程句柄. 其实在调用CreateProcess的时候.内核中会新建一个EPROCE...

9810
来自专栏张善友的专栏

使用自定义行为扩展 WCF

Windows® Communication Foundation (WCF) 提供了许多扩展点,供开发人员自定义运行时行为,从而实现服务调度和客户代理调用。您...

39170
来自专栏北京马哥教育

python线程笔记

豌豆贴心提醒,本文阅读时间5分钟 来源:伯乐在线 原文:http://python.jobbole.com/87498/ 引言&动机 考虑一下...

39550
来自专栏cs

知识点回顾

1.0 java的集合 集合分为值value[collection],key-value[map]. 存储值的分为list,和set。list是线性表,包括循...

10440
来自专栏pangguoming

Redis快速入门

Redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的Web应用程序的完美解决方案。 Redis从它的许多竞争继承来的三个主要特点: ...

34750
来自专栏Python爬虫与算法进阶

Python函数超时,用装饰器解决

我们在自定义一个函数后,会调用这个函数来完成我们想要的功能。 就拿爬虫来举例,你发送请求,服务器给你响应,但是有可能服务器没有给你任何数据,无论是他识别了爬虫、...

38420
来自专栏北京马哥教育

面试分享系列 | 17道Python面试题,让你在求职中无往不利

今天给大家分享的是Python面试题系列的第一篇文章,后续我也会陆续整理Python相关的问题给大家,无论是求职者还是新人都可以通过面试题来考察自己的能力缺陷。...

38040

扫码关注云+社区

领取腾讯云代金券