前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 三十四、Hystrix目标方法执行逻辑源码解读:executeCommandAndObserve

[享学Netflix] 三十四、Hystrix目标方法执行逻辑源码解读:executeCommandAndObserve

作者头像
YourBatman
发布2020-03-18 19:39:48
9470
发布2020-03-18 19:39:48
举报
文章被收录于专栏:BAT的乌托邦BAT的乌托邦

如果你害怕失败,你就会失败。 代码下载地址:https://github.com/f641385712/netflix-learning

目录
  • 前言
  • 正文
    • executeCommandAndObserve()源码解读
      • doOnNext(markEmits)
      • doOnCompleted(markOnCompleted)
      • onErrorResumeNext(handleFallback)
      • doOnEach(setRequestContext)
    • executeCommandWithSpecifiedIsolation()
      • THREAD线程池隔离下执行
      • SEMAPHORE信号量隔离下执行
  • 总结
    • 声明

前言

前面用了几篇文章内容分析了Hystrix执行fallback的逻辑以及导致降级的各种情况,但是作为正常执行的逻辑均还没涉及。比如需要知道:在线程池隔离下如何执行?在信号量隔离下如何执行呢?

介绍过了异常情况的处理,本文将介绍Hystrix的正常执行流程以及源码解析。


正文

对于方法的执行,Hystrix面向使用者一共提供了四种方法:execute/queue/observe/toObservable,分别应用于不同的场景。而最终要执行目标方法的话,都会归并到一出,这边是本文入口:executeCommandAndObserve()方法。


executeCommandAndObserve()源码解读

该方法是AbstractCommand的一个私有方法,语义是:执行Command命令并且Observe返回一个可被观察的实例Observable<R>。很明显,它将围绕着目标方法的执行而展开~

代码语言:javascript
复制
AbstractCommand:

	private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
		// 执行上下文。保证线程池内亦能获取到主线程里的参数
		HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
		... // 暂时忽略一些内置的function们
		
		Observable<R> execution;
		// 二者的唯一却别是:若开启了超时支持的话,就只可观察对象的结果处
		// lift一个HystrixObservableTimeoutOperator实例,以监控超时情况
		// 关于Hystrix是如何实现超时的,这在后面专文讲解~~是个较大的,也较难的话题
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

		// 得到execution后,开始注册一些基本的事件、观察者
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
	}

目标方法的执行在executeCommandWithSpecifiedIsolation()方法里。但在此之前下介绍下其它函数的作用:

doOnNext(markEmits)

观察者被回调之前的调用(此时其实数据已经发送,也就是目标方法已经执行了)。

代码语言:javascript
复制
markEmits函数内容:

	markEmits = r -> {
		// 是否应该在onNext这步报告数据
		// HystrixCommand -> false
		// HystrixObservableCommand -> true
	    if (shouldOutputOnNextEvents()) {
	        executionResult = executionResult.addEvent(HystrixEventType.EMIT);
	        eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
	    }

		// 命令是否是标量
		// HystrixCommand -> true
		// HystrixObservableCommand -> false
	    if (commandIsScalar()) {
	        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
	        eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());

			// 这几句代码是重点:
			// 记录结果为SUCCESS成功
			// 并且,并且,并且circuitBreaker.markSuccess();(若断路器是打开的,此处就关闭了)
	        eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
	        executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
	        circuitBreaker.markSuccess();
	    }
	}

此步骤最重要的两件事:

  1. 记录result结果事件为:SUCCESS
  2. 闭合circuitBreaker断路器(若已经是闭合的就忽略呗)
doOnCompleted(markOnCompleted)

正常完成(发射、监听全部正常时)

代码语言:javascript
复制
markOnCompleted函数内容:

	markOnCompleted = () -> {
		// 注意这个!符号
		if (!commandIsScalar()) {
			... // 逻辑完全同上面的Scalar部分
		}
	}

该函数主要是确保非Scala类型结果也能够正常关闭断路器以及标记Success。

