前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Sentinel控制台实时监控【源码笔记

Sentinel控制台实时监控【源码笔记

作者头像
瓜农老梁
发布2019-10-24 15:44:58
2.6K0
发布2019-10-24 15:44:58
举报
文章被收录于专栏:瓜农老梁瓜农老梁
一、实时监控流程

在展开之前先画了一个整体的流程图,如下:

二、控制台实时监控

从控制台监控来看,可以看出时间戳、通过QPS、拒绝的QPS、响应时间以及相应的波动曲线,接下来分析这些数据怎么来的?

1.前端定时请求

实时数据由客户端js触发,每隔10秒钟向后台触发请求获取监控数据。

http://localhost:8080/metric/queryTopResourceMetric.json?app=sentinel-aop-demo&desc=true&pageIndex=1&pageSize=6
2.实时监控处理流程

代码坐标:MetricController#queryTopResourceMetric

if (endTime == null) {
    endTime = System.currentTimeMillis();
}
if (startTime == null) {
    startTime = endTime - 1000 * 60 * 5;
}
if (endTime - startTime > maxQueryIntervalMs) {
    return Result.ofFail(-1, "time intervalMs is too big, must <= 1h");
} // @1
List<String> resources = metricStore.listResourcesOfApp(app); // @2
for (final String resource : topResource) {
    List<MetricEntity> entities = metricStore.queryByAppAndResourceBetween(
        app, resource, startTime, endTime); // @3
    List<MetricVo> vos = MetricVo.fromMetricEntities(entities, resource);
    Iterable<MetricVo> vosSorted = sortMetricVoAndDistinct(vos);
    map.put(resource, vosSorted);
}
...// @4

@1 maxQueryIntervalMs为1小时,即查询的时间间隔不能大于1小时。 @2 获取监控数据 @3 监控数据与时间范围进行匹配 @4 数据分页后返回结果(省略)

小结:查询监控的时间范围不能超过1小时,先后去监控数据,然后筛选匹配查询时间范围的,最后将结果返回。

3.实时监控数据获取

代码坐标:InMemoryMetricsRepository#listResourcesOfApp

Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app); // @1
final long minTimeMs = System.currentTimeMillis() - 1000 * 60;
Map<String, MetricEntity> resourceCount = new ConcurrentHashMap<>(32);
for (Entry<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
    if (metrics.getKey() < minTimeMs) {
        continue;
    } // @2
    MetricEntity newEntity = metrics.getValue();
    if (resourceCount.containsKey(resourceMetrics.getKey())) {
        MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey());
        oldEntity.addPassQps(newEntity.getPassQps());
        oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps());
        oldEntity.addBlockQps(newEntity.getBlockQps()); // @3
        oldEntity.addExceptionQps(newEntity.getExceptionQps());
        oldEntity.addCount(1);
    } else {
        resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity));
    }
}
}

@1 从内存allMetrics中获取监控数据

allMetrics数据结构

Map<String/*AppId*/, Map<String/*资源名称*/, ConcurrentLinkedHashMap<Long/*时间戳*/, MetricEntity/*统计信息*/>>>allMetrics = new ConcurrentHashMap<>();

allMetrics运行时截图

MetricEntity统计信息示例

{id=null, gmtCreate=Sun Oct 20 11:13:46 CST 2019, gmtModified=Sun Oct 20 11:13:46 CST 2019, app='sentinel-aop-demo', timestamp=Sun Oct 20 11:13:39 CST 2019, resource='test', passQps=1, blockQps=0, successQps=1, exceptionQps=0, rt=2.0, count=1, resourceCode=3556498}

@2 统计1秒内的数据,不在一秒内(minTimeMs)则不处理 @3 对1秒内的数据进行累计计算

小结:统计信息从allMetrics获取,并对1秒内的数据进行累计统计;那allMetric中的数据又是如何来的呢?

4.缓存(allMetrics)中的监控数据

调用链 代码坐标:MetricFetcher

