专栏首页只为你下Spring Cloud Alibaba:Sentinel实现熔断与限流

抱歉,你查看的文章已删除

原创

Spring Cloud Alibaba:Sentinel实现熔断与限流

1、学习切入点

JDK的并发包中提供了几个非常有用的并发工具类。 CountDownLatch、 CyclicBarrier和 Semaphore工具类提供了一种并发流程控制的手段。本文将介绍CountDownLatch(闭锁)的实现原理。在了解闭锁之前需要先了解AQS,因为CountDownLatch的实现需要依赖于AQS共享锁的实现机制。

官方文档: https://docs.oracle.com/javase/8/docs/api/

百度翻译如下:

一种同步辅助程序,允许一个或多个线程等待在其它线程中执行的一组操作完成。使用给定的计数初始化CountDownLatch。由于调用了countDown()方法,await方法阻塞直到当前计数为零,之后释放所有等待线程,并立即返回await的任何后续调用。这是一个一次性现象——计数不能重置。如果需要重置计数的版本,请考虑使用CyclicBarrier。倒计时锁存器是一种通用的同步工具,可用于多种目的。使用计数1初始化的倒计时锁存器用作简单的开/关锁存器或门:调用倒计时()的线程打开它之前,调用它的所有线程都在门处等待。初始化为N的倒计时锁存器可用于使一个线程等待N个线程完成某个操作或某个操作已完成N次。倒计时锁存器的一个有用特性是,它不要求调用倒计时的线程在继续之前等待计数达到零,它只是防止任何线程在所有线程都可以通过之前继续通过等待。

2、CountDownLatchDemo 一览

// 计数器
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // 总数是6,必须要执行任务的时候,再使用!
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <=6 ; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" Go out");
                countDownLatch.countDown(); // 数量-1
            },String.valueOf(i)).start();
        }
        countDownLatch.await(); // 等待计数器归零,然后再向下执行
        System.out.println("Close Door");
    }
}

结果如下:CountDownLatch可以理解为减法计数器

废话不多说,下面我们开始对CountDownLatch源码进行分析

3、CountDownLatch源码分析

首先我们打开CountDownLatch源码类(我把多余的注释都去掉了...):

package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {
   
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        //设置同步状态
        Sync(int count) {
            setState(count);
        }
        //获取同步状态的值
        int getCount() {
            return getState();
        }
        //获取共享锁,1、getState>1返回1:表示获取到共享锁,-1:表示没有获取到共享锁
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        //释放共享锁
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                //通过CAS设置同步状态值,如果设置失败则说明同一时刻有其它线程在设置,但是会通过自旋的方式最终设置成功
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

初始化阶段:找到它的构造函数,来瞄一眼!

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

首先从构造函数出发,初始化变量,CountDownLatch使用了一个内部类Sync来实现CountDownLatch的同步控制,而Sync是AQS的一个实现类,它使用AQS的状态(state)来表示count。

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
    private volatile int state;

    protected final void setState(int newState) {
        state = newState;
    }

设置状态变量state,其中state是一个volatile关键字,可用来保证可见性,不懂volatile可以看看这篇博客:多线程进阶——狂神说java之JUC并发编程,里面详细介绍了volatile的作用!

由上面可知实际上是把计数器的值赋值给了AQS的state,也就是这里AQS的状态值来表示计数器值。

3.1、【await】方法源码分析(阻塞流程分析,获取锁)

接下来主要看一下CountDownLatch中几个重要的方法内部是如何调用AQS来实现功能的。

获取一个共享模式锁,如果发生中断则异常终止。如何实现的呢? 首先会检查中断的状态,可能会重复的阻塞和解阻塞,执行 tryAcquireShared 直到成功或者线程被中断。 ①:首先判断当前线程是否被标记为中断状态,如果被标记为中断状态,则抛出“InterruptedException”异常,并清除中断标志;否则到第二步; ②:执行【tryAcquireShared】来尝试获取锁,如果成功,则返回true退出方法;否则执行到第③步; ③:执行【doAcquireSharedInterruptibly】

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    //AQS获取共享资源时,该方法是响应中断的
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程中断则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //tryAcquireShared(arg)是AQS提供的模板方法
        //尝试看当前计数器值是否为0,为0则直接返回,否则进入AQS的等待队列
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
 //尝试看当前计数器值是否为0,为0则表示获取到共享锁,阻塞的等待的线程将会被唤醒      
 protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
  }

