集群限流

准备工作

基于sentine-1.4.2,在dashboard想要更好的查看集群限流相关配置,需要一些小修改

你也可以直接从github上拉取我的代码: git@github.com:spilledyear/Sentinel.git,对应的分支是 1.4.2

  • 开启集群规则界面 修改:resources/app/views/flow_v1.html,将其中和集群相关的按钮打开,最终效果如下:
  • 规则持久化 dashboard默认没有对规则持久化,但在集群规则界面添加的规则,其实是可以持久化到nacos的,只需要做一些简单的修改。将dashboard模块test目录下的com.alibaba.csp.sentinel.dashboard.rule.nacos类拷贝到java目录,如下:

然后修改com.alibaba.csp.sentinel.dashboard.controller.v2.FlowControllerV2文件,将其中的ruleProvider和rulePublisher改成刚刚新增的那两个

  • 启动nacos nacos的部署就不过多介绍,可以看官方文档 nacos手册

下面将从以下几个方面简单介绍集群限流

启动测试案例

以嵌入式模式为例,在源码的sentinel-demo模块种,已经准备好了相关测试案例,启动两个实例:ClusterDemoApplication,启动参数分别如下:

  • 实例一
-Dproject.name=clusterapp -Dserver.port=8081 -Dcsp.sentinel.dashboard.server=localhost:8080
  • 实例二
-Dproject.name=clusterapp -Dserver.port=8082 -Dcsp.sentinel.dashboard.server=localhost:8080

为了能够方便的修改规则信息,直观的观察效果,需要启动控制台

  • 启动控制台
-Dserver.port=8080

此时通过localhost:8080访问控制台,还无法看到任何应用信息,因为此时还没有任何的服务调用,通过以下快捷方式访问两个服务实例

curl localhost:8081/hello/luo

curl localhost:8082/hello/luo

这时候查看机器列表菜单选项,发现已经有两个实例了(端口区分):

但这时候还没有server和client的概念,需要简单配置:点击集群限流菜单项,然后点击右上角的"新增Toeken Server"

从中选取一台server,另一台指定为client,即:

此时,再查看集群流控菜单项,发现已经有了server信息,通过连接详情发现已有两个连接,这是这是嵌入式,server端本身也是一个应用实例

规则的推送

新建规则

以上准备工作完成之后,下面可以新建资源了。为了观察限流效果光差,新建的资源名与测试案例中的资源名一致:点击流控规则菜单项,然后点击右上角的回到集群界面

为什么这里要在集群界面新建规则呢?上面已经说过了,针对集群规则界面已经做了修改,规则可以持久化到nacos配置中心

然后新建一个规则,有关于规则的使用这里就不展开了

以上操作完成之后,会发现nacos中多了一条配置,具体内容就是规则的具体信息

查看限流效果

通过jmeter测试,让两个请求都分别请求不同的实例各20次:

发现每个请求都通过了10次,加起来刚好20次,多出来的请求抛出了FlowException异常,执行了blockHandler对应的逻辑,初步符合集群限流的效果

推送原理

在保存规则信息的时候,发现请求了以下接口:http://localhost:8080/v2/flow/rule/29 对应FlowControllerV2中的apiUpdateFlowRule,主要逻辑如下:

  • 将规则信息更新到dashboard的内存中,用于界面展示,这一部分主要和InMemoryRuleRepositoryAdapter的save方法相关;
  • 推送规则信息到nacos注册中心,这一部分和FlowRuleNacosPublisher相关;

如果dashboard使用了nacos持久化规则,对应的,在嵌入式模式下应该也会在server和client端使用NacosDatasource作为数据源,对应的源码在sentinel-datasource-nacos模块的NacosDataSource类中:

public NacosDataSource(final Properties properties, final String groupId, final String dataId,Converter<String, T> parser) {
    super(parser);
    this.configListener = new Listener() {
        @Override
        public Executor getExecutor() {
            return pool;
        }
        @Override
        public void receiveConfigInfo(final String configInfo) {
            RecordLog.info(String.format("[NacosDataSource] New property value received for (properties: %s) (dataId: %s, groupId: %s): %s",
                properties, dataId, groupId, configInfo));
            T newValue = NacosDataSource.this.parser.convert(configInfo);
            // Update the new value to the property.
            getProperty().updateValue(newValue);
        }
    };
    initNacosListener();
    loadInitialConfig();
}

从上可以看出,当规则信息更新了的时候,会同步到sentinel的内存结构中。

这里有一个小问题,如果没有使用注册中心,规则将怎么进行推送? 答案其实在FlowRuleApiPublisher中,如果没有使用注册中心,将通过SentinelApiClient发送http请求,将规则推送到各个服务实例,服务实例收到规则信息之后再加载到sentinel相关的内存结构,核心代码如下:

for (MachineInfo machine : set) {
    if (!MachineUtils.isMachineHealth(machine)) {
        continue;
    }
    // TODO: parse the results
    sentinelApiClient.setFlowRuleOfMachine(app, machine.getIp(), machine.getPort(), rules);
}

