1.客户端是如何将机器上报到控制台的?(即控制台的机器列表)
2.控制台添加规则后如何通知客户端并使规则生效的?
注:客户端通过心跳将机器IP端口等信息上报给控制台;控制台新增规则后将规则推送到Sentinel客户端;控制台与客户端的通信通过sentinel-transport模块来实现。
ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class); //@1
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
// ...
for (OrderWrapper w : initList) {
w.func.init(); //@2
}
@1 SPI获取InitFunc实现类,默认配置两个实现类CommandCenterInitFunc和HeartbeatSenderInitFunc。 @2 CommandCenterInitFunc响应客户端请求,由Handler处理具体逻辑;HeartbeatSenderInitFunc负责心跳发送,具体由SimpleHttpHeartbeatSender或者HttpHeartbeatSender负责。
//@1
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
initSchedulerIfNeeded();//@2
//@3
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
scheduleHeartbeatTask(sender, interval); //@4
@1 SPI加载SimpleHttpHeartbeatSender或者HttpHeartbeatSender,本文以SimpleHttpHeartbeatSender实现来分析心跳上报逻辑。 @2 初始化定时任务线程池ScheduledThreadPoolExecutor。 @3 获取心跳时间默认10秒(可自定义)以及设置心跳时间。 @4 发送心跳定时任务
InetSocketAddress addr = getAvailableAddress(); //@1
SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH); //@2
request.setParams(heartBeat.generateCurrentMessage()); //@3
SimpleHttpResponse response = httpClient.post(request); //@4
@1 获取控制台地址,在客户端启动时可以通过-Dcsp.sentinel.dashboard.server=127.0.0.1:8080传入 @2 心跳请求路径HEARTBEAT_PATH为/registry/machine @3 组织心跳信息 @4 通过HTTP请求向控制台发送心跳数据
"app" -> "melon-demo"
"hostname" -> "yongliangdembp"
"app_type" -> "0"
"port" -> "8719"
"v" -> "1.6.3"
"ip" -> "192.168.1.4"
"version" -> "1570936907910"
代码位置:MachineRegistryController#receiveHeartBeat
MachineInfo machineInfo = new MachineInfo();
machineInfo.setApp(app);
machineInfo.setAppType(appType);
machineInfo.setHostname(hostname);
machineInfo.setIp(ip);
machineInfo.setPort(port);
machineInfo.setHeartbeatVersion(version);
machineInfo.setLastHeartbeat(System.currentTimeMillis());
machineInfo.setVersion(sentinelVersion);
appManagement.addMachine(machineInfo);
return Result.ofSuccessMsg("success");
控制台处理心跳逻辑比较简单,接收到请求后将信息存储在ConcurrentHashMap缓存中;控制台重启后信息将丢失。
entity = repository.save(entity); //@1
if (!publishRules(entity.getApp(), entity.getIp(), entity.getPort())) {
logger.error("Publish flow rules failed after rule add");
} // @2
return Result.ofSuccess(entity);
@1 通过规则新增请求/v1/flow/rule定位到该代码,规则保存在缓存ConcurrentHashMap中(allRules、machineRules、appRules) @2 通过publishRules来向客户端发布规则
String data = JSON.toJSONString(entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
Map<String, String> params = new HashMap<>(2);
params.put("type", type);
params.put("data", data); @1
String result = executeCommand(app, ip, port, SET_RULES_PATH, params, true).get(); //@2
@1 params流控规则组装内容格式如下
type -> flow
data -> [{"clusterConfig":{"fallbackToLocalWhenFail":true,"sampleCount":10,"strategy":0,"thresholdType":0,"windowIntervalMs":1000},"clusterMode":false,"controlBehavior":0,"count":2.0,"grade":1,"limitApp":"default","maxQueueingTimeMs":500,"resource":"melonSentinel","strategy":0,"warmUpPeriodSec":10}]
@2 数据组装与执行 app应用名称。例如:melon-demo SET_RULES_PATH客户端的请求路径。固定为setRules ip为Sentinel客户端地址。例如:192.168.1.4 port为客户端启动时设定的端口。例如:8719
CloseableHttpAsyncClient httpClient;
httpClient.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse response) {
int statusCode = response.getStatusLine().getStatusCode();
try {
String value = getBody(response);
// ...
} catch (Exception ex) {
// ...
}
}
}); //@1
@1 控制台通过httpClient将流控规则发送到客户端。
客户端通过InitExecutor#doInit初始化,CommandCenterInitFunc响应客户端请求,由Handler处理具体逻辑。
//@1
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
//@2
commandCenter.beforeStart();
commandCenter.start();
@1 SPI获取CommandCenter(SimpleHttpCommandCenter或者NettyHttpCommandCenter) 默认为SimpleHttpCommandCenter. @2 本文以SimpleHttpCommandCenter进行分析
//@1
public void beforeStart() throws Exception {
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
//@2
public void start() throws Exception {
// ...
ServerSocket serverSocket = getServerSocketFromBasePort(port);
executor.submit(new ServerThread(serverSocket));
// ...
}
@1 SPI加载各个CommandHandler注册到缓存中,有众多Handler负责不同的职责,本文通过ModifyRulesCommandHandler分析流程。 @2 初始化ServerSocket,监听处理请求。
实现CommandHandler的Handler列表
com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.FetchClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.ApiCommandHandler
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept(); //@1
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket); //@2
bizExecutor.submit(eventTask);
} catch (Exception e) {
...
}
}
}
@1 接受连接到ServerSocket连接 @2 处理客户发来的请求数据
BufferedReader in = null;
PrintWriter printWriter = null;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String line = in.readLine();
CommandRequest request = parseRequest(line); //@1
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName); //@2
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter, outputStream);
}
} catch (Throwable e) {
...
}
@1 解析客户端请求并解析请求内容。 @2 根据commandName或者相应处理CommandHandler;例如:setRules对应ModifyRulesCommandHandler来处理。
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
FlowRuleManager.loadRules(flowRules); //@1
if (!writeToDataSource(getFlowDataSource(), flowRules)) {
result = WRITE_DS_FAILURE_MSG;
}
return CommandResponse.ofSuccess(result);
}
@1 Sentinel客户端通过FlowRuleManager.loadRules(flowRules)将流控规则更新到缓存使其生效。
小结:本部分分析了Sentinel控制台新增流控规则以及推送到Sentinel客户端的详细过程。
机器列表是客户端定时(10秒)向控制台发送心跳来存储在内存中,机器列表从内存获取而显示;添加的流控规则同样存储在控制台和客户端的缓存中,在应用重启后将会消失。 该原始模式通常不会应用于生产环境,在生产环境中更多使用Push模式,如图:
通过控制台将规则写入到配置中心,客户端监听规则变化,更新到本地缓存时规则生效。