专栏首页瓜农老梁Sentinel控制台实时监控【源码笔记

Sentinel控制台实时监控【源码笔记

一、实时监控流程

在展开之前先画了一个整体的流程图,如下:

二、控制台实时监控

从控制台监控来看,可以看出时间戳、通过QPS、拒绝的QPS、响应时间以及相应的波动曲线,接下来分析这些数据怎么来的?

1.前端定时请求

实时数据由客户端js触发,每隔10秒钟向后台触发请求获取监控数据。

http://localhost:8080/metric/queryTopResourceMetric.json?app=sentinel-aop-demo&desc=true&pageIndex=1&pageSize=6
2.实时监控处理流程

代码坐标:MetricController#queryTopResourceMetric

if (endTime == null) {
    endTime = System.currentTimeMillis();
}
if (startTime == null) {
    startTime = endTime - 1000 * 60 * 5;
}
if (endTime - startTime > maxQueryIntervalMs) {
    return Result.ofFail(-1, "time intervalMs is too big, must <= 1h");
} // @1
List<String> resources = metricStore.listResourcesOfApp(app); // @2
for (final String resource : topResource) {
    List<MetricEntity> entities = metricStore.queryByAppAndResourceBetween(
        app, resource, startTime, endTime); // @3
    List<MetricVo> vos = MetricVo.fromMetricEntities(entities, resource);
    Iterable<MetricVo> vosSorted = sortMetricVoAndDistinct(vos);
    map.put(resource, vosSorted);
}
...// @4

@1 maxQueryIntervalMs为1小时,即查询的时间间隔不能大于1小时。 @2 获取监控数据 @3 监控数据与时间范围进行匹配 @4 数据分页后返回结果(省略)

小结:查询监控的时间范围不能超过1小时,先后去监控数据,然后筛选匹配查询时间范围的,最后将结果返回。

3.实时监控数据获取

代码坐标:InMemoryMetricsRepository#listResourcesOfApp

Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app); // @1
final long minTimeMs = System.currentTimeMillis() - 1000 * 60;
Map<String, MetricEntity> resourceCount = new ConcurrentHashMap<>(32);
for (Entry<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
    if (metrics.getKey() < minTimeMs) {
        continue;
    } // @2
    MetricEntity newEntity = metrics.getValue();
    if (resourceCount.containsKey(resourceMetrics.getKey())) {
        MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey());
        oldEntity.addPassQps(newEntity.getPassQps());
        oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps());
        oldEntity.addBlockQps(newEntity.getBlockQps()); // @3
        oldEntity.addExceptionQps(newEntity.getExceptionQps());
        oldEntity.addCount(1);
    } else {
        resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity));
    }
}
}

@1 从内存allMetrics中获取监控数据

allMetrics数据结构

Map<String/*AppId*/, Map<String/*资源名称*/, ConcurrentLinkedHashMap<Long/*时间戳*/, MetricEntity/*统计信息*/>>>allMetrics = new ConcurrentHashMap<>();

allMetrics运行时截图

MetricEntity统计信息示例

{id=null, gmtCreate=Sun Oct 20 11:13:46 CST 2019, gmtModified=Sun Oct 20 11:13:46 CST 2019, app='sentinel-aop-demo', timestamp=Sun Oct 20 11:13:39 CST 2019, resource='test', passQps=1, blockQps=0, successQps=1, exceptionQps=0, rt=2.0, count=1, resourceCode=3556498}

@2 统计1秒内的数据,不在一秒内(minTimeMs)则不处理 @3 对1秒内的数据进行累计计算

小结:统计信息从allMetrics获取,并对1秒内的数据进行累计统计;那allMetric中的数据又是如何来的呢?

4.缓存(allMetrics)中的监控数据

调用链 代码坐标:MetricFetcher

