前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >HyStrix替代方案限流降级框架 Sentinel 的原理和实践

HyStrix替代方案限流降级框架 Sentinel 的原理和实践

作者头像
用户5927304
发布2019-09-17 16:13:38
2.8K0
发布2019-09-17 16:13:38
举报
文章被收录于专栏:无敌码农无敌码农

作者丨童海洋(马蜂窝大交通事业部研发工程师)

在实际业务中可能碰到这样的情况:

场景 1

有 A 和 B 两个服务,服务 A 作为业务访问的入口直接暴露给用户使用,服务 B 由 A 调用,负责查询一部分供应商的信息,并在设定时间内返回。这时由于出现某种原因,导致业务B的响应时间超时,需要5s才能获取到供应商的查询信息。即使我们忽略其它操作的时间,这也足以带给用户非常差的体验。

这类问题我们会用熔断降级来解决,具体是指当下游服务 B 因为某种原因突然变得不可用或响应时间超过预设值时,上游的 A 服务为了保证核心服务的可用性,会直接返回用户一个 Mock 数据,缓解服务器的压力。

场景 2

某个业务系统的正常 QPS 为 25, 这时突然出现了一个爬虫程序,以 QPS 高达 100 的频率爬取业务数据,正逐渐将业务拖垮,严重影响正常用户的访问。

对于这类明显的异常,我们可以采用流量控制的方式,当系统 QPS 超过 20 后,直接拒绝其余的访问请求,来保证系统可用。

可见,在生产环境下,熔断降级和流量控制对保证线上服务的稳定可靠起到重要作用。特别是随着微服务的流行,服务和服务之间的稳定性变得越来越重要,熔断降级和流量控制等策略及更好的实现手段也更受关注。在此,墙裂推荐大家使用 Sentinel 完成服务的熔断降级和流量控制。

一、什么是 Sentinel?

Sentinel 是面向分布式服务架构的轻量级流量控制组件,由阿里开源,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护等多个维度来保障微服务的稳定性。

这里借用一张表,来说明 Sentinel 和其它主流中间件的性能对比:

从上面的对比我们可以看到,Sentinel 的优势还是比较明显的,比如更丰富的熔断降级和限流策略、支持系统自适应保护、比较易用的控制台、良好的扩展性,以及更广泛的开源生态等。

以下我将结合官方文档和在业务实际应用过程中的理解,介绍 Sentinel 的原理、核心概念和如何使用。

二、Sentinel 原理及核心概念

2.1 核心概念

  • ResourceWrapper

Sentinel 控制的对象即为资源,调用 Entry 方法的时候会 New 一个资源对象,资源由一个全局唯一的资源名称标识。

  • Context

Sentinel 的上下文,包含了上下文名称,一个调用链一个 Context,可以显示创建或者在调用 Entry 的时候隐式创建。

  • Node

持有运行时的资源的各种统计数据。

  1. 一个 Resource 在同一个 Context 中有且仅有一个 DefaultNode 与之对应
  2. 一个 Resource 全局有且仅有一个 ClusterNode
  • Entry

代表一次对资源的访问,每访问一个资源都会创建一个 Entry,在 Context 中以一个双向链表存在。

必须调用 exit() 方法的原因就在于这个链表:exit 方法中会判断上下文的当前 entry 是不是 this,此时其他 entry 掉用 exit 会发现不相等,从而抛出异常。

  • ProcessorSlot

处理插槽,资源的各种控制都通过不同的 Slot 实现类去完成。

  • ProcessorSlotChain

由各个处理插槽组成的链表,每个资源在整个服务中对应一个处理链。

  • Rule

用户定义的各种规则。

  • RuleManager

加载并管理 Rule。

2.2 工作原理

在 Sentinel 里,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, Thread Count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量。

整体框架图:

Sentinel 将 SlotChainBuilder 作为 SPI 接口进行扩展,使得 Slot Chain 具备了扩展的能力。可以自行加入自定义的 slot 并编排 slot 间的顺序,来为 Sentinel 添加自定义功能:

