Sentinel服务治理工作原理【源码笔记】

目录
一、服务治理流程
1.服务治理流程图
2.重要概念
3.示例代码
二、定义流控规则
1.定义规则示例
2.将规则更新到缓存
三、定义受保护的资源
1.示例代码
2.资源上下文
3.构造资源插槽链
四、链条执行与规则判断
一、服务治理流程

通过定义规则、受保护的资源,统计调用链及运行时指标;通过比较运行指标与定义的规则,符合规则放行,不符合则阻塞。

1.服务治理流程图
2.重要概念
Resource

资源受保护的一段代码,ResourceWrapper实现类StringResourceWrapper和MethodResourceWrapper。

Context

Context存储当前调用链的元数据

String name: 上下文名称
DefaultNode entranceNode: 当前调用链的入口节点(根节点)
private Entry curEntry:当前调用Entry
private String origin:调用源可以是appId
private final boolean async:是否异步
Slot

每个插槽负责不同的职责,统计流量信息、流控规则校验等。 不同规则的Slot形成插槽链表,逐级向下执行。

Entry

Entry通行证token,允许通过的请求返回Entry对象,反之返回BlockException。

private long createTime: 创建时间用于统计RT
private Node curNode: 记录当前上下文中资源的统计信息
private Node originNode:调用源的统计信息,调用源可以是appId
protected ResourceWrapper resourceWrapper:资源封装类
Node

Node保存资源的实时统计信息

//每分钟的请求数(通过的+阻塞的)
long totalRequest()
//每分钟通过的请求数
long totalPass()
//每分钟请求成功的数量
long totalSuccess();
//每分钟请求阻塞的数量
long blockRequest()
//每分钟业务异常数量
long totalException()
//通过的QPS
double passQps()
//阻塞的QPS
double blockQps()
//总的QPS
double totalQps()
//成功的QPS
double successQps()
//成功QPS的最大值 到当前时间
double maxSuccessQps()
//异常的QPS
double exceptionQps()
//平均RT
double avgRt()
//最小的RT
double minRt()
//返回当前活动的线程数
int curThreadNum()
//前一秒的block QPS
double previousBlockQps()
//前一秒通过的 QPS
double previousPassQps()
//获取资源的有效统计信息
Map<Long, MetricNode> metrics()
//增加通过请求的数量
void addPassRequest(int count)
//增加rt时间和成功的请求数量
void addRtAndSuccess(long rt, int success)
//增加阻塞的QPS
void increaseBlockQps(int count)
//增加异常QPS
void increaseExceptionQps(int count)
//增加当前线程的数量
void increaseThreadNum()
//减少当前线程的数量
void decreaseThreadNum()
//重置计数器
void reset()
3.示例代码

FlowQpsDemo.java

public static void main(String[] args) throws Exception {
    //定义流控规则
    initFlowQpsRule();
    //打印流控信息
    tick();
    //模拟多线程触发流控
    simulateTraffic();

    System.out.println("===== begin to do flow control");
    System.out.println("only 20 requests per second can pass");
}

小结:通过流控示例代码分析流Sentinel服务治理的工作流程。

二、定义流控规则
1.定义规则示例
private static void initFlowQpsRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    // 设置资源名称
    rule1.setResource(KEY);
    // 阀值
    rule1.setCount(20);
    // 基于运行指标QPS阀值类型
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    // 对调用来源不做限制
    rule1.setLimitApp("default");
    rules.add(rule1);
    // 限流规则加入到缓存
    FlowRuleManager.loadRules(rules);
}
2.将规则更新到缓存

FlowRuleManager#成员变量

// 缓存定义的规则
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
// 回调的Listener实现
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
// 用于注册Listener
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

static {
    //将FlowPropertyListener注册到currentProperty中
    currentProperty.addListener(LISTENER);
    SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}

小结:FlowRuleManager通过静态方法将FlowPropertyListener注册到DynamicSentinelProperty中。

FlowRuleManager#loadRules

public static void loadRules(List<FlowRule> rules) {
  currentProperty.updateValue(rules);
}

DynamicSentinelProperty#updateValue

@Override
public boolean updateValue(T newValue) {
    //...
    value = newValue;
    for (PropertyListener<T> listener : listeners) {
        //回调listener
        listener.configUpdate(newValue);
    }
    return true;
}

FlowRuleManager#FlowPropertyListener#configUpdate

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
//回调执行方法,更新缓存规则
@Override
public void configUpdate(List<FlowRule> value) {
    Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
    if (rules != null) {
        flowRules.clear();
        flowRules.putAll(rules);
    }
    RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
}

小结:FlowRuleManager的内部静态类FlowPropertyListener#configUpdate方法将规则更新到缓存,缓存在FlowRuleManager的成员变量flowRules中。

三、定义受保护的资源
1.示例代码
while (!stop) {
    Entry entry = null;
    try {
        // 定义受保护的资源
        entry = SphU.entry(KEY);
        // 获得通行证 允许通过
        pass.addAndGet(1);
    } catch (BlockException e1) {
        // 未获得通行证 被阻塞
        block.incrementAndGet();
    } catch (Exception e2) {
        // biz exception
    } finally {
        total.incrementAndGet();
        if (entry != null) {
            entry.exit();
        }
    }
}

总结:通过SphU.entry作为入口,如果符合规则放行返回Entry实例;否则抛出BlockException被阻塞。

2.资源上下文

CtSph#entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//从线程变量获取资源上下文
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
    return new CtEntry(resourceWrapper, null, context);
}
//创建默认资源上下文 名称为:sentinel_default_context
if (context == null) {
    context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}

小结:先从当前线程上下文获取资源上下文Context;如果null则使用默认上下文名称sentinel_default_context通过MyContextUtil.myEnter创建。