MetricFetcher(){
    ...
    start()
}
private void start() {
    fetchScheduleService.scheduleAtFixedRate(() -> {
        try {
            fetchAllApp();
        } catch (Exception e) {
            logger.info("fetchAllApp error:", e);
        }
    }, 10, intervalSecond, TimeUnit.SECONDS); // @1
}
// 监控获取调用链
fetchAllApp->doFetchAppMetric->fetchOnce
Set<MachineInfo> machines = appInfo.getMachines();
for (final MachineInfo machine : machines) { // @2
    final String url = "http://" + machine.getIp() + ":" + machine.getPort() + "/" + METRIC_URL_PATH
                + "?startTime=" + startTime + "&endTime=" + endTime + "&refetch=" + false;
   final HttpGet httpGet = new HttpGet(url);
   httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
   httpclient.execute(httpGet, new FutureCallback<HttpResponse>(){
    @Override
    public void completed(final HttpResponse response) {
        try {
            handleResponse(response, machine, metricMap); // @3
            success.incrementAndGet();
        } catch (Exception e) {
            logger.error(msg + " metric " + url + " error:", e);
        } finally {
            latch.countDown();
        }
    }
   }
}
// 结果处理调用链条
handleResponse->handleBody
entity.addPassQps(node.getPassQps()); // @4
entity.addBlockQps(node.getBlockQps());
entity.addRtAndSuccessQps(node.getRt(), node.getSuccessQps());
entity.addExceptionQps(node.getExceptionQps());
entity.addCount(1);

@1 服务启动时,定时任务每秒获取一次监控数据 @2 循环AppId的每个节点并向其发送http请求获取监控数据;appId及机器节点信息来自心跳数据 @3 会请求结果进行处理 @4 累计统计信息包括appId对应所有节点

小结:控制台向该appId的每台机器发送http请求获取监控数据,并将每台获取的数据进行累加计算。控制台实际调用客户端暴露服务接口,并将各个节点数据汇总。

客户端监控接口如下(详见:Sentinel动态规则API模式命令【实战笔记】

http://192.168.1.4:8720/metric?startTime=1571575945000&endTime=1571575951000&refetch=false 备注:get and aggregate metrics, accept param: startTime={startTime}&endTime={endTime}&maxLines={maxLines}&identify={resourceName}

1571575945000|test|5|0|5|0|0|0
1571575945000|hello|5|0|5|0|0|0
1571575964000|__system_load__|20000|0|0|0|0|0
1571575964000|__cpu_usage__|1592|0|0|0|0|0
三、客户端监控数据
1.监控日志落盘
1.1定时任务记录日志

代码坐标:FlowRuleManager#static

SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); // @1

@1 定时任务在FlowRuleManager初始化时启动,每秒钟记录一次

代码坐标:MetricTimerListener#run

public void run() {
Map<Long, List<MetricNode>> maps = new TreeMap<Long, List<MetricNode>>();
// @1
for (Entry<ResourceWrapper, ClusterNode> e : ClusterBuilderSlot.getClusterNodeMap().entrySet()) {
    String name = e.getKey().getName();
    ClusterNode node = e.getValue();
    Map<Long, MetricNode> metrics = node.metrics();
    aggregate(maps, metrics, name);
}
aggregate(maps, Constants.ENTRY_NODE.metrics(), Constants.TOTAL_IN_RESOURCE_NAME);
if (!maps.isEmpty()) {
    for (Entry<Long, List<MetricNode>> entry : maps.entrySet()) {
        try {
            // @2
            metricWriter.write(entry.getKey(), entry.getValue());
        } catch (Exception e) {
            RecordLog.warn("[MetricTimerListener] Write metric error", e);
        }
    }
}
}

@1 从ClusterBuilderSlot获取资源对应的ClusterNode统计信息 @2 将统计信息记录到日志文件

1.2.记录metric索引和日志文件

代码坐标:MetricWriter#write

// @1
for (MetricNode node : nodes) {
    node.setTimestamp(time);
}
// @2
if (curMetricFile == null) {
    baseFileName = formMetricFileName(appName, pid);
    closeAndNewFile(nextFileNameOfDay(time));
}
// @3
if (!(curMetricFile.exists() && curMetricIndexFile.exists())) {
    closeAndNewFile(nextFileNameOfDay(time));
}
// @4
long second = time / 1000;
// ...
// @5
writeIndex(second, outMetric.getChannel().position());
// @6
for (MetricNode node : nodes) {
    outMetricBuf.write(node.toFatString().getBytes(CHARSET));
}
outMetricBuf.flush();
// @7
if (!validSize()) {
    closeAndNewFile(nextFileNameOfDay(time));
}
// @8
lastSecond = second;

@1 如果传入了时间戳所有的nodes都是time @2 第一次创建日志文件;日志目录可以通过csp.sentinel.log.dir参数设置; 默认日志目录为:user.home/logs/csp 流控文件名称为:appName-metrics.log.yyyy-MM-dd.n(n每创建一个日志文件会递增) 例如:sentinel-aop-demo-metrics.log.2019-10-20.2 可以通过logNameUsePid将pid加入到日志文件名中,默认false @3 日志文件和索引文件都不存在先创建 索引文件名称为:流控文件名称.idx 例如:sentinel-aop-demo-metrics.log.2019-10-20.2.idx @4 时间戳去掉秒数 @5 记录日志索引 索引文件先写入8个字节的时间戳;再写入8个字节的offset即metric与该时间戳对应的日志写入位置 @6 记录监控日志; 日志格式为:1571565694000|2019-10-20 18:01:34|hello|3|0|3|0|0|0 @7 校验metric文件大小默认为50M;超过后创建新文件 @8 内存中记录去除秒数的时间戳;也是写入索引的时间戳,据此可以获取对应metric文件位置(offset)

小结:定时任务在FlowRuleManager初始化时启动,每秒钟执行一次;从ClusterBuilderSlot获取资源对应的ClusterNode统计信息,记录metric索引文件和metric日志文件;缓存中记录上次写入的时间戳。

2.处理控制台请求

代码坐标:SendMetricCommandHandler#handle

if (StringUtil.isNotBlank(endTimeStr)) {
    long endTime = Long.parseLong(endTimeStr); 
    // @1
    list = searcher.findByTimeAndResource(startTime, endTime, identity); 
} else {
    if (StringUtil.isNotBlank(maxLinesStr)) {
        maxLines = Integer.parseInt(maxLinesStr);
    }
    maxLines = Math.min(maxLines, 12000);
    // @2
    list = searcher.find(startTime, maxLines); 
}

@1 根据时间范围和资源名称查找监控日志 @2 如果没有时间戳,默认查找最大12000条数据

小结:处理请求的模块在通信模块sentinel-transport;具体负责检索的逻辑在sentinel-core模块MetricSearcher类。

3.检索监控日志

代码坐标:MetricSearcher#findByTimeAndResource

// @1
List<String> fileNames = MetricWriter.listMetricFiles(baseDir, baseFileName);
int i = 0;
// @2
long offsetInIndex = 0;
if (validPosition(beginTimeMs)) {
    i = fileNames.indexOf(lastPosition.metricFileName);
    if (i == -1) {
        i = 0;
    } else {
        offsetInIndex = lastPosition.offsetInIndex;
    }
} else {
    ...
}
for (; i < fileNames.size(); i++) {
    String fileName = fileNames.get(i);
    // @3
    long offset = findOffset(beginTimeMs, fileName,
            MetricWriter.formIndexFileName(fileName), offsetInIndex);
    offsetInIndex = 0;
    if (offset != -1) {
        // @4
        return metricsReader.readMetricsByEndTime(fileNames, i, offset, beginTimeMs, endTimeMs, identity);
    }
}

@1 读取日志目录下的日志文件;日志目录见:记录metric索引和日志文件 @2 offsetInIndex上次读取索引文件的位置;初始值为0即从头开始读取 @3 根据时间戳、上次索引文件读取位置offsetInIndex在索引文件中查找offset;即日志文件的读取位置 @4 根据offset在日志文件中读取监控信息

3.1从索引文件中查找offset

代码坐标:MetricSearcher#findOffset

lastPosition.metricFileName = null;
lastPosition.indexFileName = null;
if (!new File(idxFileName).exists()) {
    return -1;
}
// @1
long beginSecond = beginTime / 1000;
FileInputStream in = new FileInputStream(idxFileName);
// @2
in.getChannel().position(offsetInIndex);
DataInputStream indexIn = new DataInputStream(in);
long offset;
try {
    long second;
    lastPosition.offsetInIndex = in.getChannel().position();
    // @3
    while ((second = indexIn.readLong()) < beginSecond) {
        // @4
        offset = indexIn.readLong();
        // @5
        lastPosition.offsetInIndex = in.getChannel().position();
    }
    // @6
    offset = indexIn.readLong();
    // @7
    lastPosition.metricFileName = metricFileName;
    // @8
    lastPosition.indexFileName = idxFileName;
    // @9
    lastPosition.second = second;
    return offset;
} catch (EOFException ignore) {
    return -1;
} finally {
    indexIn.close();
}

@1 去掉秒数 @2 定位到上次读取索引文件的位置 @3 先读取8个字节的时间戳;比较一直到大于等于待查找的时间戳为止 @4 更新metric日志文件偏移量 @5 更新内存索引文件偏移量 @6 读取metric文件偏移量 @7 metric文件名称 @8 索引文件名称 @9 上次读取的索引文件的时间戳

3.2根据offset读取监控信息

代码坐标:MetricsReader#readMetricsByEndTime

List<MetricNode> list = new ArrayList<MetricNode>(1024);
if (readMetricsInOneFileByEndTime(list, fileNames.get(pos++), offset, beginTimeMs, endTimeMs, identity)) { // @1
    while (pos < fileNames.size()
        &&
        readMetricsInOneFileByEndTime(list, fileNames.get(pos++), 0, beginTimeMs, endTimeMs, identity)) { // @2
    }
}
return list;

@1 校验是否应该继续读取文件内容 @2 读取具体文件内容;如果制定identity(具体的resource)则只读取该resource的监控信息;反之全部读取

3.3监控日志读取逻辑
in = new FileInputStream(fileName);
// @1
in.getChannel().position(offset);
BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset));
String line;
while ((line = reader.readLine()) != null) {
    MetricNode node = MetricNode.fromFatString(line);
    // @2
    long currentSecond = node.getTimestamp() / 1000;
    // @3
    if (currentSecond < beginSecond) {
        return false;
    }
    if (currentSecond <= endSecond) {
        if (identity == null) {
            list.add(node);
        } else if (node.getResource().equals(identity)) {
            list.add(node); // @4
        }
    } else {
        return false;
    }
    if (list.size() >= MAX_LINES_RETURN) { // @5
        return false;
    }
}

