前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >快速掌握并发编程---深入学习Condition

快速掌握并发编程---深入学习Condition

作者头像
田维常
发布2020-11-03 11:11:31
3040
发布2020-11-03 11:11:31
举报

目录

notify和waitConditionCondition使用案例生产者消费者测试类结果Condition源码分析await方法addConditionWaiter 方法fullyRelease方法isOnSyncQueue 方法signal方法doSignal 方法transferForSignal 方法从lock、await、signal,release的整个过程Condition等待通知的本质总结

notify和wait

在前面学习 synchronized 的时候:快速掌握并发编程---synchronized篇(上),有讲到 wait/notify 的基本使用,结合synchronized 可以实现对线程的通信。

waitnotifynotifyAll是Object对象的属性,并不属于线程Thread。

我们先解释这三个的一个很重要的概念:

wait使持有该对象的线程把该对象的控制权交出去,然后处于等待状态(这句话很重要,也就是说当调用wait的时候会释放锁并处于等待的状态)

notify:通知某个正在等待这个对象的控制权的线程可以继续运行(这个就是获取锁,使自己的程序开始执行,最后通过notify同样去释放锁,并唤醒正在等待的线程)

notifyAll:会通知所有等待这个对象控制权的线程继续运行(和上面一样,只不过是唤醒所有等待的线程继续执行)

从上面的解释我们可以看出通过wait和notify可以做线程之间的通信,当A线程处理完毕通知B线程执行,B线程执行完毕以后A线程可以继续执行。

那么这个时候我就在思考了,既然 J.U.C 里面提供了Lock锁的实现机制,那·J.U.C里面有没有提供类似的线程通信的工具呢?

Condition

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

Condition使用案例

下面来实现一个非常典型的生产者和消费者模式;

生产者
代码语言:javascript
复制
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Producer implements Runnable{

    private Queue<String> msg;
    private int maxSize;

    Lock lock;
    Condition condition;

    public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock();
                //队列中消息满了,此时生产者不能再生产了,因为装不下了
                //所以生产者就开始等待状态
                while(msg.size()==maxSize){
                    System.out.println("生产者队列满了,先等待");
                    try {
                        condition.await(); //阻塞线程并释放锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("生产消息:"+i);
                msg.add("生产者的消息内容"+i);
                condition.signal(); //唤醒阻塞状态下的线程
            lock.unlock();
        }
    }
}
消费者
代码语言:javascript
复制
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Consumer implements Runnable{
    private Queue<String> msg;
    private int maxSize;

    Lock lock;
    Condition condition;

    public Consumer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock(); //synchronized
            //消费者进来的时候需要判断是有可用的消息,
            //没有可用的消息就等待状态
            while(msg.isEmpty()){
                System.out.println("消费者队列空了,先等待");
                try {
                    condition.await(); //阻塞线程并释放锁   wait
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("消费消息:"+msg.remove());
            condition.signal(); //唤醒阻塞状态下的线程
            lock.unlock();
        }
    }
}
测试类
代码语言:javascript
复制
public class TestCondition {
    public static void main( String[] args ){
        Queue<String> queue=new LinkedList<>();
        Lock lock=new ReentrantLock(); //重入锁
        Condition condition=lock.newCondition();
        int maxSize=5;

        Producer producer=new Producer(queue,maxSize,lock,condition);
        Consumer consumer=new Consumer(queue,maxSize,lock,condition);

        Thread t1=new Thread(producer);
        Thread t2=new Thread(consumer);
        t1.start();
        t2.start();

    }
}
结果

通过这个案例简单实现了 wait 和 notify 的功能,当调用 await 方法后,当前线程会释放锁并等待,而其他线程调用 condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。

所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法。

await:把当前线程阻塞挂起;

signal:唤醒阻塞的线程。

Condition源码分析
await方法

在Condition接口只是定义了await方法

代码语言:javascript
复制
void await() throws InterruptedException;

实现类在AQS

代码语言:javascript
复制
public final void await() throws InterruptedException {
    //表示 await 允许被中断
    if (Thread.interrupted()) throw new InterruptedException();
    //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是单向链表
     Node node = addConditionWaiter();
    //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
    //不管重入几次,都把state释放为0
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
    while (!isOnSyncQueue(node)) {
            ////通过 park 挂起当前线程
            LockSupport.park(this);
            // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
        }
    // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
    // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
    // 将这个变量设置成 REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
    // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点. 
    // 如果是 null ,就没有什么好清理的了.
    if (node.nextWaiter != null) {
        //清理掉状态为cancelled状态的 
        nlinkCancelledWaiters();
    }
    // 如果线程被中断了,需要抛出异常.或者什么都不做
    if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

接下来吧整个方法里涉及到的重要方法走一遍。

addConditionWaiter 方法

这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表。

代码语言:javascript
复制
        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //如果lastWaiter为canclled状态,则把他从链表中清理出去
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,
            //所以相比 AQS 里的双向队来说简单了很多
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null){
                firstWaiter = node;
            } else{
                t.nextWaiter = node;
            }
            lastWaiter = node;
            return node;
        }