如果针对这个问题再次延申,还会有一些疑问,SentinelApiClient怎么就知道要将规则信息发送到哪里呢?哪个端口?这一部分肯定是sentine为我们隐藏起来了。

  • 第一个问题,哪个端口?这个端口其实是commandPort,即应用端暴露给 Sentinel 控制台的端口,ip@commandPort,其实就是界面上看到的那两个,分别为8720和8721;
  • 第二个问题,隐藏了哪些细节?其实就是隐藏了暴露端口的这部分细节,都在sentinel-transport模块中,提供了两种实现方式。
  • 方式一,sentinel-transport-simple-http模块中,通过ServerSocket方式暴露,对应的核心类为SimpleHttpCommandCenter,核心代码如下
@Override
public void run() {
    boolean success = false;
    ServerSocket serverSocket = getServerSocketFromBasePort(port);

    if (serverSocket != null) {
        CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
        socketReference = serverSocket;
        executor.submit(new ServerThread(serverSocket));
        success = true;
        port = serverSocket.getLocalPort();
    } else {
        CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
    }

    if (!success) {
        port = PORT_UNINITIALIZED;
    }

    TransportConfig.setRuntimePort(port);
    executor.shutdown();
}
  • 方式二,在sentinel-transport-netty-http模块中,通过netty暴露,核心类是NettyHttpCommandCenter,核心代码如下:
@Override
public void start() throws Exception {
    pool.submit(new Runnable() {
        @Override
        public void run() {
            try {
                server.start();
            } catch (Exception ex) {
                RecordLog.info("Start netty server error", ex);
                ex.printStackTrace();
                System.exit(-1);
            }
        }
    });
}

内部通过SPI机制加载,引用了哪个模块就会使用哪种机制。

节点发现

dashboard是如何获取节点信息并将其展示在界面上的?核心原理还是在sentinel-transport模块中,不管是在sentinel-transport-simple-http还是sentinel-transport-netty-http中,都会向dashboard发送心跳上报当前节点信息,请求地址即:

dashboardIp:port/registry/machine,这里代表 localhost:8080/registry/machine

dashboar收到请求后会将节点信息保存到内存中。

有关于这一部分,sentinel-transport-simple-http模块中的核心类是SimpleHttpHeartbeatSender;sentinel-transport-netty-http模块中的核心类是HttpHeartbeatSender;

dashboard相关的逻辑如下

public Result<?> receiveHeartBeat(String app, Long version, String v, String hostname, String ip, Integer port) {
    if (app == null) {
        app = MachineDiscovery.UNKNOWN_APP_NAME;
    }
    if (ip == null) {
        return Result.ofFail(-1, "ip can't be null");
    }
    if (port == null) {
        return Result.ofFail(-1, "port can't be null");
    }
    if (port == -1) {
        logger.info("Receive heartbeat from " + ip + " but port not set yet");
        return Result.ofFail(-1, "your port not set yet");
    }
    String sentinelVersion = StringUtil.isEmpty(v) ? "unknown" : v;
    long timestamp = version == null ? System.currentTimeMillis() : version;
    try {
        MachineInfo machineInfo = new MachineInfo();
        machineInfo.setApp(app);
        machineInfo.setHostname(hostname);
        machineInfo.setIp(ip);
        machineInfo.setPort(port);
        machineInfo.setTimestamp(new Date(timestamp));
        machineInfo.setVersion(sentinelVersion);
        appManagement.addMachine(machineInfo);
        return Result.ofSuccessMsg("success");
    } catch (Exception e) {
        logger.error("Receive heartbeat error", e);
        return Result.ofFail(-1, e.getMessage());
    }
}

所以,整个过程看起来是这样子的:

配置项

  • NameSpace NameSpace主要是用于区分不同的应用,其实在嵌入式的模式下作用不大,嵌入式模式下一般是一种对等结构,这时候NameSpace一般就是一个,即:应用名。只有在独立模式下才能体现它的作用:区分不同的应用。
// 如果不配置默认default
ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton("cluster-" + appId));
  • Supplier 主要作用就是就是根据NameSpace找到一个DynamicSentinelProperty,其实在嵌入式模式下,一般也就是写死一个DynamicSentinelProperty,因为这时候的NameSpace也就只有一个
// 集群限流规则配置,根据namespace动态生成Supplier,其实子
ClusterFlowRuleManager.setPropertySupplier(dataSource.getClusterFlowSupplier());
  • ServerTransportProperty 作用比较大,针对server端,会根据ServerTransportProperty中的信息在server端通过netty开启一个端口,用于和client交互
// 配置ServerTransportConfig:port、idleSeconds
ClusterServerConfigManager.registerServerTransportProperty(dataSource.getServerTransportConfigProperty());
  • ClientConfigProperty client端的相关配置,其实只有一个属性:请求server端的超时时间(requestTimeout)
// 为client设置requestTimeout
ClusterClientConfigManager.registerClientConfigProperty(dataSource.getClusterClientConfigProperty());
  • ServerAssignProperty client端的相关配置,里面保存的是server端的相关信息:server的host和port
// 为client设置server的host和port,即serverHost、serverPort
ClusterClientConfigManager.registerServerAssignProperty(dataSource.getClusterClientAssignConfigProperty());
  • ClusterStateManager 在嵌入式模式下,可以通过API来改变client和server的身份,大致逻辑就是:将server中的那个netty服务stop,然后根据新的配置在client开启一个新的netty服务(注意,服务开启成功之后,client就转变成server了)
// 用于设置mode,设置0 代表client, 设置1代表 server
ClusterStateManager.registerProperty(dataSource.getClusterStateProperty());

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券