前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 二十六、Hystrix指标数据收集器:HystrixMetrics(HystrixDashboard的数据来源)

[享学Netflix] 二十六、Hystrix指标数据收集器:HystrixMetrics(HystrixDashboard的数据来源)

作者头像
YourBatman
发布2020-03-19 10:52:43
1.6K0
发布2020-03-19 10:52:43
举报
文章被收录于专栏:BAT的乌托邦BAT的乌托邦

看一个人的成功,不是看他赢了多少人,而是看他成就了多少人。

代码下载地址:https://github.com/f641385712/netflix-learning

前言

前面已经花了5篇文章专门介绍了Hystrix基于事件机制的数据收集、Stream流式处理,再回头来理解它的HystrixMetrics指标收集就一清二楚了。

HystrixCommand执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard可以统计分析这些数据,从而完成特定的功能。 Hystrix以command命令模式的方式来控制业务逻辑以及熔断逻辑的调用时机,所以说数据统计对它来说不算难事,但如何高效、精准的在内存中统计数据,还需要一定的技巧。

需要提前说明的是:像什么hystrix.streamHystrixDashboard面板查看等这些,本文均还不会体现。本文只阐述数据的采集,至于数据如何使用(存储or展示)放在后几篇文章。


正文

Hystrix收集数据是必不可少的一步,每个降级点(需要采取降级保护的点)的数据是独立的,所以我们可以给每个降级点配置单独的策略

这些策略一般是建立在我们对这些降级点的了解之上的,初期甚至可以先观察一下采集的数据来指定降级策略。

采集哪些数据?数据如何存储?数据如何上报?这都是Hystrix需要考虑的问题,Hystrix采用的是滑动窗口+分桶的形式来采集数据(原理还蛮复杂的,本文不不做讨论),这样既解决了数据在统计周期间切换而带来的跳变问题(通过时间窗口),也控制了切换了力度(通过桶大小)。

关于Metrics指标收集,就不得不再次请上第一篇文章已贴出的这张执行原理图了:

在这里插入图片描述
在这里插入图片描述

它从各个地方(包括正常逻辑执行、线程池/信号量资源监察)收集指标信息,然后提供给断路器使用,或者提供给监控大盘们使用(它们均是consumer)


HystrixRollingNumber

该类用来统计一段时间内的计数,也被称作Hystrix里用于qps计数的数据结构,采用滑动窗口 + 分桶的形式收集。

事件类型HystrixRollingNumberEvent:可以在HystrixRollingNumber中捕获的各种状态/事件。

public enum HystrixRollingNumberEvent {
    SUCCESS(1), FAILURE(1), TIMEOUT(1), SHORT_CIRCUITED(1), THREAD_POOL_REJECTED(1), SEMAPHORE_REJECTED(1), BAD_REQUEST(1),
    FALLBACK_SUCCESS(1), FALLBACK_FAILURE(1), FALLBACK_REJECTION(1), FALLBACK_MISSING(1), EXCEPTION_THROWN(1), COMMAND_MAX_ACTIVE(2), EMIT(1), FALLBACK_EMIT(1),
    THREAD_EXECUTION(1), THREAD_MAX_ACTIVE(2), COLLAPSED(1), RESPONSE_FROM_CACHE(1),
    COLLAPSER_REQUEST_BATCHED(1), COLLAPSER_BATCH(1);

    private final int type;
    private HystrixRollingNumberEvent(int type) {
        this.type = type;
    }

	// 可执行HystrixRollingNumber#increment/add/getRollingSum方法
    public boolean isCounter() { return type == 1; }
    // 可执行HystrixRollingNumber#updateRollingMax/getRollingMaxValue方法
    public boolean isMaxUpdater() { return type == 2; }

	// HystrixEventType转为HystrixRollingNumberEvent 
	public static HystrixRollingNumberEvent from(HystrixEventType eventType) {
		...
	}
}

可以看到,每一个HystrixEventType类型都能匹配到一个HystrixRollingNumberEvent从而被收集进来。

HystrixRollingNumber它是一个工具类,位于Util包:com.netflix.hystrix.util滑动窗口 + 分桶逻辑实现复杂,但它作为一个工具类给提供了非常实用的获取数据的方法:

HystrixRollingNumber:
	
	final int numberOfBuckets;
	...
	// 环形桶:因为时间窗口需要滑动
	final BucketCircularArray buckets;
	...
    public void increment(HystrixRollingNumberEvent type) {
        getCurrentBucket().getAdder(type).increment();
    }
    ...
    // 获取自JVM启动以来所有桶的累积和
    public long getCumulativeSum(HystrixRollingNumberEvent type) { ... }
    // 获取给定指定event类型的**滚动计数器**中所有桶的总和(常用)
    public long getRollingSum(HystrixRollingNumberEvent type) { ... }
    // 获取滚动过程中,最新的桶的值
    public long getValueOfLatestBucket(HystrixRollingNumberEvent type) { ... }
    // 获取滚动中,所有桶的值们
    public long[] getValues(HystrixRollingNumberEvent type) { ... }
    // 基于getValues的基础上排序,然后取出最大值
    public long getRollingMaxValue(HystrixRollingNumberEvent type) { ... }
	...

HystrixRollingNumber统计一定时间内的统计数值,基本思想就是分段/分桶统计,比如说要统计qps,即1秒内的请求总数。如下图所示,我们可以将1s的时间分成10段,每段100ms。在第一个100ms内,写入第一个段中进行计数,在第二个100ms内,写入第二个段中进行计数,这样如果要统计当前时间的qps,我们总是可以通过统计当前时间前1s(共10段)的计数总和值。

说明:注意它和RollingDistributionStream的区别哦~


Metrics如何统计

Metrics在统计各种状态时,时运用滑动窗口思想进行统计的,在一个滑动窗口时间中又划分了若干个Bucket(滑动窗口时间与Bucket成整数倍关系),滑动窗口的移动是以Bucket为单位进行滑动的。

如:HealthCounts 记录的是一个Buckets的监控状态,Buckets为一个滑动窗口的一小部分,如果一个滑动窗口时间为 t ,Bucket数量为 n,那么每隔t/n秒将新建一个HealthCounts对象。

在这里插入图片描述
在这里插入图片描述

Metrics收集步骤

根据前面几篇文章的表述,这里简单总结指标信息的收集步骤:

  1. 命令在开始执行前会向开始消息流(HystrixCommandStartStream)发送开始消息(HystrixCommandExecutionStarted)
  2. 如果是线程池执行,执行前会向线程池开始消息流(HystrixThreadPoolStartStream)发送开始消息(HystrixCommandExecutionStarted)
  3. 如果是线程池执行,执行后会向线程池结束消息流(HystrixThreadPoolCompletionStream)发送完成消息(HystrixCommandCompletion)
  4. 命令在结束执行前会向完成消息流(HystrixCommandCompletionStream)发送完成消息(HystrixCommandCompletion)
  5. 不同类型的统计流(比如滑动窗口统计、累计统计、最大并发统计等等),会监听开始消息流或完成消息流,根据接受到的消息内容,进行统计

HystrixMetrics

指标数据采集的基类。当前服务的健康状况, 包括服务调用总次数和服务调用失败次数等. 根据Metrics的计数, 熔断器从而能计算出当前服务的调用失败率, 用来和设定的阈值比较从而决定熔断器的状态切换逻辑. 因此Metrics的实现非常重要

public abstract class HystrixMetrics {
	protected final HystrixRollingNumber counter;
    protected HystrixMetrics(HystrixRollingNumber counter) {
        this.counter = counter;
    }
	
	// 获取累计总数
    public long getCumulativeCount(HystrixRollingNumberEvent event) {
        return counter.getCumulativeSum(event);
    }
    // 获取当前滑动窗口内的总数
    public long getRollingCount(HystrixRollingNumberEvent event) {
        return counter.getRollingSum(event);
    }
}

Hystrix的Metrics功能模块中存储了与Hystrix运行相关的度量信息,主要有三类类型:

在这里插入图片描述
在这里插入图片描述

HystrixCommandMetrics

保存hystrix命令执行的度量信息。

public class HystrixCommandMetrics extends HystrixMetrics {

	private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
	// 静态Map,key为HystrixCommandKey,缓存
	private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<>();
	