下面重点讲解流量控制和熔断降级部分。

2.2.1 流量控制

本次介绍不涉及集群流控,因为集群流控需要通过配置单独的 Server 与其它实例通信,来判断是否调用规则。而在我们现有的环境下,生产部署的集群是不区分 Client 和 Server 的。因此,以下介绍以单个实例的流量控制为主。

上文简单提到,流量控制(Flow Control)的原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。

同一个资源可以创建多条限流规则。Sentinel 中的 FlowSlot 会对该资源的所有限流规则依次遍历,直到有规则触发限流或者所有规则遍历完毕。

  • 并发线程数流量控制

并发线程数限流用于保护业务线程数不被耗尽。例如,当应用所依赖的下游应用由于某种原因导致服务不稳定、响应延迟增加,对于调用者来说,意味着吞吐量下降和更多的线程数占用,极端情况下甚至导致线程池耗尽。为应对太多线程占用的情况,业内有使用隔离的方案,比如通过不同业务逻辑使用不同线程池来隔离业务自身之间的资源争抢(线程池隔离)。这种隔离方案虽然隔离性比较好,但是代价就是线程数目太多,线程上下文切换的 overhead 比较大,特别是对低延时的调用有比较大的影响。

Sentinel 并发线程数限流不负责创建和管理线程池,而是简单统计当前请求上下文的线程数目,如果超出阈值,新的请求会被立即拒绝,效果类似于信号量隔离。

  • QPS 流量控制

当 QPS 超过某个阈值的时候,则采取措施进行流量控制。流量控制的手段包括以下几种:直接拒绝、Warm Up、匀速排队。对应 FlowRule 中的 controlBehavior 字段。

  • 直接拒绝

直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默认的流量控制方式,当 QPS 超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出 FlowException。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位时。

  • Warm Up

Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即预热/冷启动方式。系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过「冷启动」让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,可以给冷系统一个预热的时间,避免冷系统被压垮。

通常冷启动的过程系统允许通过的 QPS 曲线如下图所示:

  • 匀速排队

匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式会严格控制请求通过的间隔时间,即让请求以均匀的速度通过,与之对应的是漏桶算法。

匀速排队方式的作用如下图所示:

这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。

  • 预加热 + 匀速排队

即 Warm Up + 匀速排队。

  • 基于调用关系的流量控制

调用关系包括调用方、被调用方;一个方法又可能会调用其它方法,形成一个调用链路的层次关系。Sentinel 通过 NodeSelectorSlot 建立不同资源间的调用的关系,并且通过 ClusterNodeBuilderSlot 记录每个资源的实时统计信息。

有了调用链路的统计信息,我们可以衍生出多种流量控制手段。

1)根据调用方限流

ContextUtil.enter(resourceName, origin) 方法中的 origin 参数标明了调用方身份。这些信息会在 ClusterBuilderSlot 中被统计。

流控规则中的 limitApp 字段用于根据调用来源进行流量控制。该字段的值有以下三种选项,分别对应不同的场景:

  • default:表示不区分调用者,来自任何调用者的请求都将进行限流统计。如果这个资源名的调用总和超过了这条规则定义的阈值,则触发限流。
  • {some_origin_name}:表示针对特定的调用者,只有来自这个调用者的请求才会进行流量控制。例如 NodeA 配置了一条针对调用者 caller1 的规则,那么当且仅当来自 caller1 对 NodeA 的请求才会触发流量控制。
  • other:表示针对除 {some_origin_name} 以外的其余调用方的流量进行流量控制。例如,资源 NodeA 配置了一条针对调用者 caller1 的限流规则,同时又配置了一条调用者为 other 的规则,那么任意来自非 caller1 对 NodeA 的调用,都不能超过 other 这条规则定义的阈值。

同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default。

2)根据调用链路入口限流:链路限流

NodeSelectorSlot 中记录了资源之间的调用链路,这些资源通过调用关系,相互之间构成一棵调用树。这棵树的根节点是一个名字为 machine-root 的虚拟节点,调用链的入口都是这个虚节点的子节点。

