最快的脚步不是跨越,而是继续;最慢的步伐不是缓慢,而是徘徊。 代码下载地址:https://github.com/f641385712/netflix-learning
Hystrix的源码因为是基于RxJava来书写的,一方面是很多小伙伴对RxJava并不熟悉,另一方面是基于观察者模式实现的代码绕来绕去就是不好理解,所以总的来说Hystrix的源码是比较难啃的。
前面我们已经把Hystrix的正常执行 + 异常fallback执行都“逐个击破”了,有了良好的知识铺垫,本文主要仅需做归并即可捋出其执行原理。另外,虽然我们最常使用的是HystrixCommand
,而真正的执行逻辑99%
都是在AbstractCommand
里,它才是集大成者。
正常执行部分请参考这里:https://fangshixiang.blog.csdn.net/article/details/104556721 异常执行部分请参考这里:https://fangshixiang.blog.csdn.net/article/details/104718511
如图,这是Hystrix的执行过程示意图:
它是HystrixCommand
和HystrixObservableCommand
的抽象父类,实现了绝大部分的执行逻辑以及熔断器控制、事件发送等…
说明:
xxxCollapser
系列如:HystrixCollapser
和HystrixObservableCollapser
它们是没有提取Abstract抽象实现的,而是直接实现的接口。
每个请求都会生成一个command
实例,而每个command
实例都对应着一个HystrixCommandKey、HystrixCircuitBreaker、HystrixThreadPool、HystrixConcurrencyStrategy、HystrixRequestCache...
等等组件来实现各式各样的功能。
说明:每次请求
command
是新生成的一个实例,但是对应的那些组件们可不是新的实例哦,因为都是同一个HystrixCommandKey
的请求共用一个组件实例的~
AbstractCommand
是一个“很大的”类,此类源代码行数2000+,所以拆分成如下几个部分讲解。
AbstractCommand
类拥有近30个成员属性,但还好有了前面N篇文章的铺陈,大部分的API、组件功能都已了然于胸了,所以阅读起来还是很流畅的。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// command的id
protected final HystrixCommandKey commandKey;
// 线程池分组名(理论上不同的Command可以共用一个线程池,节约资源嘛)
protected final HystrixThreadPoolKey threadPoolKey;
// 逻辑分组。用于统计
protected final HystrixCommandGroupKey commandGroup;
// 各种properties配置 均可以通过SPI方式提供
protected final HystrixCommandProperties properties;
// SPI接口(这几个接口有详细介绍,不陌生)
protected final HystrixEventNotifier eventNotifier;
protected final HystrixConcurrencyStrategy concurrencyStrategy;
protected final HystrixCommandExecutionHook executionHook;
// 熔断器。若你使用配置显示enable=false了
// 其实现就是NoOpCircuitBreaker,否则就是默认实现
protected final HystrixCircuitBreaker circuitBreaker;
// 线程池。默认实现是HystrixThreadPoolDefault
// 线程池参数使用HystrixThreadPoolProperties配置
// 通过HystrixConcurrencyStrategy#getThreadPool()得到ThreadPoolExecutor执行器
// 说明:每次getThreadPool()一下都会用最新的配置配置执行器。所以可以达到动态化的目的
// 比如说CorePoolSize、MaximumPoolSize等等都可以在properties里动态改变
protected final HystrixThreadPool threadPool;
// Command指标收集器
protected final HystrixCommandMetrics metrics;
// 执行时候时使用的信号量(每个key一个信号量控制哦)
// 说明:它并没有使用JDK的java.util.concurrent.Semaphore,而是自己的实现
// 信号量的实现非常简单,所以就略喽。
// 当没开启信号量隔离的时候,该实现类使用的是TryableSemaphoreNoOp
protected final TryableSemaphore executionSemaphoreOverride;
// 发生fallabck时的信号量,也是每个key一个。至于fallback都需要信号量隔离,前面有详细说明
// 它哥俩均只有在隔离策略是SEMAPHORE才有效。它哥俩信号量默认值都是10
protected final TryableSemaphore executionSemaphoreOverride;
// 各种状态值(这里也说明command是有状态的:一个实例只能执行一次)
protected AtomicReference<CommandState> commandState = new AtomicReference<>(CommandState.NOT_STARTED);
protected AtomicReference<ThreadState> threadState = new AtomicReference<>(ThreadState.NOT_USING_THREAD);
protected final AtomicReference<TimedOutStatus> isCommandTimedOut = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
// 执行结果
protected volatile ExecutionResult executionResult = ExecutionResult.EMPTY;
// 取消时的执行结果
protected volatile ExecutionResult executionResultAtTimeOfCancellation;
// 表示为:响应是否来自于缓存
// 若缓存开启了,并且请求时缓存命中了,那此值就会被置为true
protected volatile boolean isResponseFromCache = false;
// command开始执行的时刻(并不代表一定会执行到目标方法哦)
protected volatile long commandStartTimestamp = -1L;
// 缓存、日志相关
protected final HystrixRequestCache requestCache;
protected final HystrixRequestLog currentRequestLog;
// 缓存默认名称(官方数据,加了这个缓存后效率提升了1-2微秒)
// 默认的key名使用getSimpleName(),简单类名
// 但若你没有简单类名(比如内部类),那就使用全类名getName()
private static ConcurrentHashMap<Class<?>, String> defaultNameCache = new ConcurrentHashMap<>();
// 缓存该key是否有fallback方法,这样如果木有就避免每次都反射去找了
protected static ConcurrentHashMap<HystrixCommandKey, Boolean> commandContainsFallback = new ConcurrentHashMap<>();
// 唯一构造器,完成了所有属性的初始化
protected AbstractCommand(...){
// group不能为null,但是key可以为null -> 自动用简单类名
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
...
// 自己没配置,那就使用传入的值。若传入为null,就使用groupKey
// 大多数情况下让其保持和groupKey一样即可
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
...
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
...
}
... // 省略所有的init方法
// 允许折叠器将此命令实例标记为用于折叠请求以及折叠多少请求
void markAsCollapsedCommand(HystrixCollapserKey collapserKey, int sizeOfBatch) {
eventNotifier.markEvent(HystrixEventType.COLLAPSED, this.commandKey);
executionResult = executionResult.markCollapsed(collapserKey, sizeOfBatch);
}
// ========执行方法============
public Observable<R> observe() { ... }
public Observable<R> toObservable() { ... }
// 抽象方法:目标方法以及针对的fallback方法
protected abstract Observable<R> getExecutionObservable();
protected abstract Observable<R> getFallbackObservable();
}
光看AbstractCommand
拥有的成员属性,就知道它有多复杂了。需要注意的是:它的属性的访问权限大都是protected的,所以子类均可直接访问。它的这些属性的初始化均在唯一的构造器里完成,每个属性的初始化逻辑大体相似:缓存 -> SPI -> 初始化 -> 默认值,因为比较简单,本文略。
完成成员属性的准备工作后,下面就开始它的执行过程部分了。
我们知道HystrixCommand
的执行方法有多种,但其实不管哪种执行方法,最终都依赖于toObservable()
这个方法,toObservable()
它是执行原理的集大成者,所有执行方式的入口。
execute() 依赖于 queue() 依赖于 toObservable() observe() 依赖于 toObservable()
该方法作用:用于订阅Observable
的回调命令的异步执行,也就是说自己返回一个可被订阅的对象:数据发射器。一旦有订阅者就会延迟的发送数据/命令,新订阅者是不会监听到历史数据的。
AbstractCommand:
public Observable<R> toObservable() {
... // 暂时省略非常多的Action和Func们
// 需要注意的是:这里使用的是defer(),所以里面内容是不会立马执行的,知道有订阅者订阅了就执行
// 也就是说observable = command.toObservable()是不会执行
// observable.subscribe(xxx)订阅后,才会开始执行
// 下面使用defer的效果都一样~~~~~~~~~
return Observable.defer(() -> {
... // 检验线程状态CommandState,每个command只能被执行一次,否则额抛出HystrixRuntimeException异常
commandStartTimestamp = System.currentTimeMillis(); // 命令开始执行
... // 记录日志:不管发生了什么,都要记录这个命令的执行
// 是否允许请求缓存:properties.requestCacheEnabled().get() && getCacheKey() != null;
// 虽然properties里默认是true开启的,但是需要你重写getCacheKey()缓存才会生效的呀
boolean requestCacheEnabled = isRequestCachingEnabled();
String cacheKey = getCacheKey(); // 默认是null。重写它缓存才会生效
// 如果开启了缓存,就先从缓存里找结果
if (requestCacheEnabled) {
... // 一旦命中缓存,就处理好数据后return(关于请求缓存会后面再聊)
// 若命中了缓存,设置isResponseFromCache = true;
}
// =========如果没有获取到缓存,则需要执行命令获得结果。========
// applyHystrixSemantics函数:执行熔断器 + 目标方法等核心逻辑(最为复杂和关键的一个函数,下有详解)
// 所以这里返回的hystrixObservable已经是目标命令结果了
// wrapWithAllOnNextHooks:触发HystrixCommandExecutionHook相关回调
// 另外,此处也是defer实现哦,所以目标方法并不会立马执行哦~~~(执行时机交给调用者才对嘛)
Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
// 它是最终的return
Observable<R> afterCache;
// 若开启了缓存
if(requestCacheEnabled){
... // 把结果hystrixObservable缓存起来并处理后返回
} else {
afterCache = hystrixObservable;
}
// terminateCommandCleanup:执行清理(分为目标方法执行了or没执行)
// unsubscribeCommandCleanup:取消订阅
// fireOnCompletedHook:触发executionHook.onSuccess(_cmd)该方法
return afterCache.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
});
}
NOT_STARTED
,否则抛出HystrixRuntimeException
异常:一个命令只能执行一次HystrixRequestLog
记录该命令的执行(显示配置requestLogEnabled = false
可关闭日志的记录)requestCacheEnabled = true
且且且getCacheKey() != null
。所以你若想要请求缓存有效,请重写此方法并不要返回nullObservable.defer()
保证了目标方法此时并不会被执行,而是订阅时才异步执行(交给调用者决定嘛)applyHystrixSemantics()
方法为执行目标方法最最最核心逻辑,后有详解terminateCommandCleanup
:把线程状态标记为TERMINAL
。分为两种情况: HystrixCommandMetrics#markCommandDone()
,触发执行完成后的函数回调(若endCurrentThreadExecutingCommand
不为null的话)markCommandDone(true)
此处传true而已unsubscribeCommandCleanup
:把线程状态标记为UNSUBSCRIBED
。触发executionHook.onUnsubscribe
等动作,并且,并且重复和上步骤一模一样的动作fireOnCompletedHook
: 仅触发动作executionHook.onSuccess(_cmd)
这里主要是套了一层缓存,以及清理相关动作。但其实最为核心的还是在applyHystrixSemantics()
这个函数里,它才是真正的关键。
AbstractCommand:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd); // 开始执行
// 若断路器放行(断路器闭合状态,当然喽也可能是半开状态)
if (circuitBreaker.allowRequest()) {
// 若你实现的是线程池隔离,那么此处实现就是TryableSemaphoreNoOp
// 若你使用信号量隔离,它就会生效啦
TryableSemaphore executionSemaphore = getExecutionSemaphore();
...
// 尝试申请信号资源
if (executionSemaphore.tryAcquire()) {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd) // =====执行目标方法=====
.doOnError(markExceptionThrown) // eventNotifier.markEvent() ...
.doOnTerminate(singleSemaphoreRelease) // 确保释放信号量
.doOnUnsubscribe(singleSemaphoreRelease); // 确保释放信号量
} else { // 若木有信号资源了,进入到信号量的fallabck
return handleSemaphoreRejectionViaFallback();
}
} else { // 断路器打开了禁止你访问,那就直接执行fallabck逻辑
return handleShortCircuitViaFallback();
}
}
这里有一个小技巧:TryableSemaphore
信号量看起来是不管咋样都存在的,但是若你是线程池隔离的话,它的实现是NoOp空的,完美的嵌入到了正常流程里。
执行步骤:
circuitBreaker.allowRequest()
,若不允许执行直接执行ShortCircuit
短路fallabck逻辑,否则继续executeCommandAndObserve(_cmd)
到这里,执行流程分为两大分支:正常执行和异常执行,刚好和前面内容完成接轨:
关于Hystrix的执行原理,AbstractCommand详解就介绍到这了,到此关于AbstractCommand
的整个内容算是全部讲述完成,所以你对Hystrix的原理应该基本掌握。Hystrix在设计上还是蛮值得借鉴的,面向使用者的API可以提供多个,但是最终都归一到一处,达到更高的内聚效果,维护起来也更加的方便。