分析源码可知: 如果当前的count=0,那么方法会立即返回,并且返回值为true。 如果当前的count>0,则当前线程因为线程调度而不可用,并且处于休眠状态(进入了AQS的 doAcquireSharedInterruptibly方法让当前线程休眠),直到发生下面两件事之一: ①:由countDown方法的调用当前的count=0,如果count=0,则这个方法将返回true。 ②:其它线程中断了当前的线程,如果当前线程在进入这个方法时设置了中断状态,或者当前线程在等待时被设置了中断状态,那么“InterruptedException”异常将会抛出,并且当前线程的中断状态会被清除。

接下来我们瞅瞅【doAcquireSharedInterruptibly】方法如何实现让当前线程休眠的!

①:创建一个共享模式的节点,并将这个节点加入到等待队列中。

②:获取新创建好的节点的前驱节点。如果前驱节点是head节点,则说明当前节点是队列中第一个等待获取锁的节点,那么就执行【 tryAcquireShared 】方法尝试获取共享锁。 tryAcquireShared 是由 CountDownLatch 重写的方法。假如 获取共享锁失败,进入步骤③。

③:如果前驱节点不是head节点,或者当前节点获取共享锁失败(即步骤②),那么执行【 shouldParkAfterFailedAcquire 】方法,该方法如果获取共享锁失败,则会阻塞挂起当前线程。接着执行【 parkAndCheckInterrupt 】方法,该方法会将当前线程挂起,直到被唤醒,这样做可以避免线程无限循环获取不到锁,从而造成CPU资源的浪费!

友情提示:

在这个逻辑代码中使用了大量的 CAS来进行原子性修改,当修改失败的时候,则会通过for(;;)——自旋锁来保证在多线程并发的情况下,队列节点状态也是正确的,最终使得当前节点状态为,要么获取共享锁成功,要么挂起等待被唤醒!

我们对阻塞情况下,涉及的方法进一步展开【addWaiter】

根据给的的模式创建当前线程的节点,并将创建好的节点入队(加入到等待队列的尾部)。 首先在队列非空的情况下会尝试一次快速入队,也就是通过尝试一次CAS操作入队,如果CAS操作失败,则调用【enq】方法进行“自旋+CAS”方法将创建好的节点加入到队列尾部!

【enq】:使用CAS+自旋的方式插入节点到等待队列,如果等待队列为空,则初始化队列。 初始化队列:怎么初始化呢?首先创建一个空节点,将head和tail都指向这个节点。然后才是将我们待插入的节点插入,即:emptyNode->newNode,head指向emptyNode,tail指向newNode。

【tryAcquireShared】 这个方法总是被线程执行获取共享锁时被调用。如果这个方法报告失败,那么会使进入这个方法的线程排队等待,如果线程还没有入队的话,直到其它线程发出释放的信号。默认实现抛出一个“UnsupportedOperationException”异常!

【shouldParkAfterFailedAcquire】该方法如果获取共享锁失败,则会阻塞挂起当前线程。接着执行【parkAndCheckInterrupt】方法,该方法会将当前线程挂起(LockSupport.park),直到被唤醒,这样做可以避免线程无限循环获取不到锁,从而造成CPU资源的浪费!

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //只有当当前节点状态为Singal才返回true
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

我们对shouldParkAfterFailedAcquire分析下,首先获取到当前节点的状态,翻看源码可知,有四个状态,如下:

线程已经被取消
static final int CANCELLED =  1;
线程需要去被唤醒
static final int SIGNAL    = -1;
线程正在唤醒等待条件
static final int CONDITION = -2; 
//线程的共享锁应该被无条件传播
static final int PROPAGATE = -3;

shouldParkAfterFailedAcquire是位于无限for循环内的,这一点需要注意一般每个节点都会经历两次循环后然后被阻塞。建议读者试着走一遍,以加深理解 ,当该函数返回true时 线程调用parkAndCheckInterrupt这个阻塞自身。到这里基本每个调用await函数都阻塞在这里 (很关键哦,应为下次唤醒,从这里开始执行哦)

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

public native void park(boolean var1, long var2);

3.2、LockSupport的park openjdk源码分析

进入LockSupport的park方法,可以发现它是调用了Unsafe的park方法,这是一个本地native方法,只能通过openjdk的源码看看其本地实现了。 

类中定义了一个int类型的_counter变量,可以先执行unpark后执行park,就是通过这个变量实现,看park方法的实现代码(由于方法比较长就不整体截图了):

park方法会调用Atomic::xchg方法,这个方法会原子性的将_counter赋值为0,并返回赋值前的值。如果调用park方法前,_counter大于0,则说明之前调用过unpark方法,所以park方法直接返回。

接着往下看:

