前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅析AQS

浅析AQS

作者头像
温安适
发布2018-05-17 16:38:52
7800
发布2018-05-17 16:38:52
举报
文章被收录于专栏:温安适的blog温安适的blog

#前言

好久没写blog了,最近学车,时间真的少了很多,花了一些时间写了一篇AQS,请大家指正。

概述

翻阅AbstractQueuedSynchronizer的源码,会发现如下注释:

Pprovides a framework for implementing blocking locks and related  
synchronizers (semaphores,  events, etc) that rely on   
first-in-first-out (FIFO) wait queues.  

AbstractQueuedSynchronizer提供一个基于FIFO队列的框架,该框架用于实现阻塞锁和相关同步器(例如:semaphores)。

如此可知,AbstractQueuedSynchronizer可以视为JDK同步器的框架,理解它,有助于理解JDK的同步器。

框架说明

本人依据JDK源码中的注释结合并发经验,总结了如下AQS框架说明:

  • AQS是依赖状态进行同步操作的,其内部使用一个整形变量state,来表示同步状态,此状态值依据具体的同步器语义实现。例如:在CountDownLatch中state即为需要等待的线程数。
  • AQS的子类必须定义在获取和释放上对应的状态值。对于AQS状态变量的操作必须使用getState,setState,compareAndSetState 三个原子方法。
  • AQS 提供了互斥与共享两种模式,AbstractQueuedSynchronizer类中的final方法已完善队列和阻塞机制,子类仅需要实现protected方法,
    • protected方法说明: tryAcquire 尝试获取互斥锁或许可 tryRelease 尝试释放互斥锁或许可 tryAcquireShared 尝试获取共享锁或许可 tryReleaseShared 尝试释放共享锁或许可 isHeldExclusively 是否持有互斥锁或许可
  • AQS的子类应该被定义为非公共的内部助手类,用于实现它们的封闭类的同步属性
  • AQS在序列化时仅序列化状态,在默认情况下会得到一个空的线程队列。子类通常需要实现readObject方法,用来设置初始状态。
  • hasQueuedPredecessors在设计公平的同步器时使用,如果该方法返回true,公平的同步器tryAcquire方法应该返回false
  • ConditionObject AQS的内部类,子类可以用ConditionObject实现条件谓词,若不需要实现条件谓词可以不实现。

核心操作

独占式

获取

//JDK中的源码
 public final void acquire(int state) {
        if (!tryAcquire(state) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), state))
            selfInterrupt();
    }

其对应代码的语义为:

 while (!获取不成功) {
     如果当前线程不在队列中, 加入队列
     阻塞当前线程
 }
 即阻塞直到获取成功。

释放

//JDK中的源码
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

对应代码的语义为:

 if (尝试释放成功)
      解锁队列中的第一个线程

共享式

获取

如果当前节点为队列中的第一个节点,尝试获取,获取成功进行head后续节点的设置。如获取失败维护前后节点关系,若需要阻塞进行阻塞,之后继续重试。 若出现异常获取失败,取消当前节点获取操作。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)//尝试获取失败
            doAcquireShared(arg);//进行共享式获取
    }
    
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//当前节点的先驱节点为head,即当前节点为第一个
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//尝试获取成功
                        //向上冒泡,保证head节点的后驱节点为未获取到的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //包装为 获取失败的节点 若需要中断进行中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

释放