onErrorResumeNext(handleFallback)

重要。当目标方法执行过程中发生错误会执行此函数,用于Resume恢复而不是立马停止线程:这边是触发fallback逻辑的入口

代码语言:javascript
复制
handleFallback函数内容:

	handleFallback = (Throwable t) -> {
		// 把Throwable t强转为Exception e(若不是Exception类型就包装为Exception类型)
		// 比如若t是NPE异常,那么t和e是完全一样的。
		// 只有当t是error类时,t才和e不相等
        Exception e = getExceptionFromThrowable(t);
        // 既然发生错误了,那就记录执行时候的异常e
        executionResult = executionResult.setExecutionException(e);
        
		// 若异常类型是RejectedExecutionException:线程池拒绝
		// 若异常类型是HystrixTimeoutException:目标方法执行超时
		// 若异常类型是HystrixBadRequestException:下文详细分解
        if (e instanceof RejectedExecutionException) {
            return handleThreadPoolRejectionViaFallback(e);
        } else if (t instanceof HystrixTimeoutException) {
            return handleTimeoutViaFallback();
        } else if (t instanceof HystrixBadRequestException) {
            return handleBadRequestByEmittingError(e);
        } else {
            // 什么时候会进入到这里?只有当子类复写了getExceptionFromThrowable()方法的时候才有可能进入到这里
            // 这里算是一种兜底:保证不管咋样HystrixBadRequestException都不会触发熔断
            // 其实我倒觉得,不让getExceptionFromThrowable这个方法被复写也行的
            if (e instanceof HystrixBadRequestException) {
                eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                return Observable.error(e);
            }

            return handleFailureViaFallback(e);
        }
	}

当目标方法执行过程中抛出异常(可能是程序问题、可能是超时等等)时候,会进入到这里来处理,处理case可分为两大类:

doOnEach(setRequestContext)

该步骤每次都会执行:为子线程设置请求上下文,保证数据打通。

代码语言:javascript
复制
	setRequestContext = (Notification<? super R> rNotification) -> {
		if (!HystrixRequestContext.isCurrentThreadInitialized()) {
			HystrixRequestContext.setContextOnCurrentThread(currentRequestContext);
		}
	}

关于Hystrix是如何实现通过HystrixRequestContext完成跨线程通信的,可参考这篇文章



executeCommandWithSpecifiedIsolation()

它也是AbstractCommand的一个私有方法,只有被executeCommandAndObserve调用。它俩的区别你可简单理解为:

  • executeCommandWithSpecifiedIsolation()用于对目标方法的真正执行
  • executeCommandAndObserve在其基础上封装,加上了执行结果处理、超时处理、出现异常后的fallback处理等额外的逻辑。

该方法字面意思:在规定的隔离方式里执行Command命令,这里规定的隔离策略有且仅有两种:

  • THREAD:线程池隔离(默认)
  • SEMAPHORE:信号量隔离

THREAD线程池隔离下执行

线程池隔离逻辑描述:

