一、插槽链类结构
2.插槽链运行时内存结构
先看示例,然后分析在内存中形成的调用结构,最后分析源码。
@Test
public void test01(){
try{
ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
nodeA.exit();
}
ContextUtil.exit();
}catch(Exception e){
e.printStackTrace();
}
}
上面的示例代码会在内存中形成如下结构
@Test
public void test02(){
try{
ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
nodeA.exit();
}
ContextUtil.exit();
ContextUtil.enter("entrance2", "appA");
nodeA = SphU.entry("nodeA");
if (nodeA != null) {
nodeA.exit();
}
ContextUtil.exit();
}catch(Exception e){
}
}
entrance1跟上图一样,下图为entrance2的结构
NodeSelectorSlot#entry
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 构造该资源Node
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
// 将该Node放入缓存 key为上下文名称
cacheMap.put(context.getName(), node);
map = cacheMap;
}
// Build invocation tree
// 将该Node加入上下文调用链
((DefaultNode)context.getLastNode()).addChild(node);
}
}
// 在上下文中 设置当前Node
context.setCurNode(node);
// 触发链条向下调用
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 只在第一次调用时生成
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode();
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// 设置当前节点的ClusterNode
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
1.NodeSelectorSlot职责记录调用轨迹,Entry调用会在内存中形成调用轨迹; 2.ClusterBuilderSlot职责构造ClusterNode,为当前节点设置ClusterNode; 3.ClusterNode为ClusterBuilderSlot的成员变量,一个资源对应一个ClusterNode; 4.Context会记录每次调用的元数据信息,EntranceNode为调用的根节点,DefaultNode和ClusterNode均为资源的统计信息; 5.不同的上下文生成不同的DefaultNode统计节点,相同资源的不同的上下文使用同一个ClusterNode
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
}
小结:LogSlot没有过多的逻辑,在被阻塞时在sentinel-block.log记录日志,格式如下:
2019-10-01 20:33:02|1|abc,FlowException,default,|12,0
public void entry() throws Throwable {
try {
// 触发向下插槽执行
fireEntry();
// 请求通过递增线程数量与请求数量
node.increaseThreadNum();
node.addPassRequest(count);
// 调用源统计信息
if (context.getCurEntry().getOriginNode() != null) {
// 调用源递增线程数和请求数
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// 进入流量全局统计
if (resourceWrapper.getType() == EntryType.IN) {
// 全局ClusterNode统计递增线程数和请求数
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
} catch (BlockException e) {
// 设置阻塞异常信息
context.getCurEntry().setError(e);
// 递增被阻塞的数量
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
// 全局ClusterNode递增阻塞请求数
Constants.ENTRY_NODE.increaseBlockQps(count);
}
}
throw e;
}
@Override
public void exit() {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
// 统计RT
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
// 递降线程数
node.decreaseThreadNum();
// 递降调用源线程数统计
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
// 全局ClusterNode递减请求数
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
}
fireExit(context, resourceWrapper, count);
}
小结:StatisticSlot职责主要统计信息:在请求通过时递增线程数和请求数;在被阻塞时设置异常信息以及递增被阻塞请求的数量;当退出时统计RT及递降线程数。Constants.ENTRY_NODE为final static的全局ClusterNode,为系统规则{@link SystemRule} 服务
SystemSlot职责在于给予StatisticSlot的Constants.ENTRY_NODE全局ClusterNode统计信息与设定的阀值进行判断。代码详见:SystemSlot#entry
AuthoritySlot职责主要在于对黑白名单进行判断。 代码详见:SystemSlot#entry
代码详见:FlowSlot#entry
小结:FlowSlot主要职责在于给予之前的Node统计信息,根据不同的策略进行规则校验;未达到阀值放行,超过阀值触发流控抛出FlowException。详细FlowSlot实现后面写文章整理。
代码详见:DegradeSlot#entry
小结:DegradeSlot职责在于给予CluserNode统计信息提供不同的降级策略,根据设定的阀值进行降级