确保级联释放,即使有其他的线程正在进行的获取/释放。 这个过程通常尝试释放head的后续节点,如果他需要被释放。 如果该节点不需要,会向下传递释放动作,直到释放成功。 此外,我们必须在添加新节点时进行循环处理。不同于其他操作 中释放后续节点,我们需要知道CAS是否重置了状态,所以我们需要重复检查。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
     private void doReleaseShared() {
        /*
     

         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // 循环检查状态
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // 循环检查CAS
            }
            if (h == head)                   // 循环检查是否有新节点
                break;
        }
    }

JDK注释中的例子

不可重入锁(互斥模式)

在不可重入锁Mutex中 ,我们使用state=0表示释放,state=1表示获取

class Mutex implements Lock, java.io.Serializable {
 
    // 内部助手同步类Sync
    private static class Sync extends AbstractQueuedSynchronizer {
      // 当state=1表示获取了独占锁
      protected boolean isHeldExclusively() {
        return getState() == 1;
      }
 
      // 如果state=0,锁是释放状态,尝试获取
      public boolean tryAcquire(int acquires) {
       assert acquires == 1; // acquires为1表示进行获取操作,其他值无效
        if (compareAndSetState(0, 1)) {//CAS操作
          setExclusiveOwnerThread(Thread.currentThread());//设置锁的持有者为当前线程
         return true;
        }
        return false;
      }
 
      //尝试释放
     protected boolean tryRelease(int releases) {
        assert releases == 1; // 传入的值为1表示进行释放,其他值无效
        if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);//设置状态为0,表示锁已释放
      return true;
      }
 
     // 提供一个条件谓词
     Condition newCondition() { return new ConditionObject(); }
 
     // 反序列化属性
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
        s.defaultReadObject();
       setState(0); //设置初始状态为释放
      }
   }
 
    // 所有同步操作 委托给Sync,下面我们实现必要的锁需要的操作
    private final Sync sync = new Sync();
 
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
  }

布尔闭锁(共享模式)

state=0表示未被通知(等待中,不可共享获取),state!=0表示被通知(可共享获取)

class BooleanLatch {
    //内部同步器,state=0表示未被通知(等待中,不可共享获取),state!=0表示被通知(可共享获取)
    private static class Sync extends AbstractQueuedSynchronizer {
      boolean isSignalled() { return getState() != 0; }
    /**
    *tryAcquireShared 返回负值 获取失败
    *0 获取成功其他线程不能获取
    *正值获取成功,其他线程也可获取成功
    /

      protected int tryAcquireShared(int ignore) {
        return isSignalled() ? 1 : -1;
      }
 
      protected boolean tryReleaseShared(int ignore) {
        setState(1);
        return true;
      }
    }
 
   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }
  }

部分JDK同步器类简要分析

分析JDK中的同步类,除了了解AQS外,还要知道每个同步器中的state的语义是什么,AQS上边已经分析了,下面介绍下几个同步器的state的语义。

ReentrantLock

ReentrantLock 只支持独占方式的获取操作,它实现了tryAcquire,tryRelease和isHeldExclusively. ReentrantLock的状态用于存储锁获取的操作次数,同一线程每获取一次加1,每释放一次减少1. tryAcquire代码简要分析

  1. 当前状态值(即锁获取的操作)>0,锁的所有者非当前线程,获取失败
  2. 如果状态值饱和,获取失败,即超过最大可获取线程数
  3. 如果当前线程符合获取锁的条件,更新状态值,如果需要设置锁的持有者为当前线程
  protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                //  当前状态值(即锁获取的操作)>0,锁的所有者非当前线程,获取失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)//如果状态值饱和,获取失败,即超过最大可获取线程数
                    throw new Error("Maximum lock count exceeded");
                //符合获取锁的条件,更新状态值,
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
			//设置锁的持有者为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }

Semaphore与CountDownLatch

CountDownLatch同步状态保存当前的计数值。类似BooleanLatch,不做分析。 Semaphore的同步状态用于存储当前可以许可的数量。 Semaphore中的tryAcquireShared,tryReleaseShared tryAcquireShared,获取当前可用许可数量,若可用许可数量大于申请数量,通过compareAndSetState设置新的剩余许可数量,否则获取失败。 tryReleaseShared获取当前可用许可数量,如果当前剩余许可数量+释放数量>0,过compareAndSetState设置新的剩余许可数量,否则获取失败。

/**
*tryAcquireShared,获取当前可用许可数量,若可用许可数量大于申请数量,通过compareAndSetState设置新的剩余许可数量,否则获取失败。
*/
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
/**
*
*tryReleaseShared获取当前可用许可数量,如果当前剩余许可数量+释放数量>0,过compareAndSetState设置新的剩余许可数量,否则获取失败。
*/
       protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

FutueTask

FutueTask的同步器状态值如下:

NEW  = 0;  //初始状态
COMPLETING   = 1;  //运行中
NORMAL       = 2;  //完成
EXCEPTIONAL  = 3;  //异常
CANCELLED    = 4;  //已取消
INTERRUPTING = 5;  //中断中
INTERRUPTED  = 6;  //已中断

可能的状态转换

NEW(初始状态) -> COMPLETING(运行中) -> NORMAL (已完成) NEW(初始状态) -> COMPLETING(运行中) -> EXCEPTIONAL (异常) NEW (初始状态)-> CANCELLED (已取消) NEW (初始状态)-> INTERRUPTING (中断中)-> INTERRUPTED (已中断)

Future.get的语义非常类似闭锁,如果发生了某件事件(由FutureTask表示的任务执行完成 或者取消),那么线程可以恢复执行,否则一致阻塞。

总结

AQS是JDK并发的框架,仔细理解有助于理解JDK的同步工具。 对于JDK的部分同步类,进行了简要说明,详细自行查阅源码。 对于JDK同步类的源码建议进行如下步骤: 1.理解同步器的状态值的语义 2.该同步器使用是AQS的什么模式, 是共享,互斥,还是共享与互斥都有。 3.优先理解同步器的tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared方法,之后查看其它方法。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 框架说明
  • 核心操作
    • 独占式
      • 获取
      • 释放
    • 共享式
      • 获取
      • 释放
  • JDK注释中的例子
    • 不可重入锁(互斥模式)
      • 布尔闭锁(共享模式)
      • 部分JDK同步器类简要分析
        • ReentrantLock
          • Semaphore与CountDownLatch
            • FutueTask
            • 总结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档