代码语言:javascript
复制
AbstractCommand:

	private Observable<R> executeCommandWithSpecifiedIsolation(AbstractCommand<R> _cmd) {
		// 标记我们正在一个线程中执行(即使我们最终被拒绝
		// 我们仍然是一个线程执行,而不是信号量)
		if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
			// 默认是一个defer延迟执行的可观察对象
			// 注意:进到这个回调里面来后,就是使用的线程池的资源去执行了(获取到了线程池资源)
			// 比如此处线程号就是:hystrix-fallbackDemoGroup-1
			return Observable.defer(() -> {
				// 记录:目标方法已经执行(不管出异常与否,反正就是执行了)
				// 应为如果被熔断了,或者线程池拒绝了它是不会被执行的
				executionResult = executionResult.setExecutionOccurred();
				// 线程状态必须是OBSERVABLE_CHAIN_CREATED时才让执行
				// 而此状态是由toObservable()方法设置过来的
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                // 收集指标信息:开始执行
				metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

				// 这个判断非常的有意思:如果在run方法还没执行之前
				// 也就是在线程切换之间就超时了,那就直接返回一个错误
				if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
					return Observable.error(new RuntimeException("timed out before executing run()"));
				}

				// 把执行的线程状态标记为STARTED:启动
				if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
					// 全局计数器+1
					// 此处是唯一调用处。信号量里是木有此调用的哦
					HystrixCounters.incrementGlobalConcurrentThreads();
					// 标记线程池已经开始准备执行了
					threadPool.markThreadExecution(); 

                    // store the command that is being run
                    // 这个保存使用的ThreadLocal<ConcurrentStack<HystrixCommandKey>>和当前线程绑定
                    // 这样确保了命令在执行时的线程安全~~~~~~~
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();

					// 执行钩子程序,以及执行目标run方法程序
					// getUserExecutionObservable:getExecutionObservable()抽象方法获取到目标方法
					// 本处也是该抽象方法的唯一调用处哦
					// 若这里面任何一个方法抛出异常(哪怕是hook方法),就原样抛出
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
				} else { // 说明已经unsubscribed了,就抛错
					return Observable.error(new RuntimeException("unsubscribed before executing run()"));
				}
			});
		}
	}

待执行的目标Observable实例由getUserExecutionObservable()提供,内部便是调用了抽象方法getExecutionObservable()由子类来提供的。除此之外还需关心注册在Observable上的其它操作符:

  • doOnTerminate:当线程停止时(不管正常停or异常停)。作用是改变线程状态为STARTED
  • doOnUnsubscribe:当取消订阅时会执行。作用基本同上
  • subscribeOn重要。它决定了数据发射在哪个线程里执行,是线程池调度的入口:
代码语言:javascript
复制
AbstractCommand:

		// 核心是threadPool.getScheduler()获取到一个Scheduler
		.subscribeOn(threadPool.getScheduler(() -> {
			properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT
		}));

关于Hystrix是如何去调用线程池资源执行目标方法的,具体详情可参见这篇文章Hystrix执行目标方法时,如何调用线程池资源?


SEMAPHORE信号量隔离下执行

当隔离模式选择信号量隔离时,那就执行如下逻辑:

代码语言:javascript
复制
AbstractCommand:

	return Observable.defer(() -> {
		// 检测线程状态。记录线程执行(不管成功与否)
        executionResult = executionResult.setExecutionOccurred();
        if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
            return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
        }
	
		// 一样的,标记线程开始执行了。隔离模式是:SEMAPHORE哦
		metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
		
		// 不解释
		endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
        try {
            executionHook.onRunStart(_cmd);
            executionHook.onExecutionStart(_cmd);
            return getUserExecutionObservable(_cmd);
        } catch (Throwable ex) {
            return Observable.error(ex);
        }
	});

信号量下隔离执行的逻辑几乎完全同线程池方式。不同的是它更加的轻量:不需要线程调度,因此也就不需要使用subscribeOn()调度线程,也不需要使用doOnTerminate/doOnUnsubscribe等去还原线程状态了。


总结

关于Hystrix目标方法执行逻辑源码解读方面就介绍到这了。

此处有个小知识点:在Command目标开始执行的时候,调用了HystrixCommandMetrics#markCommandStart()方法,当结束的时候会自动调用其HystrixCommandMetrics#markCommandDone()方法,只是结束方法的调用时机在入口方法toObservable()处管理着,这将在后文会再次提起,敬请关注。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 前言
  • 正文
    • executeCommandAndObserve()源码解读
      • doOnNext(markEmits)
      • doOnCompleted(markOnCompleted)
      • onErrorResumeNext(handleFallback)
      • doOnEach(setRequestContext)
    • executeCommandWithSpecifiedIsolation()
      • THREAD线程池隔离下执行
      • SEMAPHORE信号量隔离下执行
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档