实际上Parker类用Posix的mutex,condition来实现的阻塞唤醒。如果对mutex和condition不熟,可以简单理解为mutex就是Java里的synchronized,condition就是Object里的wait/notify操作。park方法里调用pthread_mutex_trylock方法,就相当于Java线程进入Java的同步代码块,然后再次判断_counter是否大于零,如果大于零则将_counter设置为零。最后调用pthread_mutex_unlock解锁,相当于Java执行完退出同步代码块。如果_counter不大于零,则继续往下执行pthread_cond_wait方法,实现当前线程的阻塞。

3.3、countDown()方法源码分析(释放流程分析,释放锁)

接着让我们来看看countDown这个函数的玄机吧,因为线程就是通过这个来函数来触发唤醒条件的 :

减少count,如果count=0,则释放所有正在等待的线程;如果当前的count>0,那么减少count。如果减少后的count=0,那么进入【doReleaseShared】方法,使得所有正在等待的线程因为线程调度的原因被重写启用。如果当前的count值已经为0,那么什么都不会发生!

//调用countDown()释放同步状态,每次调用同步状态值-1
 public void countDown() {
       sync.releaseShared(1);
 }

public final boolean releaseShared(int arg) {
        //tryReleaseShared方法必须保证同步状态线程安全释放,一般是通过CAS和循环来实现
        if (tryReleaseShared(arg)) {
           //唤醒后续处于等待的节点
            doReleaseShared();
            return true;
        }
        return false;
    }

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {//自旋锁
                int c = getState();
                //在并发情况下,可能已经被其他线程修改为0,则直接返回false
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))//CAS自旋减一
                    return nextc =www.shentuylgw.cn= 0;//next为0返回true
            }
        }
    }

【doReleaseShared】开始唤醒后续需要等待被唤醒的线程

//释放共享锁,通知后续节点,主要是唤醒调用了await方法的线程(一般为主线程)
private void doReleaseShared(www.yongshenyul.com) {
        for (;;) {
            Node h = head;//获取头节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //SIGNAL为-1,后继节点的线程处于等待状态,当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点
                if (ws == Node.SIGNAL) { //头结点的状态为Node.SIGNAL
                    //将头结点的状态值设置为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//这里唤醒后继节点
                }
                //WaitStatus为0的时候表示为初始状态,设置当前节点为-3,表示线程的共享锁应该被无条件传播
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                www.lecaixuanzc.cn// loop on failed CAS
            }
           //如果h还是指向头结点,说明没有其他节点对头结点进行修改
            if (h == head)                    www.shundayLzc.cn// loop if head changed
                break;
        }
    }

只有当在最后一个执行 countDown()方法的线程时,才会进入在doReleaseShared()方法中,其大致的逻辑如下:

1、判断head节点不为null,且不为tail节点,说明等待队列中有等待唤醒的线程,在等待队列中,头结点中并没有保存正在等待的线程,其只是一个空的Node节点,真正等待的线程是从头结点的下一个节点开始排队等待的。

2、在判断等待队列中有正在等待的线程之后,将头结点的状态信息置为初始状态0,并且调用 unparkSuccessor(Node)方法唤醒后继节点,使后继节点可以尝试去获取共享锁。(重点)

3、如果头结点的的 waitStatus为0此时为初始状态 ,则将头结点的 waitStatus设置为为-3,表示下一次同步状态的获取将会无条件的传播下去。

4、头结点没有被其他线程修改,则跳出循环。

下面瞅瞅【unparkSuccessor】函数是如何唤醒后继节点的

private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < www.yasenyulee.cn)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null |www.sangyulpt.com | s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);www.wangffzc.cn//唤醒线程(解锁的过程)
    }

当调用了【LockSupport.unpark(s.thread)】操作后,等待队列中的第一个等待的线程就会重新启动。流程回到【doAcquireSharedInterruptibly】让当前线程休眠的方法!这里,线程从阻塞状态进行恢复。

第一个释放的线程从【parkAndCheckInterrupt】方法中的【LockSupport.park】挂起结束,继续后面的流程,因为此时是正常的被唤醒流程,线程并没有设置中断标志,因此【parkAndCheckInterrupt】会返回false。流程重新开始循环。执行到大框中,获取共享锁成功。接着通过【setHeadAndPropagate】将当前节点设置为头结点并进行广播。然后将旧的head节点的next置为null,heap GC,结束方法调用返回true。

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//设置Head为头结点
    
        //如果走到了setHeadAndPropagate方法,那么propagate的值一定大于1,if条件成立
        if (propagate > 0 |www.baishenjzc.cn  h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            //获取当前节点的下一个节点
            Node s = node.next;
            if (s == null |www.jinniugpt.cn| s.isShared())
                doReleaseShared();//进入这里
        }
    }

这个函数相信你不陌生吧,就是第一个释放锁所调用的,在这里,被唤醒的线程再调一次,来释放后继节点