	// 这两个public的函数,在介绍前面介绍HealthCountsStream已讲过,略
	public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = ...
	public static final Func2<long[], long[], long[]> bucketAggregator = ...
}

此处解释一下,为何有metrics这个static变量:由于每次请求都需要新创建command对象,而每创建一次command对象都有好多属性需要初始化(具体参见讲解AbstractCommand文章),那么是不是非常的耗时呢???

其实构造函数中的很多初始化工作只会集中在创建第一个Command时来做,后续创建的Command对象主要是从静态Map中取对应的实例来赋值,比如监控器、断路器和线程池的初始化,因为相同的Command的command key和线程池key都是一致的,在HystrixCommandMetricsHystrixCircuitBreaker.FactoryHystrixThreadPool中均有类似于metrics这样的static Map缓存用于提高效率的。

HystrixMetrics负责收集指标数据,它会借用多个Stream来进行多维护收集,所以它有众多成员属性:

HystrixCommandMetrics:

    private final HystrixCommandProperties properties;
    private final HystrixCommandKey key;
    private final HystrixCommandGroupKey group;
    private final HystrixThreadPoolKey threadPoolKey;
    // 记录当前正在执行的总命令数
    // 命令start执行的时候+1,执行结束-1
    private final AtomicInteger concurrentExecutionCount = new AtomicInteger();

 	// 使用各种纬度的Stream,进行监听信息流
    private HealthCountsStream healthCountsStream;
    private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
    private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
    private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
    private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
    private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

	... // 私有化构造器:给所有的属性赋值
	... // 省略get方法


	...

	// 当前**正在执行**的总数:HystrixCommand#run()
    public int getCurrentConcurrentExecutionCount() {
        return concurrentExecutionCount.get();
    }
    ...

	// 检索总请求、错误计数和错误百分比的快照。
    public HealthCounts getHealthCounts() {
        return healthCountsStream.getLatest();
    }
	...

	// 因为收集指标信息都是异步收集的,这个方法可以解除所有的订阅
    private void unsubscribeAll() {
        healthCountsStream.unsubscribe();
        rollingCommandEventCounterStream.unsubscribe();
        cumulativeCommandEventCounterStream.unsubscribe();
        rollingCommandLatencyDistributionStream.unsubscribe();
        rollingCommandUserLatencyDistributionStream.unsubscribe();
        rollingCommandMaxConcurrencyStream.unsubscribe();
    }

