首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Sentinel服务治理工作原理【源码笔记】

Sentinel服务治理工作原理【源码笔记】

作者头像
瓜农老梁
发布2019-09-17 16:21:28
5230
发布2019-09-17 16:21:28
举报
文章被收录于专栏:瓜农老梁瓜农老梁
目录
一、服务治理流程
1.服务治理流程图
2.重要概念
3.示例代码
二、定义流控规则
1.定义规则示例
2.将规则更新到缓存
三、定义受保护的资源
1.示例代码
2.资源上下文
3.构造资源插槽链
四、链条执行与规则判断
一、服务治理流程

通过定义规则、受保护的资源,统计调用链及运行时指标;通过比较运行指标与定义的规则,符合规则放行,不符合则阻塞。

1.服务治理流程图
2.重要概念
Resource

资源受保护的一段代码,ResourceWrapper实现类StringResourceWrapper和MethodResourceWrapper。

Context

Context存储当前调用链的元数据

String name: 上下文名称
DefaultNode entranceNode: 当前调用链的入口节点(根节点)
private Entry curEntry:当前调用Entry
private String origin:调用源可以是appId
private final boolean async:是否异步
Slot

每个插槽负责不同的职责,统计流量信息、流控规则校验等。 不同规则的Slot形成插槽链表,逐级向下执行。

Entry

Entry通行证token,允许通过的请求返回Entry对象,反之返回BlockException。

private long createTime: 创建时间用于统计RT
private Node curNode: 记录当前上下文中资源的统计信息
private Node originNode:调用源的统计信息,调用源可以是appId
protected ResourceWrapper resourceWrapper:资源封装类
Node

Node保存资源的实时统计信息

//每分钟的请求数(通过的+阻塞的)
long totalRequest()
//每分钟通过的请求数
long totalPass()
//每分钟请求成功的数量
long totalSuccess();
//每分钟请求阻塞的数量
long blockRequest()
//每分钟业务异常数量
long totalException()
//通过的QPS
double passQps()
//阻塞的QPS
double blockQps()
//总的QPS
double totalQps()
//成功的QPS
double successQps()
//成功QPS的最大值 到当前时间
double maxSuccessQps()
//异常的QPS
double exceptionQps()
//平均RT
double avgRt()
//最小的RT
double minRt()
//返回当前活动的线程数
int curThreadNum()
//前一秒的block QPS
double previousBlockQps()
//前一秒通过的 QPS
double previousPassQps()
//获取资源的有效统计信息
Map<Long, MetricNode> metrics()
//增加通过请求的数量
void addPassRequest(int count)
//增加rt时间和成功的请求数量
void addRtAndSuccess(long rt, int success)
//增加阻塞的QPS
void increaseBlockQps(int count)
//增加异常QPS
void increaseExceptionQps(int count)
//增加当前线程的数量
void increaseThreadNum()
//减少当前线程的数量
void decreaseThreadNum()
//重置计数器
void reset()
3.示例代码

FlowQpsDemo.java

public static void main(String[] args) throws Exception {
    //定义流控规则
    initFlowQpsRule();
    //打印流控信息
    tick();
    //模拟多线程触发流控
    simulateTraffic();

    System.out.println("===== begin to do flow control");
    System.out.println("only 20 requests per second can pass");
}

小结:通过流控示例代码分析流Sentinel服务治理的工作流程。

二、定义流控规则
1.定义规则示例
private static void initFlowQpsRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    // 设置资源名称
    rule1.setResource(KEY);
    // 阀值
    rule1.setCount(20);
    // 基于运行指标QPS阀值类型
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    // 对调用来源不做限制
    rule1.setLimitApp("default");
    rules.add(rule1);
    // 限流规则加入到缓存
    FlowRuleManager.loadRules(rules);
}
2.将规则更新到缓存

FlowRuleManager#成员变量

// 缓存定义的规则
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
// 回调的Listener实现
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
// 用于注册Listener
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

static {
    //将FlowPropertyListener注册到currentProperty中
    currentProperty.addListener(LISTENER);
    SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}

小结:FlowRuleManager通过静态方法将FlowPropertyListener注册到DynamicSentinelProperty中。

FlowRuleManager#loadRules

public static void loadRules(List<FlowRule> rules) {
  currentProperty.updateValue(rules);
}

DynamicSentinelProperty#updateValue

@Override
public boolean updateValue(T newValue) {
    //...
    value = newValue;
    for (PropertyListener<T> listener : listeners) {
        //回调listener
        listener.configUpdate(newValue);
    }
    return true;
}

FlowRuleManager#FlowPropertyListener#configUpdate