一棵典型的调用树如下图所示:

上图中来自入口 Entrance1 和 Entrance2 的请求都调用到了资源 NodeA,Sentinel 允许只根据某个入口的统计信息对资源限流。比如我们可以设置 FlowRule.strategy 为 RuleConstant.CHAIN,同时设置 FlowRule.refResource 为 Entrance1 来表示只有从入口 Entrance1 的调用才会记录到 NodeA 的限流统计当中,而不关心经 Entrance2 到来的调用。

调用链的入口(上下文)是通过 API 方法 ContextUtil.enter(contextName) 定义的,其中 contextName 即对应调用链路入口名称。

  • 具有关系的资源流量控制:关联流量控制

当两个资源之间具有资源争抢或者依赖关系的时候,这两个资源便具有了关联。比如对数据库同一个字段的读操作和写操作存在争抢,读的速度过高会影响写的速度,写的速度过高会影响读的速度。如果放任读写操作争抢资源,则争抢本身带来的开销会降低整体的吞吐量。

这里可使用关联限流来避免具有关联关系的资源之间过度争抢。举例来说,read_db 和 write_db 这两个资源分别代表数据库读写,我们可以给 read_db 设置限流规则来达到写优先的目的:设置FlowRule.strategy为RuleConstant.RELATE同时设置FlowRule.refResource为write_db。这样当写库操作过于频繁时,读数据的请求会被限流。

规则判断流程:

2.2.2 熔断降级

Sentinel 目前有三种降级策略。当然, 我们还可以通过自定义 Slot 和 Rule 的方式制定符合要求的降级策略。

  • 平均响应时间 (DEGRADE_GRADE_RT):当 1s 内持续进入 5 个请求,对应时刻的平均响应时间(秒级)均超过阈值(count,以 ms 为单位),那么在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,对这个方法的调用都会自动地熔断(抛出 DegradeException)。注意 Sentinel 默认统计的 RT 上限是 4900 ms,超出此阈值的都会算作 4900 ms,若需要变更此上限可以通过启动配置项-Dcsp.sentinel.statistic.max.rt=xxx 来配置。
  • 异常比例 (DEGRADE_GRADE_EXCEPTION_RATIO):当资源的每秒请求量 >= 5,并且每秒异常总数占通过量的比值超过阈值(DegradeRule 中的 count)之后,资源进入降级状态,即在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,对这个方法的调用都会自动地返回。异常比率的阈值范围是 [0.0, 1.0],代表 0% - 100%。
  • 异常数 (DEGRADE_GRADE_EXCEPTION_COUNT):当资源近 1 分钟的异常数目超过阈值之后会进行熔断。注意由于统计时间窗口是分钟级别的,若 timeWindow 小于 60s,则结束熔断状态后仍可能再进入熔断状态。

规则判断流程:

当 SlotChain 执行到 DegradeSlot 时,DegradeSlot 先调用 DegradeManager 判断规则。执行完毕后,如果还有下一个 Slot,就执行下一个 Slot。

DegradeManager 会调用 DegradeRule 进行判断。DegradeRule 在判断符合熔断规则的情况下会抛出异常,否则资源正常执行。

三、Sentinel 使用示例

3.1 Sentinel 启动配置项

  • sentinel-core 配置项
  • sentinel-transport-common 配置项:

3.2 编码方法 (调用SphU API显示创建)