它对外提供非常多的方法/能力,其中绝大多数都是委托给各种xxxStream来完成,下面对主要方法做如下描述:

  • markCommandStart:当命令开始执行,调用该方法
  • markCommandDone:命令执行完成,调用该方法
    • 说明:以上2方法均不是public方法,由AbstractCommand调用。其中有个API:HystrixThreadEventStream后续会有详细介绍
  • getRollingCount:获取某一事件类型窗口期内的统计数值(委托rollingCommandEventCounterStream
  • getCumulativeCount:获取某一事件类型持续的统计数值
  • getExecutionTimePercentile:获取某一百分比的请求执行时间(委托rollingCommandLatencyDistributionStream
  • getExecutionTimeMean:获取平均请求执行时间(委托rollingCommandLatencyDistributionStream
  • getTotalTimePercentile:获取某一百分比的请求执行总时间(委托rollingCommandUserLatencyDistributionStream
  • getTotalTimeMean:获取平均请求执行总时间
  • getRollingMaxConcurrentExecutions:获取上一个窗口期内最大的并发数
  • getHealthCountsStream:获取窗口期内的失败次数,总次数,失败比率

另外,构建一个HystrixCommandMetrics的实例,依旧以static静态方法对外提供,加缓存来提高效率:

HystrixCommandMetrics:

	// 根据参数,得到一个HystrixCommandMetrics 实例
	// 如果缓存里已经有了,就直接返回
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties) {
        return getInstance(key, commandGroup, null, properties);
    }
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
    	
    	// 双重校验锁
        HystrixCommandMetrics commandMetrics = metrics.get(key.name());
        if (commandMetrics != null) 
            return commandMetrics; 
		// 线程安全
		synchronized (HystrixCommandMetrics.class) {
			HystrixCommandMetrics existingMetrics = metrics.get(key.name());
                if (existingMetrics != null)
                    return existingMetrics;
			
				// 创建一个新的实例
				// =======线程的key默认情况下就是groupKey======
                HystrixThreadPoolKey nonNullThreadPoolKey;
                if (threadPoolKey == null) {
                    nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
                } else {
                    nonNullThreadPoolKey = threadPoolKey;
                }

				// 使用构造器初始化一个实例,并且放进Map里缓存起来
                HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
                metrics.putIfAbsent(key.name(), newCommandMetrics);
                return newCommandMetrics;

		}
    }

	// 这个getInstance只查找缓存,若缓存中木有,就返回null
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key) {
        return metrics.get(key.name());
    }

HystrixThreadPoolMetrics

原理同上,只是管理的Stream流不一样而已:

HystrixThreadPoolMetrics:

    private final HystrixThreadPoolKey threadPoolKey;
    private final ThreadPoolExecutor threadPool;
    private final HystrixThreadPoolProperties properties;


    private final AtomicInteger concurrentExecutionCount = new AtomicInteger();
    private final RollingThreadPoolEventCounterStream rollingCounterStream;
    private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
    private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;

主要方法:

  • markThreadExecution:当线程执行时,调用此方法,次数+1
  • markThreadCompletion:执行完,次数-1
  • markThreadRejection:command任务被线程池拒绝时,次数-1
  • getRollingCount:和上面不一样,这里委托的是RollingThreadPoolEventCounterStream
  • getCumulativeCount

获取实例的方法同上。


HystrixCollapserMetrics
HystrixCollapserMetrics:

    private final HystrixCollapserKey collapserKey;
    private final HystrixCollapserProperties properties;

    private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
    private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
    private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;

略。


使用示例

在例子之前,需要提醒的是:以上方法虽然最终是委托给Stream去执行的,但是它们并不会有延迟,是立即的(因为Stream流一般都会有窗口,比如1s一次,500ms一次等等),但是,但是,但是它是getLatest哦,也就是拿最新的数据,官方解释为:

// 同步调用以检索上次计算的bucket而不用等待
// Synchronous call to retrieve the last calculated bucket without waiting for any emissions
public Output getLatest() { ... }

示例代码:

@Test
public void fun1() throws InterruptedException {
    HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
    HystrixCommandGroupKey commandGroupKey = HystrixCommandGroupKey.Factory.asKey("MyAppGroup");
    HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("MyAppGroup");
    HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());

    // command指标信息
    HystrixCommandMetrics commandMetrics = HystrixCommandMetrics.getInstance(commandKey, commandGroupKey, threadPoolKey, properties);

    // 发送事件(发送多次)
    CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.execute();
    helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.queue();
    // 走fallabck
    helloWorld = new CommandHelloWorld(null);
    helloWorld.queue();


    // 打印指标信息
    TimeUnit.SECONDS.sleep(1); // 需要留给指标收集的时间
    System.out.println("===========commandMetrics信息===========");
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.SUCCESS));
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.FAILURE));
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS));

    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.SUCCESS));
    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FAILURE));
    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FALLBACK_SUCCESS));


    System.out.println(commandMetrics.getHealthCounts());
    System.out.println(commandMetrics.getExecutionTimeMean());
}

运行程序控制台打印:

===========commandMetrics信息===========
0
0
0
0
0
0
HealthCounts[1 / 3 : 33%]
0

说明:至于为何很多是0,这个和getLatest以及本地测试不好控有关,暂可忽略。


总结

关于Netflix Hystrix指标数据收集器:HystrixMetrics就介绍到这了,你可能会觉得它和前面讲的xxxStream有非常多的功能相似之处。从源码能看出,HystrixMetrics似乎是对xxxStream的一些包装,内部事件最终都是委托给xxxStream去完成了的。

只不过最大的区别是:HystrixMetrics所有的获取指标信息的方法,获取的都是瞬时的(最新的)值,而并不需要等待,这是和流式统计计算最大的区别。

分隔线
分隔线
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
    • HystrixRollingNumber
      • Metrics如何统计
        • Metrics收集步骤
          • HystrixMetrics
            • HystrixCommandMetrics
            • HystrixThreadPoolMetrics
            • HystrixCollapserMetrics
          • 使用示例
          • 总结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档