@1 定位到监控日志读取位置 @2 解析监控统计的时间戳 @3 监控统计日志的时间戳需要在待查询时间范围内 @4 指定resource则只收集该resource的统计信息 @5 读取数据不得超过MAX_LINES_RETURN(10万)条

小结:监控日志检索先根据去掉秒数的时间戳从索引文件中读取offset即日志文件的偏移量;再根据offset检索日志文件并将结果返回。

本文分享自微信公众号 - 瓜农老梁(gh_01130ae30a83),作者:梁勇

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-23

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RocketMQ/Kafka监控项整理【实战笔记】

    瓜农老梁
  • RocketMQ存储--消息追加【源码笔记】

    commitLog内存(ByteBuffer)写入位点,标记消息写到哪了,下次从该位置开始写。

    瓜农老梁
  • Java NIO通道概览与文件通道【源码笔记】

    系统I/O即字节的传输,Channel即传输的通道,文件或网络Socket服务即传输的目的地。

    瓜农老梁
  • Leetcode: Valid Parentheses

    题目: Given a string containing just the characters ‘(‘, ‘)’, ‘{‘, ‘}’, ‘[’ and ...

    卡尔曼和玻尔兹曼谁曼
  • new cloud-blockchain-book

    heidsoft
  • 【NPM库】- 0x06 - WebSocket

    WebSocket 是一种通信协议,可在单个 TCP 连接上进行全双工通信。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客...

    WEBJ2EE
  • 半自动化搭建Data Guard的想法和实践(三)(r9笔记第81天)

    今天总算抽了些时间把半自动化的脚本完成了大半,目前还缺少两部分的脚本,一部分是安装前的检查脚本,可以做一个预检查。虽然目前来看还不是必须,但是这些是标准和规范的...

    jeanron100
  • 从AFNetworking源码分析到应用全解 原

        AFNetworking是iOS/OS开发中常用的一个第三方网络库,可以说它是目前最流行的网络库,但其代码结构其实并不复杂,也可以说非常简洁优美。在AF...

    珲少
  • MessagePack Java 0.6.X 不使用注解(annotations)来序列化

    如果你不能添加 @Message 到你的定义对象中但是你还是希望进行序列化。你可以使用 register 方法来在类中启用序列化对象。

    HoneyMoose
  • 小处显逼格:细节提升气质 - 腾讯ISUX

    腾讯ISUX

扫码关注云+社区

领取腾讯云代金券