代码语言:javascript
复制
public ResultWrapper<OrderDTO> getOrder(String id) {
        /**
         * pre inception
         */
        Entry entry = null;
        try {
            entry = SphU.entry("getOrder");
            ResultWrapper<OrderDTO> orderResultWrapper = testRestManagerImpl.getOrderInfo(id);

            if (orderResultWrapper.getData() == null) {
                orderResultWrapper.setData(new OrderDTO("No Order", "No Shop", " Random Data"));
            }

            return orderResultWrapper;
        } catch (BlockException e) {
            Tracer.trace(e);
            log.info(id + " 服务熔断");
            /**
             * 可以mock一个假数据返回
             */
            return new ResultWrapper<>();
        } catch (Exception e) {
            log.info(id + " 业务异常");
            /**
             * 业务异常,mock假数据返回
             */
            return new ResultWrapper<>();
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
        /**
         * 之前也可以不返回结果,这里调用其他的业务逻辑
         * post inception
         */
    }

由实例代码看出,编码方式对系统是有一定入侵的。但好处也很明显,就是资源真正由编码人员掌控,资源甚至可以是一个小小的代码块。

3.3 注解方式

核心注解:@SentinelResource

  • value:资源名称,必需项(不能为空)
  • entryType:entry 类型,可选项(默认为 EntryType.OUT)
  • blockHandler / blockHandlerClass: blockHandler 对应处理 BlockException 的函数名称,可选项。blockHandler 函数访问范围需要是 public,返回类型需要与原方法相匹配,参数类型需要和原方法相匹配并且最后加一个额外的参数,类型为 BlockException。blockHandler 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 blockHandlerClass 为对应的类的 Class 对象,注意对应的函数必须为 static 函数,否则无法解析。
  • fallback:fallback 函数名称,可选项,用于在抛出异常的时候提供 fallback 处理逻辑。fallback 函数可以针对所有类型的异常(除了 exceptionsToIgnore 里面排除掉的异常类型)进行处理。fallback 函数签名和位置要求:
  • 返回值类型必须与原函数返回值类型一致;
  • 方法参数列表需要和原函数一致,或者可以额外多一个 Throwable 类型的参数用于接收对应的异常。
  • fallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 fallbackClass 为对应的类的 Class 对象,注意对应的函数必须为 static 函数,否则无法解析。
  • defaultFallback(since 1.6.0):默认的 fallback 函数名称,可选项,通常用于通用的 fallback 逻辑(即可以用于很多服务或方法)。默认 fallback 函数可以针对所有类型的异常(除了 exceptionsToIgnore 里面排除掉的异常类型)进行处理。若同时配置了 fallback 和 defaultFallback,则只有 fallback 会生效。defaultFallback 函数签名要求:
  • 返回值类型必须与原函数返回值类型一致;
  • 方法参数列表需要为空,或者可以额外多一个 Throwable 类型的参数用于接收对应的异常。
  • defaultFallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 fallbackClass 为对应的类的 Class 对象,注意对应的函数必须为 static 函数,否则无法解析。
  • exceptionsToIgnore(since1.6.0):用于指定哪些异常被排除掉,不会计入异常统计中,也不会进入 fallback 逻辑中,而是会原样抛出。
代码语言:javascript
复制
首先,注入SentinelResource注解
@Configuration
public class SentinelAspectConfiguration {

    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {
        return new SentinelResourceAspect();
    }
}

然后直接使用注解:
@SentinelResource(value = "getOrder",entryType = EntryType.IN, fallback = "getOrderFallBack", fallbackClass = {ITestRestAPIUtil.class}, blockHandler = "getOrderHandler", blockHandlerClass = {ITestRestAPIUtil.class})
public ResultWrapper<OrderDTO> getOrder(String id) {
    ResultWrapper<OrderDTO> orderResultWrapper = testRestManagerImpl.getOrderInfo(id);

    if (orderResultWrapper.getData() == null) {
        orderResultWrapper.setData(new OrderDTO("No Order", "No Shop", " Random Data"));
    }

    return orderResultWrapper;
}

然后实现我们设定的BlockHander和fallback方法
@Slf4j
public class ITestRestAPIUtil {
    public static ResultWrapper<OrderDTO> getOrderHandler(String id, BlockException ex) {
//        log.info(JSON.toJSONString(ex));
        //这里可以进行埋点
        //mock假数据返回
        return new ResultWrapper().setSuccess(true).setData(new OrderDTO()).setCode("服务已被降级").addMessage("服务已被降级");
    }

    public static ResultWrapper<OrderDTO> getOrderFallBack(String id, Throwable throwable) {
        log.info("捕捉到业务异常:{}", throwable.toString());
//        log.error("getOrderError", id);
        //mock假数据返回
        return new ResultWrapper<OrderDTO>().setSuccess(false).addMessage("捕捉到业务异常");
       }
}

由实例代码可以看出,注解方式对系统侵入较小,利于维护与修改。但由于 SentinelResource 仅作用于 Method,资源和方法的界限在这里混淆。

3.4 动态配置规则

如果规则的修改无法实时改变,那Sentinel的效果将大打折扣。我们可以配合 Apollo 来动态的修改所需规则。

官方给出了 sentinel-datasource-apollo 的依赖来支持 Apollo 的实现,个人认为实用意义不大。因为项目中可能不止 Sentinel 会用到 Apollo,没有理由将 Sentinel 和其它业务使用完全剥离开来。而且该依赖也只是将 Apollo 的使用封装了起来。

代码语言:javascript
复制
@Configuration
public class SentinelConfig {

    @Resource
    private ApolloConfigureCenter apolloConfigureCenter;

    @PostConstruct
    private void initFlowRule() {
        List<FlowRule> rules = JSON.parseArray(apolloConfigureCenter.getFlowControlRule(), FlowRule.class);
        FlowRuleManager.loadRules(rules);
    }

    @PostConstruct
    private void initDegradeRule() {
        List<DegradeRule> rules = JSON.parseArray(apolloConfigureCenter.getDegradeRule(), DegradeRule.class);
        DegradeRuleManager.loadRules(rules);
    }

    //自定义规则,另一种写在代码中方法,也可以配置到apollo中
    @PostConstruct
    private void initCustomRule() {
        List<CustomRule> rules = new ArrayList<>();
        CustomRule customRule = new CustomRule();
        customRule.setPercentage();
        customRule.setTimeWindow();
        customRule.setResource("testCustom");
        rules.add(customRule);
        CustomRuleManager.loadRules(rules);
    }
}

可以看出,上面的实例是通过 loadRules 这个方法来注入规则的。分析源码可以得知,loadRules 其实是对之前规则的一次重制,这就意味着我们可以动态的修改规则,只需要对 Apollo 进行监听就可以了。

其它规则配置大致与此思路类似,都是 Clear 原先的 Rules,然后添加新的 Rules。

3.5 实时监控使用方式

下载 Sentinel 源码,打包运行 sentinel-dashboard.jar 就行了。

java -Dserver.port=8082 -Dcsp.sentinel.dashboard.server=localhost:8082 -jar sentinel-dashboard/target/sentinel-dashboard.jar 可以指定实时监控页面的端口号与 IP 地址。

四、Sentinel 进阶使用

自定义 Rule 与自定义 SlotChain

Sentinel已经实现的功能基本可以满足大部分情况下的需求。但在一些特殊场景下,我们还是需要设定自己的规则。

这里以通过 Sentinel 实现灰度作为示例。

首先自定义一个CustomRule:

代码语言:javascript
复制
public class CustomRule extends AbstractRule {
    private static final int RT_MAX_EXCEED_N = ;
    private double percentage;
    private int timeWindow;
    private final AtomicBoolean cut = new AtomicBoolean(false);
    private AtomicLong passCount = new AtomicLong(0L);

    public CustomRule() {
    }

    public CustomRule(String resourceName) {
        this.setResource(resourceName);
    }

    public double getPercentage() {
        return this.percentage;
    }

    public CustomRule setPercentage(double percentage) {
        this.percentage = percentage;
        return this;
    }

    public int getTimeWindow() {
        return timeWindow;
    }

    public CustomRule setTimeWindow(int timeWindow) {
        this.timeWindow = timeWindow;
        return this;
    }

    private boolean isCut() {
        return this.cut.get();
    }

    private void setCut(boolean cut) {
        this.cut.set(cut);
    }

    public AtomicLong getPassCount() {
        return this.passCount;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof CustomRule)) {
            return false;
        } else if (!super.equals(o)) {
            return false;
        } else {
            CustomRule that = (CustomRule)o;
            if (this.percentage != that.percentage) {
                return false;
            } else {
                return false;
            }
        }
    }

    public int hashCode() {
        int result = super.hashCode();
        result =  * result + (new Double(this.percentage)).hashCode();
        result =  * result + this.timeWindow;
        return result;
    }

    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
        //实际情况下肯定不会只传一个参数,需要看情况自己解析
        Object[] objects = args;
        String uid = (String)objects[];
        if (StringUtils.isBlank(uid)) {
            return true;
        } else {
            取后两位与灰度值进行比较
            double greyUID = Double.parseDouble(uid.substring(uid.length()-));
            if (greyUID < this.percentage) {
                return true;
            }
        }

        return false;
    }

    public String toString() {
        return "CustomRule{resource=" + this.getResource() + ", percentage=" + this.percentage + ", limitApp=" + this.getLimitApp() + ", timeWindow=" + this.timeWindow + "}";
    }

    private static final class ResetTask implements Runnable {
        private CustomRule rule;

        ResetTask(CustomRule rule) {
            this.rule = rule;
        }

        public void run() {
            this.rule.getPassCount().set(0L);
            this.rule.cut.set(false);
        }
    }
}

