前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解Condition

深入理解Condition

作者头像
Java识堂
发布2019-08-13 10:24:50
4120
发布2019-08-13 10:24:50
举报
文章被收录于专栏:Java识堂Java识堂

前言

建议先看一下这篇分享,深入理解AbstractQueuedSynchronizer,这篇文章主要介绍了AQS的同步队列实现,而本篇文章主要介绍AQS条件队列的实现

在进行线程间的通信时,当我们使用synchronized时,可以用基于Object对象的wait和notify方法实现等待/通知机制,但是在AQS相关类中怎么实现这种等待/通知机制呢?答案是Condition,Condition是一个接,AbstractQueuedSynchronizer中有一个内部类实现了这个接口

基于Object实现等待/通知机制的相关方法

举个例子

代码语言:javascript
复制
public class WaitNotify {

    // 代码来自《Java并发编程的艺术》
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "notifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                // 条件不满足时,继续wait,同时释放了lock的锁
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true. await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 条件满足,完成工作
                System.out.println(Thread.currentThread() + " flag is false. running @ "
                + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock. notify @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                // 暂停5秒
                SleepUtils.second(5);
            }
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }

    static class SleepUtils {
        public static void second(int n) {
            try {
                TimeUnit.SECONDS.sleep(n);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
代码语言:javascript
复制
Thread[WaitThread,5,main] flag is true. await @ 00:05:55
Thread[notifyThread,5,main] hold lock. notify @ 00:05:55
Thread[notifyThread,5,main] hold lock again. sleep @ 00:06:00
Thread[WaitThread,5,main] flag is false. running @ 00:06:05

这里有几个需要注意的点

  1. 第三行和第四行的顺序有可能颠倒,因为是竞争获取锁的
  2. wait()方法被执行后,锁被自动释放,但notify()方法被执行后,锁却不自动释放 ,必须执行完notify()方法所在的同步synchronized代码块后才释放锁

基于Condition实现等待/通知机制(包含了Condition接口的所有方法)

Conditon使用例子如下,可以实现条件性的通知

代码语言:javascript
复制
    static ReentrantLock lock = new ReentrantLock();
    static Condition conditionA  = lock.newCondition();
    static Condition conditionB = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThreadA = new Thread(new WaitA(), "WaitThreadA");
        waitThreadA.start();
        Thread waitThreadB = new Thread(new WaitB(), "WaitThreadB");
        waitThreadB.start();
        TimeUnit.SECONDS.sleep(2);
        lock.lock();
        try {
            conditionA.signal();
        } finally {
            lock.unlock();
        }
    }

    static class WaitA implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread() + " begin await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                conditionA.await();
                System.out.println(Thread.currentThread() + " end await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    static class WaitB implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread() + " begin await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                conditionB.await();
                System.out.println(Thread.currentThread() + " end await @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}
代码语言:javascript
复制
Thread[WaitThreadA,5,main] begin await @ 00:49:57
Thread[WaitThreadB,5,main] begin await @ 00:49:57
Thread[WaitThreadA,5,main] end await @ 00:49:59

WaitThreadB因为没有被通知,一直阻塞 ,这里说一下Condition的大概实现,AQS内部维护着一个同步队列(双向链表实现),多个条件队列(单向链表实现),条件队列由AQS的内部类ConditionObject来维护,new一个ConditonObject ,则多一个条件队列,当一个线程执行await方法是,会把当线程包装成一个Node节点,放到执行await方法的ConditionObject的条件队列中,释放锁并被阻塞,当执行signal方式时,会把条件队列的第一个节点移除,并转移到同步队列中,获取到锁即可继续执行

源码

基于jdk1.8.0_20 ,Object的监视器方法和Condition接口的对比

ConditionObject 是AQS的一个内部类,用来实现条件队列,属性如下

代码语言:javascript
复制
public class ConditionObject implements Condition, java.io.Serializable {

    // 条件队列的头节点
    private transient Node firstWaiter;

    // 条件队列的尾节点
    private transient Node lastWaiter;

    public ConditionObject() { }

    // 阻塞过程中不响应中断,仅设置标志位,让之后的方法处理
    private static final int REINTERRUPT =  1;

    // 阻塞过程中响应中断,并throw InterruptedException
    private static final int THROW_IE    = -1;
}

假如在阻塞过程中发生了中断,REINTERRUPT标志了中断发生在 signalled之后,THROW_IE标志了中断发生在 signalled之前,从而决定采用那种方式响应中断

来看await方法

代码语言:javascript
复制
// ConditionObject
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当前线程加入等待队列
    Node node = addConditionWaiter();
    // 释放锁
    int savedState = fullyRelease(node);
    // 标志位
    int interruptMode = 0;
    // 判断节点是否在同步队列中,即是否被唤醒
    while (!isOnSyncQueue(node)) {
        // 阻塞
        LockSupport.park(this);
        // 线程被唤醒,线程节点从条件队列移除,并放到放到同步队列,或被中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 唤醒之后竞争获取锁
    // 获取锁的过程中有中断,并且标志位不是(响应中断)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // 清除等待队列中不是等待状态的节点
        unlinkCancelledWaiters();
    // 阻塞中被中断过,则处理中断
    if (interruptMode != 0)
        // 根据标志位,决定对中断的处理方式
        reportInterruptAfterWait(interruptMode);
}

将当前线程包装成Node节点,并放入等待队列

代码语言:javascript
复制
// ConditionObject
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清除等待队列中取消状态的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 链表还没有初始化
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

释放锁

代码语言:javascript
复制
// AbstractQueuedSynchronizer
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 释放锁失败
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 释放锁失败后,将当前节点状态设置为CANCELLED
        // 后序会被清理出条件队列
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

判断节点是否在同步队列

代码语言:javascript
复制
// AbstractQueuedSynchronizer
final boolean isOnSyncQueue(Node node) {
    // 节点在条件队列
    // 同步队列中节点的状态 只能为0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果后继节点不为null,则表明节点在同步队列上
    // 因为条件队列使用的是nextWaiter指向后继节点的
    // 条件队列上节点的next均为null
    if (node.next != null) // If has successor, it must be on queue
        return true;

    // 走到这一步,说明node.prev!=null && node.next=null
    // 但这并不能说明node在同步队列中,因为节点在入队过程中
    // 是先设置node.prev后设置node.next的(详见addWaiter方法)
    // 有可能CAS设置尾节点失败,导致没有加入队列
    // 所以从尾到头遍历一遍
    return findNodeFromTail(node);
}
代码语言:javascript
复制
// AbstractQueuedSynchronizer
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

检测线程在等待期间是否发生中断

代码语言:javascript
复制
// ConditionObject
// Checks for interrupt, returning THROW_IE if interrupted
// before signalled, REINTERRUPT if after signalled, or
// 0 if not interrupted.
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
代码语言:javascript
复制
// AbstractQueuedSynchronizer
final boolean transferAfterCancelledWait(Node node) {
    // signalled之前发生中断,因为signalled之后会将会将节点状态从CONDITION 设置为0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }

    // signalled之后发生中断
    // 如果节点还没有被放入同步队列,则放弃当前CPU资源
    // 让其他任务执行
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

清除条件队列中取消状态的节点

代码语言:javascript
复制
// ConditionObject
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 指向尾节点
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // 只有头节点的状态不是CONDITION才会执行到这一步
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            // 遍历完链表,设置尾节点
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

响应中断的方式

代码语言:javascript
复制
// ConditionObject
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        // 直接响应中断
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
代码语言:javascript
复制
// AbstractQueuedSynchronizer
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

来看signal,唤醒等待时间最长的线程

代码语言:javascript
复制
// ConditionObject
public final void signal() {
    // 当前线程没有获取到锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 唤醒等待队列中的头结点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
代码语言:javascript
复制
// ConditionObject
private void doSignal(Node first) {
    do {
        // 将同步队列的头结点,设置为目前头结点的下一个节点
        // 如果头节点的下一个节点为null,则设置尾节点为null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 将first节点从条件队列中移除
        first.nextWaiter = null;
        // 通知第一个非CANCELLED节点被唤醒,或者遍历完,退出
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
代码语言:javascript
复制
// 将节点从条件队列放入同步队列,true为成功 
// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {

    // 通过CAS将节点的状态从CONDITION设置为0
    // 如果设置失败,说明这个节点状态为CANCELLED
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 加入同步队列,并返回前继节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 前继节点为CANCELLED状态,或者设置SIGNAL状态失败
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒线程
        LockSupport.unpark(node.thread);
    return true;
}

signalAll和signal实现类似,区别如下,signal将等待队列中的一个非CANCELLED节点放到同步队列,而signalAll是将等待队列中的所有非CANCELLED节点放到同步队列中

参考书籍

《Java并发编程的艺术》

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-05-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java识堂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档