MetricFetcher(){
    ...
    start()
}
private void start() {
    fetchScheduleService.scheduleAtFixedRate(() -> {
        try {
            fetchAllApp();
        } catch (Exception e) {
            logger.info("fetchAllApp error:", e);
        }
    }, 10, intervalSecond, TimeUnit.SECONDS); // @1
}
// 监控获取调用链
fetchAllApp->doFetchAppMetric->fetchOnce
Set<MachineInfo> machines = appInfo.getMachines();
for (final MachineInfo machine : machines) { // @2
    final String url = "http://" + machine.getIp() + ":" + machine.getPort() + "/" + METRIC_URL_PATH
                + "?startTime=" + startTime + "&endTime=" + endTime + "&refetch=" + false;
   final HttpGet httpGet = new HttpGet(url);
   httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
   httpclient.execute(httpGet, new FutureCallback<HttpResponse>(){
    @Override
    public void completed(final HttpResponse response) {
        try {
            handleResponse(response, machine, metricMap); // @3
            success.incrementAndGet();
        } catch (Exception e) {
            logger.error(msg + " metric " + url + " error:", e);
        } finally {
            latch.countDown();
        }
    }
   }
}
// 结果处理调用链条
handleResponse->handleBody
entity.addPassQps(node.getPassQps()); // @4
entity.addBlockQps(node.getBlockQps());
entity.addRtAndSuccessQps(node.getRt(), node.getSuccessQps());
entity.addExceptionQps(node.getExceptionQps());
entity.addCount(1);

@1 服务启动时,定时任务每秒获取一次监控数据 @2 循环AppId的每个节点并向其发送http请求获取监控数据;appId及机器节点信息来自心跳数据 @3 会请求结果进行处理 @4 累计统计信息包括appId对应所有节点

小结:控制台向该appId的每台机器发送http请求获取监控数据,并将每台获取的数据进行累加计算。控制台实际调用客户端暴露服务接口,并将各个节点数据汇总。

客户端监控接口如下(详见:Sentinel动态规则API模式命令【实战笔记】

http://192.168.1.4:8720/metric?startTime=1571575945000&endTime=1571575951000&refetch=false 备注:get and aggregate metrics, accept param: startTime={startTime}&endTime={endTime}&maxLines={maxLines}&identify={resourceName}

1571575945000|test|5|0|5|0|0|0
1571575945000|hello|5|0|5|0|0|0
1571575964000|__system_load__|20000|0|0|0|0|0
1571575964000|__cpu_usage__|1592|0|0|0|0|0
三、客户端监控数据
1.监控日志落盘
1.1定时任务记录日志

代码坐标:FlowRuleManager#static

SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); // @1

@1 定时任务在FlowRuleManager初始化时启动,每秒钟记录一次

代码坐标:MetricTimerListener#run

public void run() {
Map<Long, List<MetricNode>> maps = new TreeMap<Long, List<MetricNode>>();
// @1
for (Entry<ResourceWrapper, ClusterNode> e : ClusterBuilderSlot.getClusterNodeMap().entrySet()) {
    String name = e.getKey().getName();
    ClusterNode node = e.getValue();
    Map<Long, MetricNode> metrics = node.metrics();
    aggregate(maps, metrics, name);
}
aggregate(maps, Constants.ENTRY_NODE.metrics(), Constants.TOTAL_IN_RESOURCE_NAME);
if (!maps.isEmpty()) {
    for (Entry<Long, List<MetricNode>> entry : maps.entrySet()) {
        try {
            // @2
            metricWriter.write(entry.getKey(), entry.getValue());
        } catch (Exception e) {
            RecordLog.warn("[MetricTimerListener] Write metric error", e);
        }
    }
}
}

@1 从ClusterBuilderSlot获取资源对应的ClusterNode统计信息 @2 将统计信息记录到日志文件

1.2.记录metric索引和日志文件

代码坐标:MetricWriter#write

// @1
for (MetricNode node : nodes) {
    node.setTimestamp(time);
}
// @2
if (curMetricFile == null) {
    baseFileName = formMetricFileName(appName, pid);
    closeAndNewFile(nextFileNameOfDay(time));
}
// @3
if (!(curMetricFile.exists() && curMetricIndexFile.exists())) {
    closeAndNewFile(nextFileNameOfDay(time));
}
// @4
long second = time / 1000;
// ...
// @5
writeIndex(second, outMetric.getChannel().position());
// @6
for (MetricNode node : nodes) {
    outMetricBuf.write(node.toFatString().getBytes(CHARSET));
}
outMetricBuf.flush();
// @7
if (!validSize()) {
    closeAndNewFile(nextFileNameOfDay(time));
}
// @8
lastSecond = second;

@1 如果传入了时间戳所有的nodes都是time @2 第一次创建日志文件;日志目录可以通过csp.sentinel.log.dir参数设置; 默认日志目录为:user.home/logs/csp 流控文件名称为:appName-metrics.log.yyyy-MM-dd.n(n每创建一个日志文件会递增) 例如:sentinel-aop-demo-metrics.log.2019-10-20.2 可以通过logNameUsePid将pid加入到日志文件名中,默认false @3 日志文件和索引文件都不存在先创建 索引文件名称为:流控文件名称.idx 例如:sentinel-aop-demo-metrics.log.2019-10-20.2.idx @4 时间戳去掉秒数 @5 记录日志索引 索引文件先写入8个字节的时间戳;再写入8个字节的offset即metric与该时间戳对应的日志写入位置 @6 记录监控日志; 日志格式为:1571565694000|2019-10-20 18:01:34|hello|3|0|3|0|0|0 @7 校验metric文件大小默认为50M;超过后创建新文件 @8 内存中记录去除秒数的时间戳;也是写入索引的时间戳,据此可以获取对应metric文件位置(offset)

小结:定时任务在FlowRuleManager初始化时启动,每秒钟执行一次;从ClusterBuilderSlot获取资源对应的ClusterNode统计信息,记录metric索引文件和metric日志文件;缓存中记录上次写入的时间戳。

2.处理控制台请求

代码坐标:SendMetricCommandHandler#handle

if (StringUtil.isNotBlank(endTimeStr)) {
    long endTime = Long.parseLong(endTimeStr); 
    // @1
    list = searcher.findByTimeAndResource(startTime, endTime, identity); 
} else {
    if (StringUtil.isNotBlank(maxLinesStr)) {
        maxLines = Integer.parseInt(maxLinesStr);
    }
    maxLines = Math.min(maxLines, 12000);
    // @2
    list = searcher.find(startTime, maxLines); 
}

@1 根据时间范围和资源名称查找监控日志 @2 如果没有时间戳,默认查找最大12000条数据

小结:处理请求的模块在通信模块sentinel-transport;具体负责检索的逻辑在sentinel-core模块MetricSearcher类。

3.检索监控日志

代码坐标:MetricSearcher#findByTimeAndResource

// @1
List<String> fileNames = MetricWriter.listMetricFiles(baseDir, baseFileName);
int i = 0;
// @2
long offsetInIndex = 0;
if (validPosition(beginTimeMs)) {
    i = fileNames.indexOf(lastPosition.metricFileName);
    if (i == -1) {
        i = 0;
    } else {
        offsetInIndex = lastPosition.offsetInIndex;
    }
} else {
    ...
}
for (; i < fileNames.size(); i++) {
    String fileName = fileNames.get(i);
    // @3
    long offset = findOffset(beginTimeMs, fileName,
            MetricWriter.formIndexFileName(fileName), offsetInIndex);
    offsetInIndex = 0;
    if (offset != -1) {
        // @4
        return metricsReader.readMetricsByEndTime(fileNames, i, offset, beginTimeMs, endTimeMs, identity);
    }
}

@1 读取日志目录下的日志文件;日志目录见:记录metric索引和日志文件 @2 offsetInIndex上次读取索引文件的位置;初始值为0即从头开始读取 @3 根据时间戳、上次索引文件读取位置offsetInIndex在索引文件中查找offset;即日志文件的读取位置 @4 根据offset在日志文件中读取监控信息

3.1从索引文件中查找offset

代码坐标:MetricSearcher#findOffset

lastPosition.metricFileName = null;
lastPosition.indexFileName = null;
if (!new File(idxFileName).exists()) {
    return -1;
}
// @1
long beginSecond = beginTime / 1000;
FileInputStream in = new FileInputStream(idxFileName);
// @2
in.getChannel().position(offsetInIndex);
DataInputStream indexIn = new DataInputStream(in);
long offset;
try {
    long second;
    lastPosition.offsetInIndex = in.getChannel().position();
    // @3
    while ((second = indexIn.readLong()) < beginSecond) {
        // @4
        offset = indexIn.readLong();
        // @5
        lastPosition.offsetInIndex = in.getChannel().position();
    }
    // @6
    offset = indexIn.readLong();
    // @7
    lastPosition.metricFileName = metricFileName;
    // @8
    lastPosition.indexFileName = idxFileName;
    // @9
    lastPosition.second = second;
    return offset;
} catch (EOFException ignore) {
    return -1;
} finally {
    indexIn.close();
}

@1 去掉秒数 @2 定位到上次读取索引文件的位置 @3 先读取8个字节的时间戳;比较一直到大于等于待查找的时间戳为止 @4 更新metric日志文件偏移量 @5 更新内存索引文件偏移量 @6 读取metric文件偏移量 @7 metric文件名称 @8 索引文件名称 @9 上次读取的索引文件的时间戳

3.2根据offset读取监控信息

代码坐标:MetricsReader#readMetricsByEndTime

List<MetricNode> list = new ArrayList<MetricNode>(1024);
if (readMetricsInOneFileByEndTime(list, fileNames.get(pos++), offset, beginTimeMs, endTimeMs, identity)) { // @1
    while (pos < fileNames.size()
        &&
        readMetricsInOneFileByEndTime(list, fileNames.get(pos++), 0, beginTimeMs, endTimeMs, identity)) { // @2
    }
}
return list;

@1 校验是否应该继续读取文件内容 @2 读取具体文件内容;如果制定identity(具体的resource)则只读取该resource的监控信息;反之全部读取

3.3监控日志读取逻辑
in = new FileInputStream(fileName);
// @1
in.getChannel().position(offset);
BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset));
String line;
while ((line = reader.readLine()) != null) {
    MetricNode node = MetricNode.fromFatString(line);
    // @2
    long currentSecond = node.getTimestamp() / 1000;
    // @3
    if (currentSecond < beginSecond) {
        return false;
    }
    if (currentSecond <= endSecond) {
        if (identity == null) {
            list.add(node);
        } else if (node.getResource().equals(identity)) {
            list.add(node); // @4
        }
    } else {
        return false;
    }
    if (list.size() >= MAX_LINES_RETURN) { // @5
        return false;
    }
}

@1 定位到监控日志读取位置 @2 解析监控统计的时间戳 @3 监控统计日志的时间戳需要在待查询时间范围内 @4 指定resource则只收集该resource的统计信息 @5 读取数据不得超过MAX_LINES_RETURN(10万)条

小结:监控日志检索先根据去掉秒数的时间戳从索引文件中读取offset即日志文件的偏移量;再根据offset检索日志文件并将结果返回。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、实时监控流程
  • 二、控制台实时监控
  • 1.前端定时请求
  • 2.实时监控处理流程
  • 3.实时监控数据获取
  • 三、客户端监控数据
  • 1.监控日志落盘
  • 1.1定时任务记录日志
  • 1.2.记录metric索引和日志文件
  • 3.检索监控日志
  • 3.1从索引文件中查找offset
  • 3.2根据offset读取监控信息
  • 3.3监控日志读取逻辑
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档