private void doReleaseShared(www.yixingxzc.cn) {
        for (;;) {
            Node h = head;
            if (h != null && h !www.muyuyulept.com= tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            www.jinniuylziz.cn// loop to recheck cases
                    unparkSuccessor(h);//唤醒后续等待被唤醒的线程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
           //明白这里为什么要加一次判断了吧!!!,被唤醒的线程会在执行该函数
            if (h == head)                   // loop if head changed
                break;
        }
    }

现在明白其唤醒机制了吧 先唤醒一个线程(第一个阻塞的线程) 然后被唤醒的线程又会执行到这里唤醒线程,如此重复下去 最终所有线程都会被唤醒, 其实这也是AQS共享锁的唤醒原理,自此完成了对countDownLatch阻塞和唤醒原理的基本分析。

3.4、countDown()执行流程和await()执行流程

【unparkSuccessor】方法

    private void unparkSuccessor(Node node) {
      
        int ws = node.waitStatus;
        if (ws www.jujinyulee.com< 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null |www.jucaiyle.cn| s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

3.5、LockSupport的unpark openjdk源码分析

进入LockSupport的unpark方法,可以发现它是调用了Unsafe的unpark方法,这是一个本地native方法,只能通过openjdk的源码看看其本地实现了。 

图中的1和4就相当于Java的进入synchronized和退出synchronized的加锁解锁操作,代码2将_counter设置为1,同时判断先前_counter的值是否小于1,即这段代码:if(s<1)。如果不小于1,则就不会有线程被park,所以方法直接执行完毕,否则就会执行代码3,来唤醒被阻塞的线程。

4、总结:

第一步:构造方法带参数告诉我们需要多少个线程数量

第二步:await方法是去拿到节点的共享锁,并且调用locksupport.part上锁

第三步:countDown方法是去释放这个锁,并且调用Locksupport.unpark解锁

CountDownLatch还提供了超时等待机制,在特定时间后就不会再阻塞当前线程;不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

CountDownLatch 底层实现依赖于AQS共享锁的实现机制,首先初始化计数器count,调用countDown()方法时,计数器count-1,当计数器count=0时,会唤醒处于AQS等待队列中的线程。调用await()方法,线程会被挂起,他会等待直到count=0才会继续执行,否则会加入到等待队列中,等待被唤醒。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • JVM源码分析之javaagent原理完全解读

    当我们一个系统既需要mysql驱动,也需要oracle驱动的时候,在并发加载初始化这些驱动类的过程中产生死锁的可能性非常大,下面是一个模拟的例子,对于Threa...

    不会飞的小鸟
  • 爱奇艺网络协程编写高并发应用实践

    本⽂以爱奇艺开源的⽹络协程库(https://www.jintianxuesha.com)为例,讲解⽹络协程的设计原理、编程实践、性能优化等⽅⾯内容。

    不会飞的小鸟
  • GStreamer基础教程07 - 播放速率控制

      在常见的媒体播放器中,通常可以看到快进,快退,慢放等功能,这部分功能被称为“特技模式(Trick Mode)”,这些模式有个共同点:都通过修改播放的速率来达...

    不会飞的小鸟
  • Java 多线程学习(2)——停止线程

    interrupt()方法仅仅是在当前线程中打了一个停止标记,并不会真正的停止线程。 示例如下:

    胡了了
  • Java 线程基础

    简言之,进程可视为一个正在运行的程序。它是系统运行程序的基本单位,因此进程是动态的。进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动。进程是操作系统...

    静默虚空
  • C# dotnet 自己实现一个线程同步上下文

    昨天鹏飞哥问了我一个问题,为什么在控制台程序的主线程等待某个线程执行完成之后回来,是在其他线程执行的。而 WPF 在等待某个线程执行完成之后,可以回到主线程执行...

    林德熙
  • Java并发之线程

         在前面我们介绍的一些内容中,我们的程序都是一条执行流,一步一步的执行。但其实这种程序对我们计算机的资源的使用上是低效的。例如:我们有一个用于计算的程序...

    Single
  • ThreadPoolExecutor源码分析

    ThreadPoolExecutor继承AbstractExecutorService,层级实现了ExecutorService,ExecutorService...

    冰枫
  • Java并发编程:线程控制

    在上一篇文章中(Java并发编程:线程的基本状态)我们介绍了线程状态的 5 种基本状态以及线程的声明周期。这篇文章将深入讲解Java如何对线程进行状态控制,比如...

    陈树义
  • JMeter(连载3)

    这个组件用于测试流程的参数化,参数化文件采用类似于CSV文件。如图16所示。通过菜单“Add->Config Element->CSVData Set Conf...

    小老鼠

扫码关注云+社区

领取腾讯云代金券