前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 三十五、Hystrix执行过程集大成者:AbstractCommand详解

[享学Netflix] 三十五、Hystrix执行过程集大成者:AbstractCommand详解

作者头像
YourBatman
发布2020-03-18 19:39:28
1.1K0
发布2020-03-18 19:39:28
举报

最快的脚步不是跨越,而是继续;最慢的步伐不是缓慢,而是徘徊。 代码下载地址:https://github.com/f641385712/netflix-learning

目录
  • 前言
  • 正文
    • AbstractCommand源码解析
      • 成员属性
      • toObservable() 所有执行方式的入口
        • 执行步骤文字描述
      • applyHystrixSemantics()
  • 总结
    • 声明

前言

Hystrix的源码因为是基于RxJava来书写的,一方面是很多小伙伴对RxJava并不熟悉,另一方面是基于观察者模式实现的代码绕来绕去就是不好理解,所以总的来说Hystrix的源码是比较难啃的。

前面我们已经把Hystrix的正常执行 + 异常fallback执行都“逐个击破”了,有了良好的知识铺垫,本文主要仅需做归并即可捋出其执行原理。另外,虽然我们最常使用的是HystrixCommand,而真正的执行逻辑99%都是在AbstractCommand里,它才是集大成者。

正常执行部分请参考这里:https://fangshixiang.blog.csdn.net/article/details/104556721 异常执行部分请参考这里:https://fangshixiang.blog.csdn.net/article/details/104718511


正文

如图,这是Hystrix的执行过程示意图:

在这里插入图片描述
在这里插入图片描述

AbstractCommand源码解析

它是HystrixCommandHystrixObservableCommand的抽象父类,实现了绝大部分的执行逻辑以及熔断器控制、事件发送等…

说明:xxxCollapser系列如:HystrixCollapserHystrixObservableCollapser它们是没有提取Abstract抽象实现的,而是直接实现的接口。

每个请求都会生成一个command实例,而每个command实例都对应着一个HystrixCommandKey、HystrixCircuitBreaker、HystrixThreadPool、HystrixConcurrencyStrategy、HystrixRequestCache...等等组件来实现各式各样的功能。

说明:每次请求command是新生成的一个实例,但是对应的那些组件们可不是新的实例哦,因为都是同一个HystrixCommandKey的请求共用一个组件实例的~

AbstractCommand是一个“很大的”类,此类源代码行数2000+,所以拆分成如下几个部分讲解。


成员属性

AbstractCommand类拥有近30个成员属性,但还好有了前面N篇文章的铺陈,大部分的API、组件功能都已了然于胸了,所以阅读起来还是很流畅的。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
	
	// command的id
	protected final HystrixCommandKey commandKey; 
	// 线程池分组名(理论上不同的Command可以共用一个线程池,节约资源嘛)
    protected final HystrixThreadPoolKey threadPoolKey; 
    // 逻辑分组。用于统计
    protected final HystrixCommandGroupKey commandGroup;

	// 各种properties配置  均可以通过SPI方式提供
	protected final HystrixCommandProperties properties;	
	
	// SPI接口(这几个接口有详细介绍,不陌生)
    protected final HystrixEventNotifier eventNotifier;
    protected final HystrixConcurrencyStrategy concurrencyStrategy;
    protected final HystrixCommandExecutionHook executionHook;

	// 熔断器。若你使用配置显示enable=false了
	// 其实现就是NoOpCircuitBreaker,否则就是默认实现
    protected final HystrixCircuitBreaker circuitBreaker;
    
    // 线程池。默认实现是HystrixThreadPoolDefault
    // 线程池参数使用HystrixThreadPoolProperties配置
    // 通过HystrixConcurrencyStrategy#getThreadPool()得到ThreadPoolExecutor执行器
    // 说明:每次getThreadPool()一下都会用最新的配置配置执行器。所以可以达到动态化的目的
    // 比如说CorePoolSize、MaximumPoolSize等等都可以在properties里动态改变
    protected final HystrixThreadPool threadPool;
	// Command指标收集器
	protected final HystrixCommandMetrics metrics;

	// 执行时候时使用的信号量(每个key一个信号量控制哦)
	// 说明:它并没有使用JDK的java.util.concurrent.Semaphore,而是自己的实现
	// 信号量的实现非常简单,所以就略喽。
	// 当没开启信号量隔离的时候,该实现类使用的是TryableSemaphoreNoOp
	protected final TryableSemaphore executionSemaphoreOverride;
	// 发生fallabck时的信号量,也是每个key一个。至于fallback都需要信号量隔离,前面有详细说明
	// 它哥俩均只有在隔离策略是SEMAPHORE才有效。它哥俩信号量默认值都是10
	protected final TryableSemaphore executionSemaphoreOverride;
 
 	// 各种状态值(这里也说明command是有状态的:一个实例只能执行一次)
    protected AtomicReference<CommandState> commandState = new AtomicReference<>(CommandState.NOT_STARTED);
    protected AtomicReference<ThreadState> threadState = new AtomicReference<>(ThreadState.NOT_USING_THREAD);
    protected final AtomicReference<TimedOutStatus> isCommandTimedOut = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

	// 执行结果
	protected volatile ExecutionResult executionResult = ExecutionResult.EMPTY;
	// 取消时的执行结果
	protected volatile ExecutionResult executionResultAtTimeOfCancellation;
	
	// 表示为:响应是否来自于缓存
	// 若缓存开启了,并且请求时缓存命中了,那此值就会被置为true
	protected volatile boolean isResponseFromCache = false;
	// command开始执行的时刻(并不代表一定会执行到目标方法哦)
	protected volatile long commandStartTimestamp = -1L;

	// 缓存、日志相关
	protected final HystrixRequestCache requestCache;
	protected final HystrixRequestLog currentRequestLog;


	// 缓存默认名称(官方数据,加了这个缓存后效率提升了1-2微秒)
	// 默认的key名使用getSimpleName(),简单类名
	// 但若你没有简单类名(比如内部类),那就使用全类名getName()
	private static ConcurrentHashMap<Class<?>, String> defaultNameCache = new ConcurrentHashMap<>();
	// 缓存该key是否有fallback方法,这样如果木有就避免每次都反射去找了
	protected static ConcurrentHashMap<HystrixCommandKey, Boolean> commandContainsFallback = new ConcurrentHashMap<>();

	// 唯一构造器,完成了所有属性的初始化
	protected AbstractCommand(...){
		// group不能为null,但是key可以为null -> 自动用简单类名
        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        ...
        // 自己没配置,那就使用传入的值。若传入为null,就使用groupKey
        // 大多数情况下让其保持和groupKey一样即可
		this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
		...
		this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
		...
	}
	... // 省略所有的init方法


	// 允许折叠器将此命令实例标记为用于折叠请求以及折叠多少请求
    void markAsCollapsedCommand(HystrixCollapserKey collapserKey, int sizeOfBatch) {
        eventNotifier.markEvent(HystrixEventType.COLLAPSED, this.commandKey);
        executionResult = executionResult.markCollapsed(collapserKey, sizeOfBatch);
    }


	// ========执行方法============
	public Observable<R> observe() { ... }
	public Observable<R> toObservable() { ... }

	// 抽象方法:目标方法以及针对的fallback方法
    protected abstract Observable<R> getExecutionObservable();
    protected abstract Observable<R> getFallbackObservable();
}

