一、服务治理流程
1.服务治理流程图
2.重要概念
3.示例代码
二、定义流控规则
1.定义规则示例
2.将规则更新到缓存
三、定义受保护的资源
1.示例代码
2.资源上下文
3.构造资源插槽链
四、链条执行与规则判断
通过定义规则、受保护的资源,统计调用链及运行时指标;通过比较运行指标与定义的规则,符合规则放行,不符合则阻塞。
资源受保护的一段代码,ResourceWrapper实现类StringResourceWrapper和MethodResourceWrapper。
Context存储当前调用链的元数据
String name: 上下文名称
DefaultNode entranceNode: 当前调用链的入口节点(根节点)
private Entry curEntry:当前调用Entry
private String origin:调用源可以是appId
private final boolean async:是否异步
每个插槽负责不同的职责,统计流量信息、流控规则校验等。 不同规则的Slot形成插槽链表,逐级向下执行。
Entry通行证token,允许通过的请求返回Entry对象,反之返回BlockException。
private long createTime: 创建时间用于统计RT
private Node curNode: 记录当前上下文中资源的统计信息
private Node originNode:调用源的统计信息,调用源可以是appId
protected ResourceWrapper resourceWrapper:资源封装类
Node保存资源的实时统计信息
//每分钟的请求数(通过的+阻塞的)
long totalRequest()
//每分钟通过的请求数
long totalPass()
//每分钟请求成功的数量
long totalSuccess();
//每分钟请求阻塞的数量
long blockRequest()
//每分钟业务异常数量
long totalException()
//通过的QPS
double passQps()
//阻塞的QPS
double blockQps()
//总的QPS
double totalQps()
//成功的QPS
double successQps()
//成功QPS的最大值 到当前时间
double maxSuccessQps()
//异常的QPS
double exceptionQps()
//平均RT
double avgRt()
//最小的RT
double minRt()
//返回当前活动的线程数
int curThreadNum()
//前一秒的block QPS
double previousBlockQps()
//前一秒通过的 QPS
double previousPassQps()
//获取资源的有效统计信息
Map<Long, MetricNode> metrics()
//增加通过请求的数量
void addPassRequest(int count)
//增加rt时间和成功的请求数量
void addRtAndSuccess(long rt, int success)
//增加阻塞的QPS
void increaseBlockQps(int count)
//增加异常QPS
void increaseExceptionQps(int count)
//增加当前线程的数量
void increaseThreadNum()
//减少当前线程的数量
void decreaseThreadNum()
//重置计数器
void reset()
FlowQpsDemo.java
public static void main(String[] args) throws Exception {
//定义流控规则
initFlowQpsRule();
//打印流控信息
tick();
//模拟多线程触发流控
simulateTraffic();
System.out.println("===== begin to do flow control");
System.out.println("only 20 requests per second can pass");
}
小结:通过流控示例代码分析流Sentinel服务治理的工作流程。
private static void initFlowQpsRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
// 设置资源名称
rule1.setResource(KEY);
// 阀值
rule1.setCount(20);
// 基于运行指标QPS阀值类型
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 对调用来源不做限制
rule1.setLimitApp("default");
rules.add(rule1);
// 限流规则加入到缓存
FlowRuleManager.loadRules(rules);
}
FlowRuleManager#成员变量
// 缓存定义的规则
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
// 回调的Listener实现
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
// 用于注册Listener
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
static {
//将FlowPropertyListener注册到currentProperty中
currentProperty.addListener(LISTENER);
SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}
小结:FlowRuleManager通过静态方法将FlowPropertyListener注册到DynamicSentinelProperty中。
FlowRuleManager#loadRules
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
DynamicSentinelProperty#updateValue
@Override
public boolean updateValue(T newValue) {
//...
value = newValue;
for (PropertyListener<T> listener : listeners) {
//回调listener
listener.configUpdate(newValue);
}
return true;
}
FlowRuleManager#FlowPropertyListener#configUpdate
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
//回调执行方法,更新缓存规则
@Override
public void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
}
小结:FlowRuleManager的内部静态类FlowPropertyListener#configUpdate方法将规则更新到缓存,缓存在FlowRuleManager的成员变量flowRules中。
while (!stop) {
Entry entry = null;
try {
// 定义受保护的资源
entry = SphU.entry(KEY);
// 获得通行证 允许通过
pass.addAndGet(1);
} catch (BlockException e1) {
// 未获得通行证 被阻塞
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
总结:通过SphU.entry作为入口,如果符合规则放行返回Entry实例;否则抛出BlockException被阻塞。
CtSph#entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//从线程变量获取资源上下文
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
//创建默认资源上下文 名称为:sentinel_default_context
if (context == null) {
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}
小结:先从当前线程上下文获取资源上下文Context;如果null则使用默认上下文名称sentinel_default_context通过MyContextUtil.myEnter创建。
CtSph#MyContextUtil#myEnter->ContextUtil#trueEnter
//从当前线程变量中 获取上下文
Context context = contextHolder.get();
if (context == null) {
]//创建用于统计进来流量的Node
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
//Add entrance node.
Constants.ROOT.addChild(node);
//将该资源的Node加入缓存
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
//name为上下文名称
newMap.put(name, node);
contextNameNodeMap = newMap;
}
//构建资源上下文 包含:统计信息的Node
context = new Context(node, name);
context.setOrigin(origin);
//设置到当前线程变量
contextHolder.set(context);
小结:创建负责入口统计信息的EntranceNode;构建资源上下文并设置到线程变量ThreadLocal中。
CtSph#entryWithPriority
private Entry entryWithPriority()
throws BlockException {
...
//构造资源的插槽链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
...
}
CtSph#lookProcessChain
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//获取该资源关联的插槽
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
//加锁
synchronized (LOCK) {
//再次获取该资源关联的插槽
chain = chainMap.get(resourceWrapper);
//插槽链为null创建新链条
if (chain == null) {
// Entry size limit.
//插槽链条的数量最多为6000个
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//构造插槽链条
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
SlotChainProvider#newSlotChain
public static ProcessorSlotChain newSlotChain() {
// 构造插槽链
if (builder != null) {
return builder.build();
}
// 加载自定义SlotChainBuilder
// 默认选择DefaultProcessorSlotChain
resolveSlotChainBuilder();
// 再次判断为null使用默认的DefaultProcessorSlotChain
if (builder == null) {
...
builder = new DefaultSlotChainBuilder();
}
return builder.build();
}
DefaultSlotChainBuilder#build
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
小结:从创建插槽链流程可以看出,使用默认插槽构造器DefaultProcessorSlotChain创建8个插槽形成链表结构;分别为:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。插槽构造器也可以自定义。
CtSph#entryWithPriority
private Entry entryWithPriority(
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//触发插槽链执行及规则校验
chain.entry();
} catch (BlockException e1) {
//触发流控向上抛出BlockException
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
}
//返回通行证Entry
return e;
}
小结:通过chain.entry()触发插槽链条执行,默认会经过上面8个插槽。每个插槽履行自己职责,判断是否符合流控规则,符合规则放行返回通行证Entry;不符合触发流控向上抛出BlockException。具体各个插槽的职责后续文章再做分析。