上面这段代码用图来展示:

fullyRelease方法

就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

代码语言:javascript
复制
final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //获取AQS中state值
            int savedState = getState();
            //释放锁并且唤醒下一个同步队列中的线程
            //注意这里处理的是同步队列
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

此时线程A释放了锁,线程B就获得的锁。下面用一张图来展示:

isOnSyncQueue 方法

判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在。

如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调用 signal 唤醒。

如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限。

为什么要做这个判断呢?

因为在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal的时候,会把当前节点从 condition 队列转移到 AQS 队列。

代码语言:javascript
复制
    final boolean isOnSyncQueue(Node node) {
        //如果当前节点状态是CONDITION或node.prev是null,则证明当前节点在等待队列上而不是同步队列上。
        //之所以可以用node.prev来判断,是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //如果node.next不为null,则一定在同步队列上,
        //因为node.next是在节点加入同步队列后设置的
        if (node.next != null)
            return true;
        //前面的两个判断没有返回的话,就
        //从同步队列队尾遍历一个一个看是不是当前节点。
        return findNodeFromTail(node);
    } 
   //这个方法就相当简单了,就是从同步队列队尾遍历一个一个看是不是当前节点。
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

如何去判断ThreadA这个节点是否存在于 AQS队列中呢?

  1. 如果 ThreadAwaitStatus 的状态为 CONDITION,说明它存在于 condition 队列中,不在 AQS队列。因为AQS队列的状态一定不可能有 CONDITION
  2. 如果 node.prev为空,说明也不存在于 AQS队列,原因是prev=nullAQS队列中只有一种可能性,就是它是head 节点,head 节点意味着它是获得锁的节点。
  3. 如果node.next 不等于空,说明一定存在于 AQS队列中,因为只有 AQS队列才会存在 next 和 prev的关系
  4. findNodeFromTail,表示从 tail 节点往前扫描 AQS队列,一旦发现 AQS队列的节点和当前节点相等,说明节点一定存在于 AQS队列中
signal方法

await 方法会阻塞 ThreadA,然后 ThreadB抢占到了锁获得了执行权限,这个时候在 ThreadB中调用了 Condition的 signal()方法,将会唤醒在等待队列中节点。

代码语言:javascript
复制
public final void signal() {
      //先判断当前线程是否获得了锁,这个判断比较简单,直接用获得锁的线程和当前线程相比即可
      if (!isHeldExclusively()){
          //如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。
          throw new IllegalMonitorStateException();
       }
       // 拿到 Condition 队列上第一个节点
       Node first = firstWaiter;
       if (first != null){
           //通知等待队列队首的节点。
           doSignal(first);
        }
 }       
doSignal 方法
代码语言:javascript
复制
private void doSignal(Node first) {
      do {
             //从 Condition 队列中删除 first 节点
             if ( (firstWaiter = first.nextWaiter) == null){
                    // 将 next 节点设置成 null
                    lastWaiter = null;
              }
              first.nextWaiter = null;
         //transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。
        } while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
transferForSignal 方法
代码语言:javascript
复制
final boolean transferForSignal(Node node) {
         //更新节点的状态为 0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //调用 enq,把当前节点添加到AQS 队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 
       //失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 唤醒节点上的线程
            LockSupport.unpark(node.thread);
        //如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
        return true;
}

执行完 doSignal 以后,会把 condition 队列中的节点转移到 AQS队列上,这个时候会判断 ThreadAprev 节点也就是 head 节点的 waitStatus。

如果大于 0 或者设置 SIGNAL 失败,表示点被设置成了 CANCELLED 状态。这个时候会唤醒ThreadA这个线程。否则就基于 AQS队列的机制来唤醒,也就是等到ThreadB释放锁之后来唤醒 ThreadA

逻辑结构图如下:

从lock、await、signal,release的整个过程

Condition等待通知的本质

总的来说,Condition的本质就是等待队列和同步队列的交互:

当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:

  1. 构造一个新的等待队列节点加入到等待队列队尾
  2. 释放锁,也就是将它的同步队列节点从同步队列队首移除
  3. 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal())或被中断
  4. 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。

当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论:

  1. 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4。
  2. 如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4。

总结

用一张图来总结:

线程 awaitThread 先通过lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程 awaitThread 能够有机会移入到同步队列中。

当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁;

释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程。

关注公众号“Java后端技术全栈”

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-10-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java后端技术全栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • notify和wait
  • Condition
    • Condition使用案例
      • Condition源码分析
        • 从lock、await、signal,release的整个过程
        • Condition等待通知的本质
        • 总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档