private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
//回调执行方法,更新缓存规则
@Override
public void configUpdate(List<FlowRule> value) {
    Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
    if (rules != null) {
        flowRules.clear();
        flowRules.putAll(rules);
    }
    RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
}

小结:FlowRuleManager的内部静态类FlowPropertyListener#configUpdate方法将规则更新到缓存,缓存在FlowRuleManager的成员变量flowRules中。

三、定义受保护的资源
1.示例代码
while (!stop) {
    Entry entry = null;
    try {
        // 定义受保护的资源
        entry = SphU.entry(KEY);
        // 获得通行证 允许通过
        pass.addAndGet(1);
    } catch (BlockException e1) {
        // 未获得通行证 被阻塞
        block.incrementAndGet();
    } catch (Exception e2) {
        // biz exception
    } finally {
        total.incrementAndGet();
        if (entry != null) {
            entry.exit();
        }
    }
}

总结:通过SphU.entry作为入口,如果符合规则放行返回Entry实例;否则抛出BlockException被阻塞。

2.资源上下文

CtSph#entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//从线程变量获取资源上下文
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
    return new CtEntry(resourceWrapper, null, context);
}
//创建默认资源上下文 名称为:sentinel_default_context
if (context == null) {
    context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}

小结:先从当前线程上下文获取资源上下文Context;如果null则使用默认上下文名称sentinel_default_context通过MyContextUtil.myEnter创建。

创建上下文Context

CtSph#MyContextUtil#myEnter->ContextUtil#trueEnter

//从当前线程变量中 获取上下文
Context context = contextHolder.get();
    if (context == null) {
    ]//创建用于统计进来流量的Node
    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
    //Add entrance node.
    Constants.ROOT.addChild(node);
    //将该资源的Node加入缓存
    Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
    newMap.putAll(contextNameNodeMap);
    //name为上下文名称
    newMap.put(name, node);
    contextNameNodeMap = newMap;
}
//构建资源上下文 包含:统计信息的Node
context = new Context(node, name);
context.setOrigin(origin);
//设置到当前线程变量
contextHolder.set(context);

小结:创建负责入口统计信息的EntranceNode;构建资源上下文并设置到线程变量ThreadLocal中。

3.构造资源插槽链

CtSph#entryWithPriority

private Entry entryWithPriority()
        throws BlockException {
...
//构造资源的插槽链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
...
}

CtSph#lookProcessChain

 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    //获取该资源关联的插槽
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
    //加锁
    synchronized (LOCK) {
        //再次获取该资源关联的插槽
        chain = chainMap.get(resourceWrapper);
        //插槽链为null创建新链条
        if (chain == null) {
            // Entry size limit.
            //插槽链条的数量最多为6000个
            if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                return null;
            }
            //构造插槽链条
            chain = SlotChainProvider.newSlotChain();
            Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                chainMap.size() + 1);
            newMap.putAll(chainMap);
            newMap.put(resourceWrapper, chain);
            chainMap = newMap;
        }
    }
}
return chain;
}

SlotChainProvider#newSlotChain

public static ProcessorSlotChain newSlotChain() {
    // 构造插槽链
    if (builder != null) {
        return builder.build();
    }
    // 加载自定义SlotChainBuilder
    // 默认选择DefaultProcessorSlotChain
    resolveSlotChainBuilder();
    // 再次判断为null使用默认的DefaultProcessorSlotChain
    if (builder == null) {
        ...
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}

DefaultSlotChainBuilder#build

ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());

小结:从创建插槽链流程可以看出,使用默认插槽构造器DefaultProcessorSlotChain创建8个插槽形成链表结构;分别为:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。插槽构造器也可以自定义。

四、链条执行与规则判断

CtSph#entryWithPriority

private Entry entryWithPriority(
   Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        //触发插槽链执行及规则校验
        chain.entry();
    } catch (BlockException e1) {
        //触发流控向上抛出BlockException
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
    }
    //返回通行证Entry
    return e;
}

小结:通过chain.entry()触发插槽链条执行,默认会经过上面8个插槽。每个插槽履行自己职责,判断是否符合流控规则,符合规则放行返回通行证Entry;不符合触发流控向上抛出BlockException。具体各个插槽的职责后续文章再做分析。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 一、服务治理流程
  • 1.服务治理流程图
  • 2.重要概念
  • Resource
  • Context
  • Slot
  • Entry
  • Node
  • 3.示例代码
  • 二、定义流控规则
  • 1.定义规则示例
  • 2.将规则更新到缓存
  • 三、定义受保护的资源
  • 1.示例代码
  • 2.资源上下文
  • 创建上下文Context
  • 3.构造资源插槽链
  • 四、链条执行与规则判断
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档