前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊sentinel的StatisticSlot

聊聊sentinel的StatisticSlot

作者头像
code4it
发布2018-09-17 17:19:00
5210
发布2018-09-17 17:19:00
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下sentinel的StatisticSlot

StatisticSlot

com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java

代码语言:javascript
复制
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {

        try {
            fireEntry(context, resourceWrapper, node, count, args);
            node.increaseThreadNum();
            node.addPassRequest();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest();
            }

        } catch (BlockException e) {
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockedQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockedQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockedQps();
            }

            throw e;
        } catch (Throwable e) {
            context.getCurEntry().setError(e);

            // Should not happen
            node.increaseExceptionQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps();
            }
            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();

        if (context.getCurEntry().getError() == null) {
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }

            node.rt(rt);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().rt(rt);
            }

            node.decreaseThreadNum();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.rt(rt);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // error may happen
            // node.rt(-2);
        }

        fireExit(context, resourceWrapper, count);
    }

}
  • 重写了entry以及exit方法
  • entry是在成功通过的时候,调用node的increaseThreadNum以及addPassRequest;不通过的话,则调用increaseBlockedQps;异常的话,则调用increaseExceptionQps
  • exit方法调用了node的decreaseThreadNum,最后触发真正的exit操作

StatisticNode

com/alibaba/csp/sentinel/node/StatisticNode.java

代码语言:javascript
复制
public class StatisticNode implements Node {

    private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount,
        IntervalProperty.INTERVAL);

    private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

    private AtomicInteger curThreadNum = new AtomicInteger(0);

    private long lastFetchTime = -1;

    //......

    @Override
    public int curThreadNum() {
        return curThreadNum.get();
    }

    @Override
    public void addPassRequest() {
        rollingCounterInSecond.addPass();
        rollingCounterInMinute.addPass();
    }

    @Override
    public void increaseBlockedQps() {
        rollingCounterInSecond.addBlock();
        rollingCounterInMinute.addBlock();
    }

    @Override
    public void increaseExceptionQps() {
        rollingCounterInSecond.addException();
        rollingCounterInMinute.addException();

    }

    @Override
    public void increaseThreadNum() {
        curThreadNum.incrementAndGet();
    }

    @Override
    public void decreaseThreadNum() {
        curThreadNum.decrementAndGet();
    }

}
  • StatisticNode有个curThreadNum,记录了每个node的调用线程数,方便后面进行流控

DefaultController

com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java

代码语言:javascript
复制
public class DefaultController implements Controller {

    double count = 0;
    int grade = 0;

    public DefaultController(double count, int grade) {
        super();
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            return false;
        }

        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return -1;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
    }

}
  • 这里的canPass方法通过node的相关统计数来进行限流判断
  • 如果是FLOW_GRADE_THREAD,则采用curThreadNum,否则采用passQps统计数

DefaultSlotsChainBuilder

com/alibaba/csp/sentinel/slots/DefaultSlotsChainBuilder.java

代码语言:javascript
复制
public class DefaultSlotsChainBuilder implements SlotsChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}

StatisticSlot虽然在SystemSlot、SystemSlot、AuthoritySlot、FlowSlot前面,但是其递增统计数则是fireEntry动作之后进行的,相当于aop拦截的after,也可以将Node的curThreadNum理解为通过限流条件之后的请求线程数

小结

  • StatisticSlot类似after拦截,在通过后续所有slot的条件之后,才进行统计
  • 统计通过StatisticNode维护了curThreadNum、rollingCounterInSecond、rollingCounterInMinute等相关数据
  • DefaultController默认根据指定的限流条件进行控制,如果是FLOW_GRADE_THREAD,则采用curThreadNum,否则采用passQps统计数来判断

doc

  • DefaultController
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • StatisticSlot
  • StatisticNode
  • DefaultController
  • DefaultSlotsChainBuilder
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档