创建上下文Context

CtSph#MyContextUtil#myEnter->ContextUtil#trueEnter

//从当前线程变量中 获取上下文
Context context = contextHolder.get();
    if (context == null) {
    ]//创建用于统计进来流量的Node
    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
    //Add entrance node.
    Constants.ROOT.addChild(node);
    //将该资源的Node加入缓存
    Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
    newMap.putAll(contextNameNodeMap);
    //name为上下文名称
    newMap.put(name, node);
    contextNameNodeMap = newMap;
}
//构建资源上下文 包含:统计信息的Node
context = new Context(node, name);
context.setOrigin(origin);
//设置到当前线程变量
contextHolder.set(context);

小结:创建负责入口统计信息的EntranceNode;构建资源上下文并设置到线程变量ThreadLocal中。

3.构造资源插槽链

CtSph#entryWithPriority

private Entry entryWithPriority()
        throws BlockException {
...
//构造资源的插槽链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
...
}

CtSph#lookProcessChain

 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    //获取该资源关联的插槽
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
    //加锁
    synchronized (LOCK) {
        //再次获取该资源关联的插槽
        chain = chainMap.get(resourceWrapper);
        //插槽链为null创建新链条
        if (chain == null) {
            // Entry size limit.
            //插槽链条的数量最多为6000个
            if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                return null;
            }
            //构造插槽链条
            chain = SlotChainProvider.newSlotChain();
            Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                chainMap.size() + 1);
            newMap.putAll(chainMap);
            newMap.put(resourceWrapper, chain);
            chainMap = newMap;
        }
    }
}
return chain;
}

SlotChainProvider#newSlotChain

public static ProcessorSlotChain newSlotChain() {
    // 构造插槽链
    if (builder != null) {
        return builder.build();
    }
    // 加载自定义SlotChainBuilder
    // 默认选择DefaultProcessorSlotChain
    resolveSlotChainBuilder();
    // 再次判断为null使用默认的DefaultProcessorSlotChain
    if (builder == null) {
        ...
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}

DefaultSlotChainBuilder#build

ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());

小结:从创建插槽链流程可以看出,使用默认插槽构造器DefaultProcessorSlotChain创建8个插槽形成链表结构;分别为:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。插槽构造器也可以自定义。

四、链条执行与规则判断

CtSph#entryWithPriority

private Entry entryWithPriority(
   Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        //触发插槽链执行及规则校验
        chain.entry();
    } catch (BlockException e1) {
        //触发流控向上抛出BlockException
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
    }
    //返回通行证Entry
    return e;
}

小结:通过chain.entry()触发插槽链条执行,默认会经过上面8个插槽。每个插槽履行自己职责,判断是否符合流控规则,符合规则放行返回通行证Entry;不符合触发流控向上抛出BlockException。具体各个插槽的职责后续文章再做分析。

本文分享自微信公众号 - 瓜农老梁(gh_01130ae30a83)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-09-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏BAT的男人

【小家java】并发编程中wait/notify await/singal notify/notifyAll sleep/yield 的区别以及死锁案例

调用sleep和yield的时候不释放当前线程所获得的锁,但是调用await/wait的时候却释放了其获取的锁并阻塞等待。

8820
来自专栏BAT的男人

玩转Spring Cache --- 开启基于注解的缓存功能@EnableCaching原理了解【享学Spring】

缓存现已成为了项目的标配,更是面必问的知识点。若你说你的项目中还没有使用到缓存,估计你都不太好意思介绍你的项目。

32940
来自专栏BAT的男人

【小家java】Java8新特性之---CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)

传统单线程环境下,调用函数是同步的,必须等待程序返回结果后,才可进行其他处理。因此为了提高系统整体的并发性能,引入了异步执行~

15240
来自专栏BAT的男人

【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用

CountDownLatch:是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

10420
来自专栏樯橹代码

nodejs之http-proxy几点常见问题

http-proxy是一个nodejs的http代理库,已经被webpack-dev-server集成进来,做代理使用。原因是在前后端分离大行其道的今天,我们如...

10630
来自专栏BAT的男人

【小家Spring】从基于@Transactional全注解方式的声明式事务入手,彻底掌握Spring事务管理的原理

上篇文章: 【小家Spring】Spring-jdbc的使用以及Spring事务管理的8种方式介绍(声明式事务+编程式事务) 介绍了Spring事务的众多使...

13440
来自专栏BAT的男人

【小家java】Java之Apache Commons-IO使用精讲(FileUtils、IOUtils、FileFilter全覆盖)

该工具类可能是平时使用得最多的工具类了。 IOUtils包含处理读、写和复制的工具方法。方法对InputStream、OutputStream、Reader和...

16720
来自专栏BAT的男人

【小家Spring】Spring异步处理@Async的使用以及原理、源码分析(@EnableAsync)

在开发过程中,我们会遇到很多使用线程池的业务场景,例如异步短信通知、异步记录操作日志。大多数使用线程池的场景,就是会将一些可以进行异步操作的业务放在线程池中去完...

39820
来自专栏BAT的男人

【小家Spring】Spring注解驱动开发---Spring Ioc容器中Bean的生命周期详解(BeanPostProcessor解析)

我们可以自定义初始化和销毁方法;容器在bean进行到当前生命周期的时候来调用我们自定义的初始化和销毁方法

6620
来自专栏BAT的男人

【小家Spring】注意BeanPostProcessor启动时对依赖Bean的“误伤”陷阱(is not eligible for getting processed by all...)

本篇博文和Spring的上下文启动有较强的关联性,同时需要读者对Spring中的BeanPostProcessor有较为熟悉的了解。若之前没有接触过的同学,建议...

62330

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励