作者丨童海洋(马蜂窝大交通事业部研发工程师)
在实际业务中可能碰到这样的情况:
场景 1
有 A 和 B 两个服务,服务 A 作为业务访问的入口直接暴露给用户使用,服务 B 由 A 调用,负责查询一部分供应商的信息,并在设定时间内返回。这时由于出现某种原因,导致业务B的响应时间超时,需要5s才能获取到供应商的查询信息。即使我们忽略其它操作的时间,这也足以带给用户非常差的体验。
这类问题我们会用熔断降级来解决,具体是指当下游服务 B 因为某种原因突然变得不可用或响应时间超过预设值时,上游的 A 服务为了保证核心服务的可用性,会直接返回用户一个 Mock 数据,缓解服务器的压力。
场景 2
某个业务系统的正常 QPS 为 25, 这时突然出现了一个爬虫程序,以 QPS 高达 100 的频率爬取业务数据,正逐渐将业务拖垮,严重影响正常用户的访问。
对于这类明显的异常,我们可以采用流量控制的方式,当系统 QPS 超过 20 后,直接拒绝其余的访问请求,来保证系统可用。
可见,在生产环境下,熔断降级和流量控制对保证线上服务的稳定可靠起到重要作用。特别是随着微服务的流行,服务和服务之间的稳定性变得越来越重要,熔断降级和流量控制等策略及更好的实现手段也更受关注。在此,墙裂推荐大家使用 Sentinel 完成服务的熔断降级和流量控制。
一、什么是 Sentinel?
Sentinel 是面向分布式服务架构的轻量级流量控制组件,由阿里开源,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护等多个维度来保障微服务的稳定性。
这里借用一张表,来说明 Sentinel 和其它主流中间件的性能对比:
从上面的对比我们可以看到,Sentinel 的优势还是比较明显的,比如更丰富的熔断降级和限流策略、支持系统自适应保护、比较易用的控制台、良好的扩展性,以及更广泛的开源生态等。
以下我将结合官方文档和在业务实际应用过程中的理解,介绍 Sentinel 的原理、核心概念和如何使用。
二、Sentinel 原理及核心概念
2.1 核心概念
Sentinel 控制的对象即为资源,调用 Entry 方法的时候会 New 一个资源对象,资源由一个全局唯一的资源名称标识。
Sentinel 的上下文,包含了上下文名称,一个调用链一个 Context,可以显示创建或者在调用 Entry 的时候隐式创建。
持有运行时的资源的各种统计数据。
代表一次对资源的访问,每访问一个资源都会创建一个 Entry,在 Context 中以一个双向链表存在。
必须调用 exit() 方法的原因就在于这个链表:exit 方法中会判断上下文的当前 entry 是不是 this,此时其他 entry 掉用 exit 会发现不相等,从而抛出异常。
处理插槽,资源的各种控制都通过不同的 Slot 实现类去完成。
由各个处理插槽组成的链表,每个资源在整个服务中对应一个处理链。
用户定义的各种规则。
加载并管理 Rule。
2.2 工作原理
在 Sentinel 里,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
整体框架图:
Sentinel 将 SlotChainBuilder 作为 SPI 接口进行扩展,使得 Slot Chain 具备了扩展的能力。可以自行加入自定义的 slot 并编排 slot 间的顺序,来为 Sentinel 添加自定义功能:
下面重点讲解流量控制和熔断降级部分。
2.2.1 流量控制
本次介绍不涉及集群流控,因为集群流控需要通过配置单独的 Server 与其它实例通信,来判断是否调用规则。而在我们现有的环境下,生产部署的集群是不区分 Client 和 Server 的。因此,以下介绍以单个实例的流量控制为主。
上文简单提到,流量控制(Flow Control)的原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
同一个资源可以创建多条限流规则。Sentinel 中的 FlowSlot 会对该资源的所有限流规则依次遍历,直到有规则触发限流或者所有规则遍历完毕。
并发线程数限流用于保护业务线程数不被耗尽。例如,当应用所依赖的下游应用由于某种原因导致服务不稳定、响应延迟增加,对于调用者来说,意味着吞吐量下降和更多的线程数占用,极端情况下甚至导致线程池耗尽。为应对太多线程占用的情况,业内有使用隔离的方案,比如通过不同业务逻辑使用不同线程池来隔离业务自身之间的资源争抢(线程池隔离)。这种隔离方案虽然隔离性比较好,但是代价就是线程数目太多,线程上下文切换的 overhead 比较大,特别是对低延时的调用有比较大的影响。
Sentinel 并发线程数限流不负责创建和管理线程池,而是简单统计当前请求上下文的线程数目,如果超出阈值,新的请求会被立即拒绝,效果类似于信号量隔离。
当 QPS 超过某个阈值的时候,则采取措施进行流量控制。流量控制的手段包括以下几种:直接拒绝、Warm Up、匀速排队。对应 FlowRule 中的 controlBehavior 字段。
直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默认的流量控制方式,当 QPS 超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出 FlowException。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位时。
Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即预热/冷启动方式。系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过「冷启动」让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,可以给冷系统一个预热的时间,避免冷系统被压垮。
通常冷启动的过程系统允许通过的 QPS 曲线如下图所示:
匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式会严格控制请求通过的间隔时间,即让请求以均匀的速度通过,与之对应的是漏桶算法。
匀速排队方式的作用如下图所示:
这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。
即 Warm Up + 匀速排队。
调用关系包括调用方、被调用方;一个方法又可能会调用其它方法,形成一个调用链路的层次关系。Sentinel 通过 NodeSelectorSlot 建立不同资源间的调用的关系,并且通过 ClusterNodeBuilderSlot 记录每个资源的实时统计信息。
有了调用链路的统计信息,我们可以衍生出多种流量控制手段。
1)根据调用方限流
ContextUtil.enter(resourceName, origin) 方法中的 origin 参数标明了调用方身份。这些信息会在 ClusterBuilderSlot 中被统计。
流控规则中的 limitApp 字段用于根据调用来源进行流量控制。该字段的值有以下三种选项,分别对应不同的场景:
同一个资源名可以配置多条规则,规则的生效顺序为:{some_origin_name} > other > default。
2)根据调用链路入口限流:链路限流
NodeSelectorSlot 中记录了资源之间的调用链路,这些资源通过调用关系,相互之间构成一棵调用树。这棵树的根节点是一个名字为 machine-root 的虚拟节点,调用链的入口都是这个虚节点的子节点。
一棵典型的调用树如下图所示:
上图中来自入口 Entrance1 和 Entrance2 的请求都调用到了资源 NodeA,Sentinel 允许只根据某个入口的统计信息对资源限流。比如我们可以设置 FlowRule.strategy 为 RuleConstant.CHAIN,同时设置 FlowRule.refResource 为 Entrance1 来表示只有从入口 Entrance1 的调用才会记录到 NodeA 的限流统计当中,而不关心经 Entrance2 到来的调用。
调用链的入口(上下文)是通过 API 方法 ContextUtil.enter(contextName) 定义的,其中 contextName 即对应调用链路入口名称。
当两个资源之间具有资源争抢或者依赖关系的时候,这两个资源便具有了关联。比如对数据库同一个字段的读操作和写操作存在争抢,读的速度过高会影响写的速度,写的速度过高会影响读的速度。如果放任读写操作争抢资源,则争抢本身带来的开销会降低整体的吞吐量。
这里可使用关联限流来避免具有关联关系的资源之间过度争抢。举例来说,read_db 和 write_db 这两个资源分别代表数据库读写,我们可以给 read_db 设置限流规则来达到写优先的目的:设置FlowRule.strategy为RuleConstant.RELATE同时设置FlowRule.refResource为write_db。这样当写库操作过于频繁时,读数据的请求会被限流。
规则判断流程:
2.2.2 熔断降级
Sentinel 目前有三种降级策略。当然, 我们还可以通过自定义 Slot 和 Rule 的方式制定符合要求的降级策略。
规则判断流程:
当 SlotChain 执行到 DegradeSlot 时,DegradeSlot 先调用 DegradeManager 判断规则。执行完毕后,如果还有下一个 Slot,就执行下一个 Slot。
DegradeManager 会调用 DegradeRule 进行判断。DegradeRule 在判断符合熔断规则的情况下会抛出异常,否则资源正常执行。
三、Sentinel 使用示例
3.1 Sentinel 启动配置项
3.2 编码方法 (调用SphU API显示创建)
public ResultWrapper<OrderDTO> getOrder(String id) {
/**
* pre inception
*/
Entry entry = null;
try {
entry = SphU.entry("getOrder");
ResultWrapper<OrderDTO> orderResultWrapper = testRestManagerImpl.getOrderInfo(id);
if (orderResultWrapper.getData() == null) {
orderResultWrapper.setData(new OrderDTO("No Order", "No Shop", " Random Data"));
}
return orderResultWrapper;
} catch (BlockException e) {
Tracer.trace(e);
log.info(id + " 服务熔断");
/**
* 可以mock一个假数据返回
*/
return new ResultWrapper<>();
} catch (Exception e) {
log.info(id + " 业务异常");
/**
* 业务异常,mock假数据返回
*/
return new ResultWrapper<>();
} finally {
if (entry != null) {
entry.exit();
}
}
/**
* 之前也可以不返回结果,这里调用其他的业务逻辑
* post inception
*/
}
由实例代码看出,编码方式对系统是有一定入侵的。但好处也很明显,就是资源真正由编码人员掌控,资源甚至可以是一个小小的代码块。
3.3 注解方式
核心注解:@SentinelResource
首先,注入SentinelResource注解
@Configuration
public class SentinelAspectConfiguration {
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
}
然后直接使用注解:
@SentinelResource(value = "getOrder",entryType = EntryType.IN, fallback = "getOrderFallBack", fallbackClass = {ITestRestAPIUtil.class}, blockHandler = "getOrderHandler", blockHandlerClass = {ITestRestAPIUtil.class})
public ResultWrapper<OrderDTO> getOrder(String id) {
ResultWrapper<OrderDTO> orderResultWrapper = testRestManagerImpl.getOrderInfo(id);
if (orderResultWrapper.getData() == null) {
orderResultWrapper.setData(new OrderDTO("No Order", "No Shop", " Random Data"));
}
return orderResultWrapper;
}
然后实现我们设定的BlockHander和fallback方法
@Slf4j
public class ITestRestAPIUtil {
public static ResultWrapper<OrderDTO> getOrderHandler(String id, BlockException ex) {
// log.info(JSON.toJSONString(ex));
//这里可以进行埋点
//mock假数据返回
return new ResultWrapper().setSuccess(true).setData(new OrderDTO()).setCode("服务已被降级").addMessage("服务已被降级");
}
public static ResultWrapper<OrderDTO> getOrderFallBack(String id, Throwable throwable) {
log.info("捕捉到业务异常:{}", throwable.toString());
// log.error("getOrderError", id);
//mock假数据返回
return new ResultWrapper<OrderDTO>().setSuccess(false).addMessage("捕捉到业务异常");
}
}
由实例代码可以看出,注解方式对系统侵入较小,利于维护与修改。但由于 SentinelResource 仅作用于 Method,资源和方法的界限在这里混淆。
3.4 动态配置规则
如果规则的修改无法实时改变,那Sentinel的效果将大打折扣。我们可以配合 Apollo 来动态的修改所需规则。
官方给出了 sentinel-datasource-apollo 的依赖来支持 Apollo 的实现,个人认为实用意义不大。因为项目中可能不止 Sentinel 会用到 Apollo,没有理由将 Sentinel 和其它业务使用完全剥离开来。而且该依赖也只是将 Apollo 的使用封装了起来。
@Configuration
public class SentinelConfig {
@Resource
private ApolloConfigureCenter apolloConfigureCenter;
@PostConstruct
private void initFlowRule() {
List<FlowRule> rules = JSON.parseArray(apolloConfigureCenter.getFlowControlRule(), FlowRule.class);
FlowRuleManager.loadRules(rules);
}
@PostConstruct
private void initDegradeRule() {
List<DegradeRule> rules = JSON.parseArray(apolloConfigureCenter.getDegradeRule(), DegradeRule.class);
DegradeRuleManager.loadRules(rules);
}
//自定义规则,另一种写在代码中方法,也可以配置到apollo中
@PostConstruct
private void initCustomRule() {
List<CustomRule> rules = new ArrayList<>();
CustomRule customRule = new CustomRule();
customRule.setPercentage();
customRule.setTimeWindow();
customRule.setResource("testCustom");
rules.add(customRule);
CustomRuleManager.loadRules(rules);
}
}
可以看出,上面的实例是通过 loadRules 这个方法来注入规则的。分析源码可以得知,loadRules 其实是对之前规则的一次重制,这就意味着我们可以动态的修改规则,只需要对 Apollo 进行监听就可以了。
其它规则配置大致与此思路类似,都是 Clear 原先的 Rules,然后添加新的 Rules。
3.5 实时监控使用方式
下载 Sentinel 源码,打包运行 sentinel-dashboard.jar 就行了。
java -Dserver.port=8082 -Dcsp.sentinel.dashboard.server=localhost:8082 -jar sentinel-dashboard/target/sentinel-dashboard.jar 可以指定实时监控页面的端口号与 IP 地址。
四、Sentinel 进阶使用
自定义 Rule 与自定义 SlotChain
Sentinel已经实现的功能基本可以满足大部分情况下的需求。但在一些特殊场景下,我们还是需要设定自己的规则。
这里以通过 Sentinel 实现灰度作为示例。
首先自定义一个CustomRule:
public class CustomRule extends AbstractRule {
private static final int RT_MAX_EXCEED_N = ;
private double percentage;
private int timeWindow;
private final AtomicBoolean cut = new AtomicBoolean(false);
private AtomicLong passCount = new AtomicLong(0L);
public CustomRule() {
}
public CustomRule(String resourceName) {
this.setResource(resourceName);
}
public double getPercentage() {
return this.percentage;
}
public CustomRule setPercentage(double percentage) {
this.percentage = percentage;
return this;
}
public int getTimeWindow() {
return timeWindow;
}
public CustomRule setTimeWindow(int timeWindow) {
this.timeWindow = timeWindow;
return this;
}
private boolean isCut() {
return this.cut.get();
}
private void setCut(boolean cut) {
this.cut.set(cut);
}
public AtomicLong getPassCount() {
return this.passCount;
}
public boolean equals(Object o) {
if (this == o) {
return true;
} else if (!(o instanceof CustomRule)) {
return false;
} else if (!super.equals(o)) {
return false;
} else {
CustomRule that = (CustomRule)o;
if (this.percentage != that.percentage) {
return false;
} else {
return false;
}
}
}
public int hashCode() {
int result = super.hashCode();
result = * result + (new Double(this.percentage)).hashCode();
result = * result + this.timeWindow;
return result;
}
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
//实际情况下肯定不会只传一个参数,需要看情况自己解析
Object[] objects = args;
String uid = (String)objects[];
if (StringUtils.isBlank(uid)) {
return true;
} else {
取后两位与灰度值进行比较
double greyUID = Double.parseDouble(uid.substring(uid.length()-));
if (greyUID < this.percentage) {
return true;
}
}
return false;
}
public String toString() {
return "CustomRule{resource=" + this.getResource() + ", percentage=" + this.percentage + ", limitApp=" + this.getLimitApp() + ", timeWindow=" + this.timeWindow + "}";
}
private static final class ResetTask implements Runnable {
private CustomRule rule;
ResetTask(CustomRule rule) {
this.rule = rule;
}
public void run() {
this.rule.getPassCount().set(0L);
this.rule.cut.set(false);
}
}
}
定义 CustomRuleException:
public class CustomException extends BlockException {
public CustomException(String ruleLimitApp) {
super(ruleLimitApp);
}
public CustomException(String ruleLimitApp, CustomRule rule) {
super(ruleLimitApp, rule);
}
public CustomException(String message, Throwable cause) {
super(message, cause);
}
public CustomException(String ruleLimitApp, String message) {
super(ruleLimitApp, message);
}
@Override
public Throwable fillInStackTrace() {
return this;
}
/**
* Get triggered rule.
* Note: the rule result is a reference to rule map and SHOULD NOT be modified.
*
* @return triggered rule
* @since 1.4.2
*/
@Override
public CustomRule getRule() {
return rule.as(CustomRule.class);
}
}
定义 CustomRuleManager:
public class CustomRuleManager {
private static final Map<String, Set<CustomRule>> greyRules = new ConcurrentHashMap<>();
private static final RulePropertyListener LISTENER = new RulePropertyListener();
private static SentinelProperty<List<CustomRule>> currentProperty
= new DynamicSentinelProperty<>();
static {
currentProperty.addListener(LISTENER);
}
public static void register2Property(SentinelProperty<List<CustomRule>> property) {
AssertUtil.notNull(property, "property cannot be null");
synchronized (LISTENER) {
RecordLog.info("[CustomRuleManager] Registering new property to degrade rule manager");
currentProperty.removeListener(LISTENER);
property.addListener(LISTENER);
currentProperty = property;
}
}
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count, Object ...args)
throws BlockException {
Set<CustomRule> rules = greyRules.get(resource.getName());
if (rules == null) {
return;
}
for (CustomRule rule : rules) {
if (!rule.passCheck(context, node, count, args)) {
throw new CustomException(rule.getLimitApp(), rule);
}
}
}
public static boolean hasConfig(String resource) {
if (resource == null) {
return false;
}
return greyRules.containsKey(resource);
}
/**
* Get a copy of the rules.
*
* @return a new copy of the rules.
*/
public static List<CustomRule> getRules() {
List<CustomRule> rules = new ArrayList<>();
for (Map.Entry<String, Set<CustomRule>> entry : greyRules.entrySet()) {
rules.addAll(entry.getValue());
}
return rules;
}
/**
* Load {@link CustomRule}s, former rules will be replaced.
*
* @param rules new rules to load.
*/
public static void loadRules(List<CustomRule> rules) {
try {
currentProperty.updateValue(rules);
} catch (Throwable e) {
RecordLog.warn("[CustomRuleManager] Unexpected error when loading degrade rules", e);
}
}
/**
* Set degrade rules for provided resource. Former rules of the resource will be replaced.
*
* @param resourceName valid resource name
* @param rules new rule set to load
* @return whether the rules has actually been updated
* @since 1.5.0
*/
public static boolean setRulesForResource(String resourceName, Set<CustomRule> rules) {
AssertUtil.notEmpty(resourceName, "resourceName cannot be empty");
try {
Map<String, Set<CustomRule>> newRuleMap = new HashMap<>(greyRules);
if (rules == null) {
newRuleMap.remove(resourceName);
} else {
Set<CustomRule> newSet = new HashSet<>();
for (CustomRule rule : rules) {
if (isValidRule(rule) && resourceName.equals(rule.getResource())) {
newSet.add(rule);
}
}
newRuleMap.put(resourceName, newSet);
}
List<CustomRule> allRules = new ArrayList<>();
for (Set<CustomRule> set : newRuleMap.values()) {
allRules.addAll(set);
}
return currentProperty.updateValue(allRules);
} catch (Throwable e) {
RecordLog.warn(
"[CustomRuleManager] Unexpected error when setting degrade rules for resource: " + resourceName, e);
return false;
}
}
private static class RulePropertyListener implements PropertyListener<List<CustomRule>> {
@Override
public void configUpdate(List<CustomRule> conf) {
Map<String, Set<CustomRule>> rules = loadDegradeConf(conf);
if (rules != null) {
greyRules.clear();
greyRules.putAll(rules);
}
RecordLog.info("[CustomRuleManager] Degrade rules received: " + greyRules);
}
@Override
public void configLoad(List<CustomRule> conf) {
Map<String, Set<CustomRule>> rules = loadDegradeConf(conf);
if (rules != null) {
greyRules.clear();
greyRules.putAll(rules);
}
RecordLog.info("[CustomRuleManager] Degrade rules loaded: " + greyRules);
}
private Map<String, Set<CustomRule>> loadDegradeConf(List<CustomRule> list) {
Map<String, Set<CustomRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
for (CustomRule rule : list) {
if (!isValidRule(rule)) {
RecordLog.warn(
"[CustomRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
String identity = rule.getResource();
Set<CustomRule> ruleSet = newRuleMap.get(identity);
if (ruleSet == null) {
ruleSet = new HashSet<>();
newRuleMap.put(identity, ruleSet);
}
ruleSet.add(rule);
}
return newRuleMap;
}
}
public static boolean isValidRule(CustomRule rule) {
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
&& rule.getPercentage() >= && rule.getTimeWindow() > ;
if (!baseValid) {
return false;
}
// Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}.
int maxAllowedRt = Constants.TIME_DROP_VALVE;
return true;
}
}
自定义 CustomSlot:
public class CustomSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
public CustomSlot() {
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
CustomRuleManager.checkDegrade(resourceWrapper, context, node, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
自定义 SlotChain:
public class CustomSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new CustomSlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
这里将灰度置于流控与熔断降级之前:
测试:
最后,关于 Sentinel 源码解读后面有机会再和大家分享!
参考文档:
https://github.com/alibaba/Sentinel/wiki/%E4%B8%BB%E9%A1%B5