光看AbstractCommand拥有的成员属性,就知道它有多复杂了。需要注意的是:它的属性的访问权限大都是protected的,所以子类均可直接访问。它的这些属性的初始化均在唯一的构造器里完成,每个属性的初始化逻辑大体相似:缓存 -> SPI -> 初始化 -> 默认值,因为比较简单,本文略。

完成成员属性的准备工作后,下面就开始它的执行过程部分了。


toObservable() 所有执行方式的入口

我们知道HystrixCommand的执行方法有多种,但其实不管哪种执行方法,最终都依赖于toObservable()这个方法,toObservable()它是执行原理的集大成者,所有执行方式的入口。

execute() 依赖于 queue() 依赖于 toObservable() observe() 依赖于 toObservable()

该方法作用:用于订阅Observable的回调命令的异步执行,也就是说自己返回一个可被订阅的对象:数据发射器。一旦有订阅者就会延迟的发送数据/命令,新订阅者是不会监听到历史数据的

AbstractCommand:

	public Observable<R> toObservable() {
		... // 暂时省略非常多的Action和Func们
		// 需要注意的是:这里使用的是defer(),所以里面内容是不会立马执行的,知道有订阅者订阅了就执行
		// 也就是说observable = command.toObservable()是不会执行
		// observable.subscribe(xxx)订阅后,才会开始执行
		// 下面使用defer的效果都一样~~~~~~~~~
		return Observable.defer(() -> {
			
			... // 检验线程状态CommandState,每个command只能被执行一次,否则额抛出HystrixRuntimeException异常
			commandStartTimestamp = System.currentTimeMillis(); // 命令开始执行
			... // 记录日志:不管发生了什么,都要记录这个命令的执行
			// 是否允许请求缓存:properties.requestCacheEnabled().get() && getCacheKey() != null;
			// 虽然properties里默认是true开启的,但是需要你重写getCacheKey()缓存才会生效的呀
			boolean requestCacheEnabled = isRequestCachingEnabled();
			String cacheKey = getCacheKey(); // 默认是null。重写它缓存才会生效
			// 如果开启了缓存,就先从缓存里找结果
			if (requestCacheEnabled) {
				... // 一旦命中缓存,就处理好数据后return(关于请求缓存会后面再聊)
				// 若命中了缓存,设置isResponseFromCache = true;
			}
			
			// =========如果没有获取到缓存,则需要执行命令获得结果。========
			// applyHystrixSemantics函数:执行熔断器 + 目标方法等核心逻辑(最为复杂和关键的一个函数,下有详解)
			// 所以这里返回的hystrixObservable已经是目标命令结果了
			// wrapWithAllOnNextHooks:触发HystrixCommandExecutionHook相关回调
			// 另外,此处也是defer实现哦,所以目标方法并不会立马执行哦~~~(执行时机交给调用者才对嘛)
			Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics)
														.map(wrapWithAllOnNextHooks);
			// 它是最终的return
			Observable<R> afterCache;
			// 若开启了缓存
			if(requestCacheEnabled){
				... // 把结果hystrixObservable缓存起来并处理后返回
			} else {
				afterCache = hystrixObservable;
			}
			
			// terminateCommandCleanup:执行清理(分为目标方法执行了or没执行)
			// unsubscribeCommandCleanup:取消订阅
			// fireOnCompletedHook:触发executionHook.onSuccess(_cmd)该方法
			return afterCache.doOnTerminate(terminateCommandCleanup)
							.doOnUnsubscribe(unsubscribeCommandCleanup)
							.doOnCompleted(fireOnCompletedHook);
		});
	}

