本文主要基于 Hystrix 1.5.X 版本
本文主要分享 Hystrix 执行命令的结果缓存。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
Hystrix 执行命令整体流程如下图:
FROM 《【翻译】Hystrix文档-实现原理》「流程图」
#toObservable()
方法里,如果请求结果缓存这个特性被启用,并且缓存命中,则缓存的回应会立即通过一个 Observable 对象的形式返回;如果缓存未命中,则返回【订阅了执行命令的 Observable】的 ReplySubject 对象缓存执行结果。在官方提供的示例中,我们使用 CommandUsingRequestCache 进行调试 。
推荐 Spring Cloud 书籍:
点击 《【翻译】Hystrix文档-实现原理》「请求缓存」 ,查看对请求缓存的好处分享,写的真的很赞。
本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 Observable#defer(...)
的方法实现。考虑到 Hystrix 大量使用,为了更好的理解,解析下源码。
《RxJava 源码解析 —— Observable#defer(...)》
AbstractCommand#toObservavle(...)
方法,代码如下 :
1: public Observable<R> toObservable() {
2: final AbstractCommand<R> _cmd = this;
3:
4: //doOnCompleted handler already did all of the SUCCESS work
5: //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
6: final Action0 terminateCommandCleanup = new Action0() {} // ... 省略
7:
8: //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
9: final Action0 unsubscribeCommandCleanup = new Action0() {} // ... 省略
10:
11: final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
12: @Override
13: public Observable<R> call() {
14: if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
15: return Observable.never();
16: }
17: return applyHystrixSemantics(_cmd);
18: }
19: };
20:
21: final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {} // ... 省略
22:
23: final Action0 fireOnCompletedHook = new Action0() {} // ... 省略
24:
25: return Observable.defer(new Func0<Observable<R>>() {
26: @Override
27: public Observable<R> call() {
28: /* this is a stateful object so can only be used once */
29: if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
30: IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
31: //TODO make a new error type for this
32: throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
33: }
34:
35: // 命令开始时间戳
36: commandStartTimestamp = System.currentTimeMillis();
37:
38: // TODO【2001】【打印日志】
39: if (properties.requestLogEnabled().get()) {
40: // log this command execution regardless of what happened
41: if (currentRequestLog != null) {
42: currentRequestLog.addExecutedCommand(_cmd);
43: }
44: }
45:
46: // 缓存开关、缓存KEY
47: final boolean requestCacheEnabled = isRequestCachingEnabled();
48: final String cacheKey = getCacheKey();
49:
50: // 优先从缓存中获取
51: /* try from cache first */
52: if (requestCacheEnabled) {
53: HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
54: if (fromCache != null) {
55: isResponseFromCache = true; // 标记 从缓存中结果
56: return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
57: }
58: }
59:
60: // 获得 执行命令Observable
61: Observable<R> hystrixObservable =
62: Observable.defer(applyHystrixSemantics)
63: .map(wrapWithAllOnNextHooks);
64:
65: // 获得 缓存Observable
66: Observable<R> afterCache;
67: // put in cache
68: if (requestCacheEnabled && cacheKey != null) {
69: // wrap it for caching
70: HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
71: // 并发若不存在
72: HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
73: if (fromCache != null) { // 添加失败
74: // another thread beat us so we'll use the cached value instead
75: toCache.unsubscribe();
76: isResponseFromCache = true; // 标记 从缓存中结果
77: return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
78: } else { // 添加成功
79: // we just created an ObservableCommand so we cast and return it
80: afterCache = toCache.toObservable();
81: }
82: } else {
83: afterCache = hystrixObservable;
84: }
85:
86: //
87: return afterCache
88: .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
89: .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
90: .doOnCompleted(fireOnCompletedHook);
91: }
92: });
93: }
_cmd
指向当前命令对象,用于下面实现 FuncX ,ActionX 内部类使用。
applyHystrixSemantics
传入 Observable#defer(...)
方法,声明执行命令的 Observable。
requestCache
缓存,在 TODO 【2008】【请求缓存】 详细解析。#handleRequestCacheHitAndEmitValues(...)
方法,在第 78 行详细解析。HystrixCommandResponseFromCache#unsubscribe()
方法,取消 HystrixCommandResponseFromCache 的订阅。这一步很关键,因为我们不希望缓存不存在时,多个线程去执行命令,最好有且只有一个线程执行命令。在 「5. HystrixCachedObservable」 详细解析。requestCache
。哟, HystrixRequestCache#putIfAbsent(...)
方法,多个线程添加时,只有一个线程添加成功。HystrixCommandResponseFromCachetoObservable()
方法,获得缓存 Observable 。
com.netflix.hystrix.HystrixCachedObservable
,缓存 Observable 。
HystrixCachedObservable 构造方法,代码如下 :
1: public class HystrixCachedObservable<R> {
2: /**
3: * 订阅
4: */
5: protected final Subscription originalSubscription;
6: /**
7: * 缓存 cachedObservable
8: */
9: protected final Observable<R> cachedObservable;
10: /**
11: * TODO 【2006】【outstandingSubscriptions】
12: */
13: private volatile int outstandingSubscriptions = 0;
14: //private AtomicInteger outstandingSubscriptions2 = new AtomicInteger(0);
15:
16: protected HystrixCachedObservable(final Observable<R> originalObservable) {
17: ReplaySubject<R> replaySubject = ReplaySubject.create();
18: this.originalSubscription = originalObservable
19: .subscribe(replaySubject);
20:
21: this.cachedObservable = replaySubject
22: .doOnUnsubscribe(new Action0() {
23: @Override
24: public void call() {
25: outstandingSubscriptions--;
26: if (outstandingSubscriptions == 0) {
27: originalSubscription.unsubscribe();
28: }
29: }
30: })
31: .doOnSubscribe(new Action0() {
32: @Override
33: public void call() {
34: outstandingSubscriptions++;
35: }
36: });
37: }
originalObservable
为 hystrixObservable
执行命令 Observable 。在 Hystrix 里,提供了两种执行命令的隔离方式 :线程池( THREAD
) 和信号量( SEMAPHORE
)。
THREAD
隔离时, #subscribe(replaySubject)
调用完成时,实际命令并未开始执行,或者说,这是一个异步的执行命令的过程。那么,会不会影响返回执行结果呢?答案当然是不会,BlockingObservable 在得到执行完成才会结束阻塞,此时已经有执行结果。SEMAPHORE
隔离时, #subscribe(replaySubject)
调用完成时,实际命令已经执行完成,所以即使 AbstractCommand#toObservavle(...)
的第 75 行 :调用 HystrixCommandResponseFromCache#unsubscribe()
方法,也会浪费,重复执行命令。而对于 THREAD
隔离的情况,通过取消订阅的方式,只会执行一次命令。当然,如果“恶搞” THREAD
隔离的情况,增加 sleep
的调用如下,就能达到重复执行命令的效果。
HystrixCachedObservable 的其他方法,点击 链接 查看。
com.netflix.hystrix.HystrixCommandResponseFromCache
,是 HystrixCachedObservable 的子类。在父类的基础上,增加了对 AbstractCommand.executionResult
的关注。
HystrixCachedObservable#from(Observable, AbstractCommand)
方法,创建 HystrixCommandResponseFromCache 对象,点击 链接 查看。
HystrixCommandResponseFromCache#toObservableWithStateCopiedInto(...)
方法,点击 链接 查看。
completionLogicRun
属性,保证 #doOnError()
, #doOnCompleted()
, #doOnUnsubscribe()
方法有且只有一个方法执行具体逻辑。#doOnError()
, #doOnCompleted()
执行时,调用 #commandCompleted()
方法,从缓存命令( HystrixCommandResponseFromCache.originalCommand
) 复制 executionResult
属性给当前命令( commandToCopyStateInto
) 。#doOnUnsubscribe()
执行时,调用 #commandUnsubscribed()
方法,使用当前命令( commandToCopyStateInto
)自己的 executionResult
,不进行复制。