前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Sentinel控制台规则推送与机器列表【源码笔记】

Sentinel控制台规则推送与机器列表【源码笔记】

作者头像
瓜农老梁
发布2019-10-16 13:43:13
1.4K0
发布2019-10-16 13:43:13
举报
文章被收录于专栏:瓜农老梁瓜农老梁
一、问题思考

1.客户端是如何将机器上报到控制台的?(即控制台的机器列表)

2.控制台添加规则后如何通知客户端并使规则生效的?

二、交互示意图

注:客户端通过心跳将机器IP端口等信息上报给控制台;控制台新增规则后将规则推送到Sentinel客户端;控制台与客户端的通信通过sentinel-transport模块来实现。

三、心跳交互
1.客户端心跳上报
代码位置:InitExecutor#doInit()
代码语言:javascript
复制
ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class); //@1
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
// ...
for (OrderWrapper w : initList) {
    w.func.init(); //@2
}

@1 SPI获取InitFunc实现类,默认配置两个实现类CommandCenterInitFunc和HeartbeatSenderInitFunc。 @2 CommandCenterInitFunc响应客户端请求,由Handler处理具体逻辑;HeartbeatSenderInitFunc负责心跳发送,具体由SimpleHttpHeartbeatSender或者HttpHeartbeatSender负责。

代码位置:HeartbeatSenderInitFunc#init()
代码语言:javascript
复制
//@1
HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
initSchedulerIfNeeded();//@2
//@3
long interval = retrieveInterval(sender);
setIntervalIfNotExists(interval);
scheduleHeartbeatTask(sender, interval); //@4

@1 SPI加载SimpleHttpHeartbeatSender或者HttpHeartbeatSender,本文以SimpleHttpHeartbeatSender实现来分析心跳上报逻辑。 @2 初始化定时任务线程池ScheduledThreadPoolExecutor。 @3 获取心跳时间默认10秒(可自定义)以及设置心跳时间。 @4 发送心跳定时任务

代码位置:SimpleHttpHeartbeatSender#sendHeartbeat()
代码语言:javascript
复制
InetSocketAddress addr = getAvailableAddress(); //@1
SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH); //@2
request.setParams(heartBeat.generateCurrentMessage()); //@3
SimpleHttpResponse response = httpClient.post(request); //@4

@1 获取控制台地址,在客户端启动时可以通过-Dcsp.sentinel.dashboard.server=127.0.0.1:8080传入 @2 心跳请求路径HEARTBEAT_PATH为/registry/machine @3 组织心跳信息 @4 通过HTTP请求向控制台发送心跳数据

2.心跳信息内容
代码语言:javascript
复制
"app" -> "melon-demo"
"hostname" -> "yongliangdembp"
"app_type" -> "0"
"port" -> "8719"
"v" -> "1.6.3"
"ip" -> "192.168.1.4"
"version" -> "1570936907910"
3.控制台处理心跳请求

代码位置:MachineRegistryController#receiveHeartBeat

代码语言:javascript
复制
MachineInfo machineInfo = new MachineInfo();
machineInfo.setApp(app);
machineInfo.setAppType(appType);
machineInfo.setHostname(hostname);
machineInfo.setIp(ip);
machineInfo.setPort(port);
machineInfo.setHeartbeatVersion(version);
machineInfo.setLastHeartbeat(System.currentTimeMillis());
machineInfo.setVersion(sentinelVersion);
appManagement.addMachine(machineInfo);
return Result.ofSuccessMsg("success");

控制台处理心跳逻辑比较简单,接收到请求后将信息存储在ConcurrentHashMap缓存中;控制台重启后信息将丢失。

四、规则推送
1.控制台新增规则
代码位置:FlowControllerV1#apiAddFlowRule
代码语言:javascript
复制
entity = repository.save(entity); //@1
if (!publishRules(entity.getApp(), entity.getIp(), entity.getPort())) {
    logger.error("Publish flow rules failed after rule add");
} // @2
return Result.ofSuccess(entity);

@1 通过规则新增请求/v1/flow/rule定位到该代码,规则保存在缓存ConcurrentHashMap中(allRules、machineRules、appRules) @2 通过publishRules来向客户端发布规则

代码位置:SentinelApiClient#setRules
代码语言:javascript
复制
String data = JSON.toJSONString(entities.stream().map(r -> r.toRule()).collect(Collectors.toList()));
Map<String, String> params = new HashMap<>(2);
params.put("type", type);
params.put("data", data); @1
String result = executeCommand(app, ip, port, SET_RULES_PATH, params, true).get(); //@2

@1 params流控规则组装内容格式如下

代码语言:javascript
复制
type -> flow
data -> [{"clusterConfig":{"fallbackToLocalWhenFail":true,"sampleCount":10,"strategy":0,"thresholdType":0,"windowIntervalMs":1000},"clusterMode":false,"controlBehavior":0,"count":2.0,"grade":1,"limitApp":"default","maxQueueingTimeMs":500,"resource":"melonSentinel","strategy":0,"warmUpPeriodSec":10}]