执行步骤文字描述
  1. 判断线程状态是否是NOT_STARTED,否则抛出HystrixRuntimeException异常:一个命令只能执行一次
  2. 命令开始,使用HystrixRequestLog记录该命令的执行(显示配置requestLogEnabled = false可关闭日志的记录)
  3. 若开启了请求缓存,那就先从缓存里找结果(不会执行目标方法)
    1. 缓存开启的条件是:requestCacheEnabled = true且且且getCacheKey() != null。所以你若想要请求缓存有效,请重写此方法并不要返回null
  4. 没开启缓存(缓存没命中),则需要执行目标命令获得结果
    1. Observable.defer()保证了目标方法此时并不会被执行,而是订阅时才异步执行(交给调用者决定嘛)
    2. applyHystrixSemantics()方法为执行目标方法最最最核心逻辑,后有详解
  5. 若开启了缓存,把结果放进缓存里
  6. 返回结果。并且注册上相关清理动作:
    1. terminateCommandCleanup:把线程状态标记为TERMINAL。分为两种情况:
      1. 目标代码没有被执行(比如从缓存里拿的结果):清空定时监听、记录执行耗时、HystrixCommandMetrics#markCommandDone(),触发执行完成后的函数回调(若endCurrentThreadExecutingCommand不为null的话)
      2. 目标代码执行了。逻辑完全同上,只是markCommandDone(true)此处传true而已
    2. unsubscribeCommandCleanup:把线程状态标记为UNSUBSCRIBED。触发executionHook.onUnsubscribe等动作,并且,并且重复和上步骤一模一样的动作
    3. fireOnCompletedHook: 仅触发动作executionHook.onSuccess(_cmd)

这里主要是套了一层缓存,以及清理相关动作。但其实最为核心的还是在applyHystrixSemantics()这个函数里,它才是真正的关键。


applyHystrixSemantics()
AbstractCommand:

	private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
		executionHook.onStart(_cmd); // 开始执行

		// 若断路器放行(断路器闭合状态,当然喽也可能是半开状态)
		if (circuitBreaker.allowRequest()) {
			// 若你实现的是线程池隔离,那么此处实现就是TryableSemaphoreNoOp
			// 若你使用信号量隔离,它就会生效啦
			TryableSemaphore executionSemaphore = getExecutionSemaphore();
			...
			// 尝试申请信号资源
			if (executionSemaphore.tryAcquire()) {
				executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

                return executeCommandAndObserve(_cmd) // =====执行目标方法=====
                        .doOnError(markExceptionThrown) // eventNotifier.markEvent() ... 
                        .doOnTerminate(singleSemaphoreRelease) // 确保释放信号量
                        .doOnUnsubscribe(singleSemaphoreRelease); // 确保释放信号量
			} else { // 若木有信号资源了,进入到信号量的fallabck
				return handleSemaphoreRejectionViaFallback();
			}
		} else { // 断路器打开了禁止你访问,那就直接执行fallabck逻辑
			return handleShortCircuitViaFallback();
		}

	}

这里有一个小技巧:TryableSemaphore信号量看起来是不管咋样都存在的,但是若你是线程池隔离的话,它的实现是NoOp空的,完美的嵌入到了正常流程里。

执行步骤:

  1. 询问断路器是否允许请求:circuitBreaker.allowRequest(),若不允许执行直接执行ShortCircuit短路fallabck逻辑,否则继续
  2. 尝试缓存信号量资源(若是线程池隔离,此处永远为true),若没有信号量资源了,触发信号量拒绝的fallabck逻辑,否则继续
  3. 执行目标方法逻辑executeCommandAndObserve(_cmd)

到这里,执行流程分为两大分支:正常执行和异常执行,刚好和前面内容完成接轨


总结

关于Hystrix的执行原理,AbstractCommand详解就介绍到这了,到此关于AbstractCommand的整个内容算是全部讲述完成,所以你对Hystrix的原理应该基本掌握。Hystrix在设计上还是蛮值得借鉴的,面向使用者的API可以提供多个,但是最终都归一到一处,达到更高的内聚效果,维护起来也更加的方便。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 前言
  • 正文
    • AbstractCommand源码解析
      • 成员属性
      • toObservable() 所有执行方式的入口
      • applyHystrixSemantics()
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档