专栏首页芋道源码1024熔断器 Hystrix 源码解析 —— 执行结果缓存

熔断器 Hystrix 源码解析 —— 执行结果缓存

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述
  • 2. 好处
  • 3. Observable#defer(...)
  • 4. AbstractCommand#toObservavle(...)
  • 5. HystrixCachedObservable
  • 6. HystrixCommandResponseFromCache

1. 概述

本文主要分享 Hystrix 执行命令的结果缓存

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」

  • 红圈 :在 《Hystrix 源码解析 —— 执行命令方式》 有详细解析。
  • 紫圈 :在 #toObservable() 方法里,如果请求结果缓存这个特性被启用,并且缓存命中,则缓存的回应会立即通过一个 Observable 对象的形式返回;如果缓存未命中,则返回【订阅了执行命令的 Observable】的 ReplySubject 对象缓存执行结果。
    • ReplySubject 能够重放执行结果,从而实现缓存的功效。本文不对 ReplySubject 做太多拓展,感兴趣的同学可以阅读 《ReactiveX/RxJava文档中文版 —— Subject》 。

在官方提供的示例中,我们使用 CommandUsingRequestCache 进行调试 。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

2. 好处

点击 《【翻译】Hystrix文档-实现原理》「请求缓存」 ,查看对请求缓存的好处分享,写的真的很赞。

3. Observable#defer(...)

本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 Observable#defer(...) 的方法实现。考虑到 Hystrix 大量使用,为了更好的理解,解析下源码。

《RxJava 源码解析 —— Observable#defer(...)》

4. AbstractCommand#toObservavle(...)

AbstractCommand#toObservavle(...) 方法,代码如下 :

1: public Observable<R> toObservable() {
  2:     final AbstractCommand<R> _cmd = this;
  3: 
  4:     //doOnCompleted handler already did all of the SUCCESS work
  5:     //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
  6:     final Action0 terminateCommandCleanup = new Action0() {} // ... 省略
  7: 
  8:     //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
  9:     final Action0 unsubscribeCommandCleanup = new Action0() {} // ... 省略
 10: 
 11:     final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
 12:         @Override
 13:         public Observable<R> call() {
 14:             if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
 15:                 return Observable.never();
 16:             }
 17:             return applyHystrixSemantics(_cmd);
 18:         }
 19:     };
 20: 
 21:     final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {} // ... 省略 
 22: 
 23:     final Action0 fireOnCompletedHook = new Action0() {} // ... 省略 
 24: 
 25:     return Observable.defer(new Func0<Observable<R>>() {
 26:         @Override
 27:         public Observable<R> call() {
 28:             /* this is a stateful object so can only be used once */
 29:             if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
 30:                 IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
 31:                 //TODO make a new error type for this
 32:                 throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
 33:             }
 34: 
 35:             // 命令开始时间戳
 36:             commandStartTimestamp = System.currentTimeMillis();
 37: 
 38:             // TODO【2001】【打印日志】
 39:             if (properties.requestLogEnabled().get()) {
 40:                 // log this command execution regardless of what happened
 41:                 if (currentRequestLog != null) {
 42:                     currentRequestLog.addExecutedCommand(_cmd);
 43:                 }
 44:             }
 45: 
 46:             // 缓存开关、缓存KEY
 47:             final boolean requestCacheEnabled = isRequestCachingEnabled();
 48:             final String cacheKey = getCacheKey();
 49: 
 50:             // 优先从缓存中获取
 51:             /* try from cache first */
 52:             if (requestCacheEnabled) {
 53:                 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
 54:                 if (fromCache != null) {
 55:                     isResponseFromCache = true; // 标记 从缓存中结果
 56:                     return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
 57:                 }
 58:             }
 59: 
 60:             // 获得 执行命令Observable
 61:             Observable<R> hystrixObservable =
 62:                     Observable.defer(applyHystrixSemantics)
 63:                             .map(wrapWithAllOnNextHooks);
 64: 
 65:             // 获得 缓存Observable
 66:             Observable<R> afterCache;
 67:             // put in cache
 68:             if (requestCacheEnabled && cacheKey != null) {
 69:                 // wrap it for caching
 70:                 HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
 71:                 // 并发若不存在
 72:                 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
 73:                 if (fromCache != null) { // 添加失败
 74:                     // another thread beat us so we'll use the cached value instead
 75:                     toCache.unsubscribe();
 76:                     isResponseFromCache = true; // 标记 从缓存中结果
 77:                     return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
 78:                 } else { // 添加成功
 79:                     // we just created an ObservableCommand so we cast and return it
 80:                     afterCache = toCache.toObservable();
 81:                 }
 82:             } else {
 83:                 afterCache = hystrixObservable;
 84:             }
 85: 
 86:             //
 87:             return afterCache
 88:                     .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
 89:                     .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
 90:                     .doOnCompleted(fireOnCompletedHook);
 91:         }
 92:     });
 93: }
  • 第 2 行 : _cmd 指向当前命令对象,用于下面实现 FuncX ,ActionX 内部类使用。
  • 第 11 至 19 行 :当缓存特性未开启,或者缓存未命中时,使用 applyHystrixSemantics 传入 Observable#defer(...) 方法,声明执行命令的 Observable。
  • 第 25 行 :声明缓存 Observable 。Hystrix 执行命令的 Observable 声明关系如下:
  • 第 29 至 33 行 :一条命令只能执行一次
  • 第 36 行 :记录命令开始时间戳。
  • 第 38 至 44 行 :TODO【2001】【打印日志】
  • 第 47 至 48 行 :缓存存开关、KEY 。
  • 第 52 至 58 行 :如果请求结果缓存这个特性被启用,并且缓存命中,则缓存的回应会立即通过一个 Observable 对象的形式返回。
    • 第 53 行 : requestCache 缓存,在 TODO 【2008】【请求缓存】 详细解析。
    • 第 53 行 :「6. HystrixCommandResponseFromCache」 详细解析。
    • 第 56 行 : #handleRequestCacheHitAndEmitValues(...) 方法,在第 78 行详细解析。
  • 第 61 至 63 行 :获取执行命令的 Observable 。在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》 详细解析。
  • 第 68 至 81 行 :当缓存特性开启,并且缓存未命中时,创建【订阅了执行命令的 Observable】的 HystrixCommandResponseFromCache 。
    • 第 75 行 :调用 HystrixCommandResponseFromCache#unsubscribe() 方法,取消 HystrixCommandResponseFromCache 的订阅。这一步很关键,因为我们不希望缓存不存在时,多个线程去执行命令,最好有且只有一个线程执行命令。在 「5. HystrixCachedObservable」 详细解析。
    • 第 77 行 :「6. HystrixCommandResponseFromCache」 详细解析。
    • 第 69 至 72 行 :创建 HystrixCommandResponseFromCache ,并添加到 requestCache 。哟, HystrixRequestCache#putIfAbsent(...) 方法,多个线程添加时,只有一个线程添加成功。
    • 第 73 至 77 行 :添加失败的线程( ):
  • 第 80 行 :调用 HystrixCommandResponseFromCachetoObservable() 方法,获得缓存 Observable 。
  • 第 82 至 84 行 :当缓存特性未开启,使用执行命令 Observable 。
  • 第 87 至 91 行 :在返回的 Observable 上,订阅一些清理的处理逻辑。对这几个方法有疑惑的同学,可以阅读 《给 Android 开发者的 RxJava 详解》「 3) Subscribe (订阅) 」 。