定义 CustomRuleException:

代码语言:javascript
复制
public class CustomException extends BlockException {
    public CustomException(String ruleLimitApp) {
        super(ruleLimitApp);
    }

    public CustomException(String ruleLimitApp, CustomRule rule) {
        super(ruleLimitApp, rule);
    }

    public CustomException(String message, Throwable cause) {
        super(message, cause);
    }

    public CustomException(String ruleLimitApp, String message) {
        super(ruleLimitApp, message);
    }

    @Override
    public Throwable fillInStackTrace() {
        return this;
    }

    /**
     * Get triggered rule.
     * Note: the rule result is a reference to rule map and SHOULD NOT be modified.
     *
     * @return triggered rule
     * @since 1.4.2
     */
    @Override
    public CustomRule getRule() {
        return rule.as(CustomRule.class);
    }
}

定义 CustomRuleManager:

代码语言:javascript
复制
public class CustomRuleManager {

    private static final Map<String, Set<CustomRule>> greyRules = new ConcurrentHashMap<>();

    private static final RulePropertyListener LISTENER = new RulePropertyListener();
    private static SentinelProperty<List<CustomRule>> currentProperty
            = new DynamicSentinelProperty<>();

    static {
        currentProperty.addListener(LISTENER);
    }

    public static void register2Property(SentinelProperty<List<CustomRule>> property) {
        AssertUtil.notNull(property, "property cannot be null");
        synchronized (LISTENER) {
            RecordLog.info("[CustomRuleManager] Registering new property to degrade rule manager");
            currentProperty.removeListener(LISTENER);
            property.addListener(LISTENER);
            currentProperty = property;
        }
    }

