在展开之前先画了一个整体的流程图,如下:
从控制台监控来看,可以看出时间戳、通过QPS、拒绝的QPS、响应时间以及相应的波动曲线,接下来分析这些数据怎么来的?
实时数据由客户端js触发,每隔10秒钟向后台触发请求获取监控数据。
http://localhost:8080/metric/queryTopResourceMetric.json?app=sentinel-aop-demo&desc=true&pageIndex=1&pageSize=6
代码坐标:MetricController#queryTopResourceMetric
if (endTime == null) {
endTime = System.currentTimeMillis();
}
if (startTime == null) {
startTime = endTime - 1000 * 60 * 5;
}
if (endTime - startTime > maxQueryIntervalMs) {
return Result.ofFail(-1, "time intervalMs is too big, must <= 1h");
} // @1
List<String> resources = metricStore.listResourcesOfApp(app); // @2
for (final String resource : topResource) {
List<MetricEntity> entities = metricStore.queryByAppAndResourceBetween(
app, resource, startTime, endTime); // @3
List<MetricVo> vos = MetricVo.fromMetricEntities(entities, resource);
Iterable<MetricVo> vosSorted = sortMetricVoAndDistinct(vos);
map.put(resource, vosSorted);
}
...// @4
@1 maxQueryIntervalMs为1小时,即查询的时间间隔不能大于1小时。 @2 获取监控数据 @3 监控数据与时间范围进行匹配 @4 数据分页后返回结果(省略)
小结:查询监控的时间范围不能超过1小时,先后去监控数据,然后筛选匹配查询时间范围的,最后将结果返回。
代码坐标:InMemoryMetricsRepository#listResourcesOfApp
Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app); // @1
final long minTimeMs = System.currentTimeMillis() - 1000 * 60;
Map<String, MetricEntity> resourceCount = new ConcurrentHashMap<>(32);
for (Entry<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
if (metrics.getKey() < minTimeMs) {
continue;
} // @2
MetricEntity newEntity = metrics.getValue();
if (resourceCount.containsKey(resourceMetrics.getKey())) {
MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey());
oldEntity.addPassQps(newEntity.getPassQps());
oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps());
oldEntity.addBlockQps(newEntity.getBlockQps()); // @3
oldEntity.addExceptionQps(newEntity.getExceptionQps());
oldEntity.addCount(1);
} else {
resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity));
}
}
}
@1 从内存allMetrics中获取监控数据
allMetrics数据结构
Map<String/*AppId*/, Map<String/*资源名称*/, ConcurrentLinkedHashMap<Long/*时间戳*/, MetricEntity/*统计信息*/>>>allMetrics = new ConcurrentHashMap<>();
allMetrics运行时截图
MetricEntity统计信息示例
{id=null, gmtCreate=Sun Oct 20 11:13:46 CST 2019, gmtModified=Sun Oct 20 11:13:46 CST 2019, app='sentinel-aop-demo', timestamp=Sun Oct 20 11:13:39 CST 2019, resource='test', passQps=1, blockQps=0, successQps=1, exceptionQps=0, rt=2.0, count=1, resourceCode=3556498}
@2 统计1秒内的数据,不在一秒内(minTimeMs)则不处理 @3 对1秒内的数据进行累计计算
小结:统计信息从allMetrics获取,并对1秒内的数据进行累计统计;那allMetric中的数据又是如何来的呢?
4.缓存(allMetrics)中的监控数据
调用链 代码坐标:MetricFetcher
MetricFetcher(){
...
start()
}
private void start() {
fetchScheduleService.scheduleAtFixedRate(() -> {
try {
fetchAllApp();
} catch (Exception e) {
logger.info("fetchAllApp error:", e);
}
}, 10, intervalSecond, TimeUnit.SECONDS); // @1
}
// 监控获取调用链
fetchAllApp->doFetchAppMetric->fetchOnce
Set<MachineInfo> machines = appInfo.getMachines();
for (final MachineInfo machine : machines) { // @2
final String url = "http://" + machine.getIp() + ":" + machine.getPort() + "/" + METRIC_URL_PATH
+ "?startTime=" + startTime + "&endTime=" + endTime + "&refetch=" + false;
final HttpGet httpGet = new HttpGet(url);
httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
httpclient.execute(httpGet, new FutureCallback<HttpResponse>(){
@Override
public void completed(final HttpResponse response) {
try {
handleResponse(response, machine, metricMap); // @3
success.incrementAndGet();
} catch (Exception e) {
logger.error(msg + " metric " + url + " error:", e);
} finally {
latch.countDown();
}
}
}
}
// 结果处理调用链条
handleResponse->handleBody
entity.addPassQps(node.getPassQps()); // @4
entity.addBlockQps(node.getBlockQps());
entity.addRtAndSuccessQps(node.getRt(), node.getSuccessQps());
entity.addExceptionQps(node.getExceptionQps());
entity.addCount(1);
@1 服务启动时,定时任务每秒获取一次监控数据 @2 循环AppId的每个节点并向其发送http请求获取监控数据;appId及机器节点信息来自心跳数据 @3 会请求结果进行处理 @4 累计统计信息包括appId对应所有节点
小结:控制台向该appId的每台机器发送http请求获取监控数据,并将每台获取的数据进行累加计算。控制台实际调用客户端暴露服务接口,并将各个节点数据汇总。
客户端监控接口如下(详见:Sentinel动态规则API模式命令【实战笔记】)
http://192.168.1.4:8720/metric?startTime=1571575945000&endTime=1571575951000&refetch=false 备注:get and aggregate metrics, accept param: startTime={startTime}&endTime={endTime}&maxLines={maxLines}&identify={resourceName}
1571575945000|test|5|0|5|0|0|0
1571575945000|hello|5|0|5|0|0|0
1571575964000|__system_load__|20000|0|0|0|0|0
1571575964000|__cpu_usage__|1592|0|0|0|0|0
代码坐标:FlowRuleManager#static
SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS); // @1
@1 定时任务在FlowRuleManager初始化时启动,每秒钟记录一次
代码坐标:MetricTimerListener#run
public void run() {
Map<Long, List<MetricNode>> maps = new TreeMap<Long, List<MetricNode>>();
// @1
for (Entry<ResourceWrapper, ClusterNode> e : ClusterBuilderSlot.getClusterNodeMap().entrySet()) {
String name = e.getKey().getName();
ClusterNode node = e.getValue();
Map<Long, MetricNode> metrics = node.metrics();
aggregate(maps, metrics, name);
}
aggregate(maps, Constants.ENTRY_NODE.metrics(), Constants.TOTAL_IN_RESOURCE_NAME);
if (!maps.isEmpty()) {
for (Entry<Long, List<MetricNode>> entry : maps.entrySet()) {
try {
// @2
metricWriter.write(entry.getKey(), entry.getValue());
} catch (Exception e) {
RecordLog.warn("[MetricTimerListener] Write metric error", e);
}
}
}
}
@1 从ClusterBuilderSlot获取资源对应的ClusterNode统计信息 @2 将统计信息记录到日志文件
代码坐标:MetricWriter#write
// @1
for (MetricNode node : nodes) {
node.setTimestamp(time);
}
// @2
if (curMetricFile == null) {
baseFileName = formMetricFileName(appName, pid);
closeAndNewFile(nextFileNameOfDay(time));
}
// @3
if (!(curMetricFile.exists() && curMetricIndexFile.exists())) {
closeAndNewFile(nextFileNameOfDay(time));
}
// @4
long second = time / 1000;
// ...
// @5
writeIndex(second, outMetric.getChannel().position());
// @6
for (MetricNode node : nodes) {
outMetricBuf.write(node.toFatString().getBytes(CHARSET));
}
outMetricBuf.flush();
// @7
if (!validSize()) {
closeAndNewFile(nextFileNameOfDay(time));
}
// @8
lastSecond = second;
@1 如果传入了时间戳所有的nodes都是time @2 第一次创建日志文件;日志目录可以通过csp.sentinel.log.dir参数设置; 默认日志目录为:user.home/logs/csp 流控文件名称为:appName-metrics.log.yyyy-MM-dd.n(n每创建一个日志文件会递增) 例如:sentinel-aop-demo-metrics.log.2019-10-20.2 可以通过logNameUsePid将pid加入到日志文件名中,默认false @3 日志文件和索引文件都不存在先创建 索引文件名称为:流控文件名称.idx 例如:sentinel-aop-demo-metrics.log.2019-10-20.2.idx @4 时间戳去掉秒数 @5 记录日志索引 索引文件先写入8个字节的时间戳;再写入8个字节的offset即metric与该时间戳对应的日志写入位置 @6 记录监控日志; 日志格式为:1571565694000|2019-10-20 18:01:34|hello|3|0|3|0|0|0 @7 校验metric文件大小默认为50M;超过后创建新文件 @8 内存中记录去除秒数的时间戳;也是写入索引的时间戳,据此可以获取对应metric文件位置(offset)
小结:定时任务在FlowRuleManager初始化时启动,每秒钟执行一次;从ClusterBuilderSlot获取资源对应的ClusterNode统计信息,记录metric索引文件和metric日志文件;缓存中记录上次写入的时间戳。
2.处理控制台请求
代码坐标:SendMetricCommandHandler#handle
if (StringUtil.isNotBlank(endTimeStr)) {
long endTime = Long.parseLong(endTimeStr);
// @1
list = searcher.findByTimeAndResource(startTime, endTime, identity);
} else {
if (StringUtil.isNotBlank(maxLinesStr)) {
maxLines = Integer.parseInt(maxLinesStr);
}
maxLines = Math.min(maxLines, 12000);
// @2
list = searcher.find(startTime, maxLines);
}
@1 根据时间范围和资源名称查找监控日志 @2 如果没有时间戳,默认查找最大12000条数据
小结:处理请求的模块在通信模块sentinel-transport;具体负责检索的逻辑在sentinel-core模块MetricSearcher类。
代码坐标:MetricSearcher#findByTimeAndResource
// @1
List<String> fileNames = MetricWriter.listMetricFiles(baseDir, baseFileName);
int i = 0;
// @2
long offsetInIndex = 0;
if (validPosition(beginTimeMs)) {
i = fileNames.indexOf(lastPosition.metricFileName);
if (i == -1) {
i = 0;
} else {
offsetInIndex = lastPosition.offsetInIndex;
}
} else {
...
}
for (; i < fileNames.size(); i++) {
String fileName = fileNames.get(i);
// @3
long offset = findOffset(beginTimeMs, fileName,
MetricWriter.formIndexFileName(fileName), offsetInIndex);
offsetInIndex = 0;
if (offset != -1) {
// @4
return metricsReader.readMetricsByEndTime(fileNames, i, offset, beginTimeMs, endTimeMs, identity);
}
}
@1 读取日志目录下的日志文件;日志目录见:记录metric索引和日志文件 @2 offsetInIndex上次读取索引文件的位置;初始值为0即从头开始读取 @3 根据时间戳、上次索引文件读取位置offsetInIndex在索引文件中查找offset;即日志文件的读取位置 @4 根据offset在日志文件中读取监控信息
代码坐标:MetricSearcher#findOffset
lastPosition.metricFileName = null;
lastPosition.indexFileName = null;
if (!new File(idxFileName).exists()) {
return -1;
}
// @1
long beginSecond = beginTime / 1000;
FileInputStream in = new FileInputStream(idxFileName);
// @2
in.getChannel().position(offsetInIndex);
DataInputStream indexIn = new DataInputStream(in);
long offset;
try {
long second;
lastPosition.offsetInIndex = in.getChannel().position();
// @3
while ((second = indexIn.readLong()) < beginSecond) {
// @4
offset = indexIn.readLong();
// @5
lastPosition.offsetInIndex = in.getChannel().position();
}
// @6
offset = indexIn.readLong();
// @7
lastPosition.metricFileName = metricFileName;
// @8
lastPosition.indexFileName = idxFileName;
// @9
lastPosition.second = second;
return offset;
} catch (EOFException ignore) {
return -1;
} finally {
indexIn.close();
}
@1 去掉秒数 @2 定位到上次读取索引文件的位置 @3 先读取8个字节的时间戳;比较一直到大于等于待查找的时间戳为止 @4 更新metric日志文件偏移量 @5 更新内存索引文件偏移量 @6 读取metric文件偏移量 @7 metric文件名称 @8 索引文件名称 @9 上次读取的索引文件的时间戳
代码坐标:MetricsReader#readMetricsByEndTime
List<MetricNode> list = new ArrayList<MetricNode>(1024);
if (readMetricsInOneFileByEndTime(list, fileNames.get(pos++), offset, beginTimeMs, endTimeMs, identity)) { // @1
while (pos < fileNames.size()
&&
readMetricsInOneFileByEndTime(list, fileNames.get(pos++), 0, beginTimeMs, endTimeMs, identity)) { // @2
}
}
return list;
@1 校验是否应该继续读取文件内容 @2 读取具体文件内容;如果制定identity(具体的resource)则只读取该resource的监控信息;反之全部读取
in = new FileInputStream(fileName);
// @1
in.getChannel().position(offset);
BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset));
String line;
while ((line = reader.readLine()) != null) {
MetricNode node = MetricNode.fromFatString(line);
// @2
long currentSecond = node.getTimestamp() / 1000;
// @3
if (currentSecond < beginSecond) {
return false;
}
if (currentSecond <= endSecond) {
if (identity == null) {
list.add(node);
} else if (node.getResource().equals(identity)) {
list.add(node); // @4
}
} else {
return false;
}
if (list.size() >= MAX_LINES_RETURN) { // @5
return false;
}
}
@1 定位到监控日志读取位置 @2 解析监控统计的时间戳 @3 监控统计日志的时间戳需要在待查询时间范围内 @4 指定resource则只收集该resource的统计信息 @5 读取数据不得超过MAX_LINES_RETURN(10万)条
小结:监控日志检索先根据去掉秒数的时间戳从索引文件中读取offset即日志文件的偏移量;再根据offset检索日志文件并将结果返回。