本文主要研究一下sentinel的StatisticSlot
com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, node, count, args);
node.increaseThreadNum();
node.addPassRequest();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest();
}
} catch (BlockException e) {
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockedQps();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockedQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockedQps();
}
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
// Should not happen
node.increaseExceptionQps();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps();
}
throw e;
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
node.rt(rt);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().rt(rt);
}
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.rt(rt);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// error may happen
// node.rt(-2);
}
fireExit(context, resourceWrapper, count);
}
}
com/alibaba/csp/sentinel/node/StatisticNode.java
public class StatisticNode implements Node {
private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);
private AtomicInteger curThreadNum = new AtomicInteger(0);
private long lastFetchTime = -1;
//......
@Override
public int curThreadNum() {
return curThreadNum.get();
}
@Override
public void addPassRequest() {
rollingCounterInSecond.addPass();
rollingCounterInMinute.addPass();
}
@Override
public void increaseBlockedQps() {
rollingCounterInSecond.addBlock();
rollingCounterInMinute.addBlock();
}
@Override
public void increaseExceptionQps() {
rollingCounterInSecond.addException();
rollingCounterInMinute.addException();
}
@Override
public void increaseThreadNum() {
curThreadNum.incrementAndGet();
}
@Override
public void decreaseThreadNum() {
curThreadNum.decrementAndGet();
}
}
com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java
public class DefaultController implements Controller {
double count = 0;
int grade = 0;
public DefaultController(double count, int grade) {
super();
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return -1;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
}
}
com/alibaba/csp/sentinel/slots/DefaultSlotsChainBuilder.java
public class DefaultSlotsChainBuilder implements SlotsChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
StatisticSlot虽然在SystemSlot、SystemSlot、AuthoritySlot、FlowSlot前面,但是其递增统计数则是fireEntry动作之后进行的,相当于aop拦截的after,也可以将Node的curThreadNum理解为通过限流条件之后的请求线程数