如果你害怕失败,你就会失败。 代码下载地址:https://github.com/f641385712/netflix-learning
前面用了几篇文章内容分析了Hystrix执行fallback的逻辑以及导致降级的各种情况,但是作为正常执行的逻辑均还没涉及。比如需要知道:在线程池隔离下如何执行?在信号量隔离下如何执行呢?
介绍过了异常情况的处理,本文将介绍Hystrix的正常执行流程以及源码解析。
对于方法的执行,Hystrix面向使用者一共提供了四种方法:execute/queue/observe/toObservable
,分别应用于不同的场景。而最终要执行目标方法的话,都会归并到一出,这边是本文入口:executeCommandAndObserve()
方法。
该方法是AbstractCommand
的一个私有方法,语义是:执行Command命令并且Observe
返回一个可被观察的实例Observable<R>
。很明显,它将围绕着目标方法的执行而展开~
AbstractCommand:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 执行上下文。保证线程池内亦能获取到主线程里的参数
HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
... // 暂时忽略一些内置的function们
Observable<R> execution;
// 二者的唯一却别是:若开启了超时支持的话,就只可观察对象的结果处
// lift一个HystrixObservableTimeoutOperator实例,以监控超时情况
// 关于Hystrix是如何实现超时的,这在后面专文讲解~~是个较大的,也较难的话题
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// 得到execution后,开始注册一些基本的事件、观察者
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
目标方法的执行在executeCommandWithSpecifiedIsolation()
方法里。但在此之前下介绍下其它函数的作用:
观察者被回调之前的调用(此时其实数据已经发送,也就是目标方法已经执行了)。
markEmits函数内容:
markEmits = r -> {
// 是否应该在onNext这步报告数据
// HystrixCommand -> false
// HystrixObservableCommand -> true
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
// 命令是否是标量
// HystrixCommand -> true
// HystrixObservableCommand -> false
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
// 这几句代码是重点:
// 记录结果为SUCCESS成功
// 并且,并且,并且circuitBreaker.markSuccess();(若断路器是打开的,此处就关闭了)
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
此步骤最重要的两件事:
正常完成(发射、监听全部正常时)
markOnCompleted函数内容:
markOnCompleted = () -> {
// 注意这个!符号
if (!commandIsScalar()) {
... // 逻辑完全同上面的Scalar部分
}
}
该函数主要是确保非Scala类型结果也能够正常关闭断路器以及标记Success。
重要。当目标方法执行过程中发生错误会执行此函数,用于Resume恢复而不是立马停止线程:这边是触发fallback逻辑的入口。
handleFallback函数内容:
handleFallback = (Throwable t) -> {
// 把Throwable t强转为Exception e(若不是Exception类型就包装为Exception类型)
// 比如若t是NPE异常,那么t和e是完全一样的。
// 只有当t是error类时,t才和e不相等
Exception e = getExceptionFromThrowable(t);
// 既然发生错误了,那就记录执行时候的异常e
executionResult = executionResult.setExecutionException(e);
// 若异常类型是RejectedExecutionException:线程池拒绝
// 若异常类型是HystrixTimeoutException:目标方法执行超时
// 若异常类型是HystrixBadRequestException:下文详细分解
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
// 什么时候会进入到这里?只有当子类复写了getExceptionFromThrowable()方法的时候才有可能进入到这里
// 这里算是一种兜底:保证不管咋样HystrixBadRequestException都不会触发熔断
// 其实我倒觉得,不让getExceptionFromThrowable这个方法被复写也行的
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
当目标方法执行过程中抛出异常(可能是程序问题、可能是超时等等)时候,会进入到这里来处理,处理case可分为两大类:
该步骤每次都会执行:为子线程设置请求上下文,保证数据打通。
setRequestContext = (Notification<? super R> rNotification) -> {
if (!HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestContext.setContextOnCurrentThread(currentRequestContext);
}
}
关于Hystrix是如何实现通过HystrixRequestContext
完成跨线程通信的,可参考这篇文章。
它也是AbstractCommand
的一个私有方法,只有被executeCommandAndObserve
调用。它俩的区别你可简单理解为:
executeCommandWithSpecifiedIsolation()
用于对目标方法的真正执行executeCommandAndObserve
在其基础上封装,加上了执行结果处理、超时处理、出现异常后的fallback处理等额外的逻辑。该方法字面意思:在规定的隔离方式里执行Command命令,这里规定的隔离策略有且仅有两种:
线程池隔离逻辑描述:
AbstractCommand:
private Observable<R> executeCommandWithSpecifiedIsolation(AbstractCommand<R> _cmd) {
// 标记我们正在一个线程中执行(即使我们最终被拒绝
// 我们仍然是一个线程执行,而不是信号量)
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// 默认是一个defer延迟执行的可观察对象
// 注意:进到这个回调里面来后,就是使用的线程池的资源去执行了(获取到了线程池资源)
// 比如此处线程号就是:hystrix-fallbackDemoGroup-1
return Observable.defer(() -> {
// 记录:目标方法已经执行(不管出异常与否,反正就是执行了)
// 应为如果被熔断了,或者线程池拒绝了它是不会被执行的
executionResult = executionResult.setExecutionOccurred();
// 线程状态必须是OBSERVABLE_CHAIN_CREATED时才让执行
// 而此状态是由toObservable()方法设置过来的
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// 收集指标信息:开始执行
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// 这个判断非常的有意思:如果在run方法还没执行之前
// 也就是在线程切换之间就超时了,那就直接返回一个错误
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// 把执行的线程状态标记为STARTED:启动
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
// 全局计数器+1
// 此处是唯一调用处。信号量里是木有此调用的哦
HystrixCounters.incrementGlobalConcurrentThreads();
// 标记线程池已经开始准备执行了
threadPool.markThreadExecution();
// store the command that is being run
// 这个保存使用的ThreadLocal<ConcurrentStack<HystrixCommandKey>>和当前线程绑定
// 这样确保了命令在执行时的线程安全~~~~~~~
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
// 执行钩子程序,以及执行目标run方法程序
// getUserExecutionObservable:getExecutionObservable()抽象方法获取到目标方法
// 本处也是该抽象方法的唯一调用处哦
// 若这里面任何一个方法抛出异常(哪怕是hook方法),就原样抛出
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else { // 说明已经unsubscribed了,就抛错
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
});
}
}
待执行的目标Observable
实例由getUserExecutionObservable()
提供,内部便是调用了抽象方法getExecutionObservable()
由子类来提供的。除此之外还需关心注册在Observable
上的其它操作符:
doOnTerminate
:当线程停止时(不管正常停or异常停)。作用是改变线程状态为STARTED
等doOnUnsubscribe
:当取消订阅时会执行。作用基本同上subscribeOn
:重要。它决定了数据发射在哪个线程里执行,是线程池调度的入口:AbstractCommand:
// 核心是threadPool.getScheduler()获取到一个Scheduler
.subscribeOn(threadPool.getScheduler(() -> {
properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT
}));
关于Hystrix
是如何去调用线程池资源执行目标方法的,具体详情可参见这篇文章Hystrix执行目标方法时,如何调用线程池资源?
当隔离模式选择信号量隔离时,那就执行如下逻辑:
AbstractCommand:
return Observable.defer(() -> {
// 检测线程状态。记录线程执行(不管成功与否)
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// 一样的,标记线程开始执行了。隔离模式是:SEMAPHORE哦
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// 不解释
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
});
信号量下隔离执行的逻辑几乎完全同线程池方式。不同的是它更加的轻量:不需要线程调度,因此也就不需要使用subscribeOn()
调度线程,也不需要使用doOnTerminate/doOnUnsubscribe
等去还原线程状态了。
关于Hystrix目标方法执行逻辑源码解读方面就介绍到这了。
此处有个小知识点:在Command目标开始执行的时候,调用了HystrixCommandMetrics#markCommandStart()
方法,当结束的时候会自动调用其HystrixCommandMetrics#markCommandDone()
方法,只是结束方法的调用时机在入口方法toObservable()
处管理着,这将在后文会再次提起,敬请关注。