@2 数据组装与执行 app应用名称。例如:melon-demo SET_RULES_PATH客户端的请求路径。固定为setRules ip为Sentinel客户端地址。例如:192.168.1.4 port为客户端启动时设定的端口。例如:8719

代码位置:SentinelApiClien#executeCommand
代码语言:javascript
复制
CloseableHttpAsyncClient httpClient;
httpClient.execute(request, new FutureCallback<HttpResponse>() {
    @Override
    public void completed(final HttpResponse response) {
        int statusCode = response.getStatusLine().getStatusCode();
        try {
            String value = getBody(response);
            // ...
       } catch (Exception ex) {
           // ...
        }
    }
}); //@1

@1 控制台通过httpClient将流控规则发送到客户端。

2.客户端处理推送规则

客户端通过InitExecutor#doInit初始化,CommandCenterInitFunc响应客户端请求,由Handler处理具体逻辑。

代码位置:CommandCenterInitFunc#init()
代码语言:javascript
复制
//@1
CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();
//@2
commandCenter.beforeStart();
commandCenter.start();

@1 SPI获取CommandCenter(SimpleHttpCommandCenter或者NettyHttpCommandCenter) 默认为SimpleHttpCommandCenter. @2 本文以SimpleHttpCommandCenter进行分析

代码位置:SimpleHttpCommandCenter
代码语言:javascript
复制
//@1
public void beforeStart() throws Exception {
    Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
    registerCommands(handlers);
}
//@2
public void start() throws Exception {
    // ...
    ServerSocket serverSocket = getServerSocketFromBasePort(port);
    executor.submit(new ServerThread(serverSocket));
    // ...
}

@1 SPI加载各个CommandHandler注册到缓存中,有众多Handler负责不同的职责,本文通过ModifyRulesCommandHandler分析流程。 @2 初始化ServerSocket,监听处理请求。

实现CommandHandler的Handler列表

代码语言:javascript
复制
com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.FetchClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.cluster.ModifyClusterModeCommandHandler
com.alibaba.csp.sentinel.command.handler.ApiCommandHandler
代码位置:SimpleHttpCommandCenter#ServerThread
代码语言:javascript
复制
public void run() {
    while (true) {
        Socket socket = null;
        try {
            socket = this.serverSocket.accept(); //@1
            setSocketSoTimeout(socket);
            HttpEventTask eventTask = new HttpEventTask(socket); //@2
            bizExecutor.submit(eventTask);
        } catch (Exception e) {
            ...
        }
    }
    }

@1 接受连接到ServerSocket连接 @2 处理客户发来的请求数据

代码位置:HttpEventTask#run()
代码语言:javascript
复制
BufferedReader in = null;
    PrintWriter printWriter = null;
    try {
        in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
        OutputStream outputStream = socket.getOutputStream();
        printWriter = new PrintWriter(
            new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
        String line = in.readLine();
        CommandRequest request = parseRequest(line); //@1
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName); //@2
        if (commandHandler != null) {
            CommandResponse<?> response = commandHandler.handle(request);
            handleResponse(response, printWriter, outputStream);
        }
    } catch (Throwable e) {
        ...
    }

@1 解析客户端请求并解析请求内容。 @2 根据commandName或者相应处理CommandHandler;例如:setRules对应ModifyRulesCommandHandler来处理。

代码位置:ModifyRulesCommandHandler#handle
代码语言:javascript
复制
if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
    List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
    FlowRuleManager.loadRules(flowRules); //@1
    if (!writeToDataSource(getFlowDataSource(), flowRules)) {
        result = WRITE_DS_FAILURE_MSG;
    }
    return CommandResponse.ofSuccess(result);
}

@1 Sentinel客户端通过FlowRuleManager.loadRules(flowRules)将流控规则更新到缓存使其生效。

小结:本部分分析了Sentinel控制台新增流控规则以及推送到Sentinel客户端的详细过程。

五、实际使用

机器列表是客户端定时(10秒)向控制台发送心跳来存储在内存中,机器列表从内存获取而显示;添加的流控规则同样存储在控制台和客户端的缓存中,在应用重启后将会消失。 该原始模式通常不会应用于生产环境,在生产环境中更多使用Push模式,如图:

通过控制台将规则写入到配置中心,客户端监听规则变化,更新到本地缓存时规则生效。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题思考
  • 二、交互示意图
  • 三、心跳交互
  • 1.客户端心跳上报
  • 代码位置:InitExecutor#doInit()
  • 代码位置:HeartbeatSenderInitFunc#init()
  • 代码位置:SimpleHttpHeartbeatSender#sendHeartbeat()
  • 2.心跳信息内容
  • 3.控制台处理心跳请求
  • 四、规则推送
  • 1.控制台新增规则
  • 代码位置:FlowControllerV1#apiAddFlowRule
  • 代码位置:SentinelApiClient#setRules
  • 代码位置:SentinelApiClien#executeCommand
  • 2.客户端处理推送规则
  • 代码位置:CommandCenterInitFunc#init()
  • 代码位置:SimpleHttpCommandCenter
  • 代码位置:SimpleHttpCommandCenter#ServerThread
  • 代码位置:HttpEventTask#run()
  • 代码位置:ModifyRulesCommandHandler#handle
  • 五、实际使用
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档