基于sentine-1.4.2,在dashboard想要更好的查看集群限流相关配置,需要一些小修改
你也可以直接从github上拉取我的代码: git@github.com:spilledyear/Sentinel.git,对应的分支是 1.4.2
然后修改com.alibaba.csp.sentinel.dashboard.controller.v2.FlowControllerV2文件,将其中的ruleProvider和rulePublisher改成刚刚新增的那两个
下面将从以下几个方面简单介绍集群限流
以嵌入式模式为例,在源码的sentinel-demo模块种,已经准备好了相关测试案例,启动两个实例:ClusterDemoApplication,启动参数分别如下:
-Dproject.name=clusterapp -Dserver.port=8081 -Dcsp.sentinel.dashboard.server=localhost:8080
-Dproject.name=clusterapp -Dserver.port=8082 -Dcsp.sentinel.dashboard.server=localhost:8080
为了能够方便的修改规则信息,直观的观察效果,需要启动控制台
-Dserver.port=8080
此时通过localhost:8080访问控制台,还无法看到任何应用信息,因为此时还没有任何的服务调用,通过以下快捷方式访问两个服务实例
curl localhost:8081/hello/luo
curl localhost:8082/hello/luo
这时候查看机器列表
菜单选项,发现已经有两个实例了(端口区分):
但这时候还没有server和client的概念,需要简单配置:点击集群限流
菜单项,然后点击右上角的"新增Toeken Server"
从中选取一台server,另一台指定为client,即:
此时,再查看集群流控
菜单项,发现已经有了server信息,通过连接详情发现已有两个连接,这是这是嵌入式,server端本身也是一个应用实例
以上准备工作完成之后,下面可以新建资源了。为了观察限流效果光差,新建的资源名与测试案例中的资源名一致:点击流控规则
菜单项,然后点击右上角的回到集群界面
:
为什么这里要在集群界面新建规则呢?上面已经说过了,针对集群规则界面已经做了修改,规则可以持久化到nacos配置中心
然后新建一个规则,有关于规则的使用这里就不展开了
以上操作完成之后,会发现nacos中多了一条配置,具体内容就是规则的具体信息
通过jmeter测试,让两个请求都分别请求不同的实例各20次:
发现每个请求都通过了10次,加起来刚好20次,多出来的请求抛出了FlowException异常,执行了blockHandler对应的逻辑,初步符合集群限流的效果
在保存规则信息的时候,发现请求了以下接口:http://localhost:8080/v2/flow/rule/29 对应FlowControllerV2中的apiUpdateFlowRule,主要逻辑如下:
如果dashboard使用了nacos持久化规则,对应的,在嵌入式模式下应该也会在server和client端使用NacosDatasource作为数据源,对应的源码在sentinel-datasource-nacos模块的NacosDataSource类中:
public NacosDataSource(final Properties properties, final String groupId, final String dataId,Converter<String, T> parser) {
super(parser);
this.configListener = new Listener() {
@Override
public Executor getExecutor() {
return pool;
}
@Override
public void receiveConfigInfo(final String configInfo) {
RecordLog.info(String.format("[NacosDataSource] New property value received for (properties: %s) (dataId: %s, groupId: %s): %s",
properties, dataId, groupId, configInfo));
T newValue = NacosDataSource.this.parser.convert(configInfo);
// Update the new value to the property.
getProperty().updateValue(newValue);
}
};
initNacosListener();
loadInitialConfig();
}
从上可以看出,当规则信息更新了的时候,会同步到sentinel的内存结构中。
这里有一个小问题,如果没有使用注册中心,规则将怎么进行推送? 答案其实在FlowRuleApiPublisher中,如果没有使用注册中心,将通过SentinelApiClient发送http请求,将规则推送到各个服务实例,服务实例收到规则信息之后再加载到sentinel相关的内存结构,核心代码如下:
for (MachineInfo machine : set) {
if (!MachineUtils.isMachineHealth(machine)) {
continue;
}
// TODO: parse the results
sentinelApiClient.setFlowRuleOfMachine(app, machine.getIp(), machine.getPort(), rules);
}
如果针对这个问题再次延申,还会有一些疑问,SentinelApiClient怎么就知道要将规则信息发送到哪里呢?哪个端口?这一部分肯定是sentine为我们隐藏起来了。
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
@Override
public void start() throws Exception {
pool.submit(new Runnable() {
@Override
public void run() {
try {
server.start();
} catch (Exception ex) {
RecordLog.info("Start netty server error", ex);
ex.printStackTrace();
System.exit(-1);
}
}
});
}
内部通过SPI机制加载,引用了哪个模块就会使用哪种机制。
dashboard是如何获取节点信息并将其展示在界面上的?核心原理还是在sentinel-transport模块中,不管是在sentinel-transport-simple-http还是sentinel-transport-netty-http中,都会向dashboard发送心跳上报当前节点信息,请求地址即:
dashboardIp:port/registry/machine,这里代表 localhost:8080/registry/machine
dashboar收到请求后会将节点信息保存到内存中。
有关于这一部分,sentinel-transport-simple-http模块中的核心类是SimpleHttpHeartbeatSender;sentinel-transport-netty-http模块中的核心类是HttpHeartbeatSender;
dashboard相关的逻辑如下
public Result<?> receiveHeartBeat(String app, Long version, String v, String hostname, String ip, Integer port) {
if (app == null) {
app = MachineDiscovery.UNKNOWN_APP_NAME;
}
if (ip == null) {
return Result.ofFail(-1, "ip can't be null");
}
if (port == null) {
return Result.ofFail(-1, "port can't be null");
}
if (port == -1) {
logger.info("Receive heartbeat from " + ip + " but port not set yet");
return Result.ofFail(-1, "your port not set yet");
}
String sentinelVersion = StringUtil.isEmpty(v) ? "unknown" : v;
long timestamp = version == null ? System.currentTimeMillis() : version;
try {
MachineInfo machineInfo = new MachineInfo();
machineInfo.setApp(app);
machineInfo.setHostname(hostname);
machineInfo.setIp(ip);
machineInfo.setPort(port);
machineInfo.setTimestamp(new Date(timestamp));
machineInfo.setVersion(sentinelVersion);
appManagement.addMachine(machineInfo);
return Result.ofSuccessMsg("success");
} catch (Exception e) {
logger.error("Receive heartbeat error", e);
return Result.ofFail(-1, e.getMessage());
}
}
所以,整个过程看起来是这样子的:
// 如果不配置默认default
ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton("cluster-" + appId));
// 集群限流规则配置,根据namespace动态生成Supplier,其实子
ClusterFlowRuleManager.setPropertySupplier(dataSource.getClusterFlowSupplier());
// 配置ServerTransportConfig:port、idleSeconds
ClusterServerConfigManager.registerServerTransportProperty(dataSource.getServerTransportConfigProperty());
// 为client设置requestTimeout
ClusterClientConfigManager.registerClientConfigProperty(dataSource.getClusterClientConfigProperty());
// 为client设置server的host和port,即serverHost、serverPort
ClusterClientConfigManager.registerServerAssignProperty(dataSource.getClusterClientAssignConfigProperty());
// 用于设置mode,设置0 代表client, 设置1代表 server
ClusterStateManager.registerProperty(dataSource.getClusterStateProperty());