    public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count, Object ...args)
            throws BlockException {

        Set<CustomRule> rules = greyRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (CustomRule rule : rules) {
            if (!rule.passCheck(context, node, count, args)) {
                throw new CustomException(rule.getLimitApp(), rule);
            }
        }
    }

    public static boolean hasConfig(String resource) {
        if (resource == null) {
            return false;
        }
        return greyRules.containsKey(resource);
    }

    /**
     * Get a copy of the rules.
     *
     * @return a new copy of the rules.
     */
    public static List<CustomRule> getRules() {
        List<CustomRule> rules = new ArrayList<>();
        for (Map.Entry<String, Set<CustomRule>> entry : greyRules.entrySet()) {
            rules.addAll(entry.getValue());
        }
        return rules;
    }

    /**
     * Load {@link CustomRule}s, former rules will be replaced.
     *
     * @param rules new rules to load.
     */
    public static void loadRules(List<CustomRule> rules) {
        try {
            currentProperty.updateValue(rules);
        } catch (Throwable e) {
            RecordLog.warn("[CustomRuleManager] Unexpected error when loading degrade rules", e);
        }
    }

    /**
     * Set degrade rules for provided resource. Former rules of the resource will be replaced.
     *
     * @param resourceName valid resource name
     * @param rules        new rule set to load
     * @return whether the rules has actually been updated
     * @since 1.5.0
     */
    public static boolean setRulesForResource(String resourceName, Set<CustomRule> rules) {
        AssertUtil.notEmpty(resourceName, "resourceName cannot be empty");
        try {
            Map<String, Set<CustomRule>> newRuleMap = new HashMap<>(greyRules);
            if (rules == null) {
                newRuleMap.remove(resourceName);
            } else {
                Set<CustomRule> newSet = new HashSet<>();
                for (CustomRule rule : rules) {
                    if (isValidRule(rule) && resourceName.equals(rule.getResource())) {
                        newSet.add(rule);
                    }
                }
                newRuleMap.put(resourceName, newSet);
            }
            List<CustomRule> allRules = new ArrayList<>();
            for (Set<CustomRule> set : newRuleMap.values()) {
                allRules.addAll(set);
            }
            return currentProperty.updateValue(allRules);
        } catch (Throwable e) {
            RecordLog.warn(
                    "[CustomRuleManager] Unexpected error when setting degrade rules for resource: " + resourceName, e);
            return false;
        }
    }

    private static class RulePropertyListener implements PropertyListener<List<CustomRule>> {

        @Override
        public void configUpdate(List<CustomRule> conf) {
            Map<String, Set<CustomRule>> rules = loadDegradeConf(conf);
            if (rules != null) {
                greyRules.clear();
                greyRules.putAll(rules);
            }
            RecordLog.info("[CustomRuleManager] Degrade rules received: " + greyRules);
        }

        @Override
        public void configLoad(List<CustomRule> conf) {
            Map<String, Set<CustomRule>> rules = loadDegradeConf(conf);
            if (rules != null) {
                greyRules.clear();
                greyRules.putAll(rules);
            }
            RecordLog.info("[CustomRuleManager] Degrade rules loaded: " + greyRules);
        }

        private Map<String, Set<CustomRule>> loadDegradeConf(List<CustomRule> list) {
            Map<String, Set<CustomRule>> newRuleMap = new ConcurrentHashMap<>();

            if (list == null || list.isEmpty()) {
                return newRuleMap;
            }

            for (CustomRule rule : list) {
                if (!isValidRule(rule)) {
                    RecordLog.warn(
                            "[CustomRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule);
                    continue;
                }

                if (StringUtil.isBlank(rule.getLimitApp())) {
                    rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
                }

                String identity = rule.getResource();
                Set<CustomRule> ruleSet = newRuleMap.get(identity);
                if (ruleSet == null) {
                    ruleSet = new HashSet<>();
                    newRuleMap.put(identity, ruleSet);
                }
                ruleSet.add(rule);
            }

            return newRuleMap;
        }
    }

    public static boolean isValidRule(CustomRule rule) {
        boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
                && rule.getPercentage() >=  && rule.getTimeWindow() > ;
        if (!baseValid) {
            return false;
        }
        // Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}.
        int maxAllowedRt = Constants.TIME_DROP_VALVE;
        return true;
    }
}

自定义 CustomSlot:

代码语言:javascript
复制
public class CustomSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    public CustomSlot() {

    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        CustomRuleManager.checkDegrade(resourceWrapper, context, node, count, args);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

自定义 SlotChain:

代码语言:javascript
复制
public class CustomSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {

        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new CustomSlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }
}

这里将灰度置于流控与熔断降级之前:

测试:

最后,关于 Sentinel 源码解读后面有机会再和大家分享!

参考文档:

https://github.com/alibaba/Sentinel/wiki/%E4%B8%BB%E9%A1%B5

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 无敌码农 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档