5. HystrixCachedObservable

com.netflix.hystrix.HystrixCachedObservable ,缓存 Observable 。

HystrixCachedObservable 构造方法,代码如下 :

1: public class HystrixCachedObservable<R> {
  2: /**
  3:  * 订阅
  4:  */
  5: protected final Subscription originalSubscription;
  6: /**
  7:  * 缓存 cachedObservable
  8:  */
  9: protected final Observable<R> cachedObservable;
 10: /**
 11:  * TODO 【2006】【outstandingSubscriptions】
 12:  */
 13: private volatile int outstandingSubscriptions = 0;
 14: //private AtomicInteger outstandingSubscriptions2 = new AtomicInteger(0);
 15: 
 16: protected HystrixCachedObservable(final Observable<R> originalObservable) {
 17:     ReplaySubject<R> replaySubject = ReplaySubject.create();
 18:     this.originalSubscription = originalObservable
 19:             .subscribe(replaySubject);
 20: 
 21:     this.cachedObservable = replaySubject
 22:             .doOnUnsubscribe(new Action0() {
 23:                 @Override
 24:                 public void call() {
 25:                     outstandingSubscriptions--;
 26:                     if (outstandingSubscriptions == 0) {
 27:                         originalSubscription.unsubscribe();
 28:                     }
 29:                 }
 30:             })
 31:             .doOnSubscribe(new Action0() {
 32:                 @Override
 33:                 public void call() {
 34:                     outstandingSubscriptions++;
 35:                 }
 36:             });
 37: }
  • 第 17 至 19 行 :实际上,HystrixCachedObservable 不是一个 Observable 的子类,而是对传入的 Observable 封装 :使用 ReplaySubject 向传入的 Observable 发起订阅,通过 ReplaySubject 能够重放执行结果,从而实现缓存的功效。这里有几个卡到笔者的并且很有趣的点,我们一一道来 :从上文中,我们可以看到,传入的 originalObservablehystrixObservable 执行命令 Observable 。在 Hystrix 里,提供了两种执行命令的隔离方式 :线程池( THREAD ) 和信号量( SEMAPHORE )。
    • 当使用 THREAD 隔离时, #subscribe(replaySubject) 调用完成时,实际命令并未开始执行,或者说,这是一个异步的执行命令的过程。那么,会不会影响返回执行结果呢?答案当然是不会,BlockingObservable 在得到执行完成才会结束阻塞,此时已经有执行结果。
    • 当使用 SEMAPHORE 隔离时, #subscribe(replaySubject) 调用完成时,实际命令已经执行完成,所以即使 AbstractCommand#toObservavle(...) 的第 75 行 :调用 HystrixCommandResponseFromCache#unsubscribe() 方法,也会浪费,重复执行命令。而对于 THREAD 隔离的情况,通过取消订阅的方式,只会执行一次命令。当然,如果“恶搞” THREAD 隔离的情况,增加 sleep 的调用如下,就能达到重复执行命令的效果。
  • 第 21 至 36 行 :TODO 【2006】【outstandingSubscriptions】原子性没问题么?历史版本使用的是 AtomicInteger 。

HystrixCachedObservable 的其他方法,点击 链接 查看。

6. HystrixCommandResponseFromCache

com.netflix.hystrix.HystrixCommandResponseFromCache ,是 HystrixCachedObservable 的子类。在父类的基础上,增加了对 AbstractCommand.executionResult 的关注。

HystrixCachedObservable#from(Observable, AbstractCommand) 方法,创建 HystrixCommandResponseFromCache 对象,点击 链接 查看。


HystrixCommandResponseFromCache#toObservableWithStateCopiedInto(...) 方法,点击 链接 查看。

  • 通过 completionLogicRun 属性,保证 #doOnError()#doOnCompleted()#doOnUnsubscribe() 方法有且只有一个方法执行具体逻辑。
    • #doOnError()#doOnCompleted() 执行时,调用 #commandCompleted() 方法,从缓存命令( HystrixCommandResponseFromCache.originalCommand ) 复制 executionResult 属性给当前命令( commandToCopyStateInto ) 。
    • #doOnUnsubscribe() 执行时,调用 #commandUnsubscribed() 方法,使用当前命令( commandToCopyStateInto )自己executionResult ,不进行复制。
  • TODO 【2007】【executionResult】用途

本文分享自微信公众号 - 芋道源码(YunaiV),作者:芋艿

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-11-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 手写一个 web 服务器!

    作为后端开发人员,在实际的工作中我们会非常高频地使用到web服务器。而tomcat作为web服务器领域中举足轻重的一个web框架,又是不能不学习和了解的。

    芋道源码
  • 熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑

    本文主要基于 Hystrix 1.5.X 版本 1. 概述 2. #applyHystrixSemantics(...) 3. TryableSemaphor...

    芋道源码
  • 芋道 Spring Boot MongoDB 入门

    MongoDB 中的许多概念在 MySQL 中具有相近的类比。本表概述了每个系统中的一些常见概念。

    芋道源码
  • TencentDataCenter:On the Road

    2014年9月10日,腾讯IDC平台部在北京腾讯汇举办了第一届腾讯数据中心分享日,主题为“开放创新,合作共赢”。此次分享日邀请了工信部电信研究院、BAT互联网公...

    腾讯数据中心
  • 【PPT】清华大学演讲:讲解人工智能技术与产业发展

    导读:这是清华大学计算机科学与技术系教授、博士生导师孙富春在机械工业出版社讲座交流时的PPT,内容包括了人工智能产业的历史背景、现状及未来发展趋势。这180页的...

    钱塘数据
  • 前期准备,搭建渗透测试环境

      我们会使用一个叫Virtual Box的程序来搭建我们的渗透测试的测试环境。 我们可以使用Virtual Box在我们当前系统中创建虚拟机,在我们的实验中我...

    小老鼠
  • 像搭积木一样拼装数据中心

    你可能知道腾讯是国内互联网三巨头之一,可能是腾讯两亿多同时在线的QQ用户和4.38亿微信活跃用户中的一员,但是你可能并不了解是什么在支撑着这些亿级用户平台的顺畅...

    腾讯数据中心
  • 上传单个文件到多台机器工具

    一见
  • 批量远程执行shell命令工具

    使用示例(使用了默认用户root,和默认端口号22): ./mooon_ssh --h=192.168.4.1,192.168.4.2 -P=password...

    一见
  • (含源码)「自然语言处理(NLP)」Question Answering(QA)论文整理(七)

    本次文章主要介绍了ERNIE-GEN(语言生成任务)、统一预训练语言模型(UniLM)、问答系统数据集(CoQA)、端到端神经生成问答(GENQA)、生成式问答...

    ShuYini

扫码关注云+社区

领取腾讯云代金券