say say Easy,do do har(说起来容易,做起来难)
代码下载地址:https://github.com/f641385712/netflix-learning
前面文章已经把Hystrix的数据源、数据流、指标数据收集都介绍了,从本文开始将要进入到数据生产方:command
命令的执行过程,数据发射过程的学习。在此直线,本着组件化逐个攻破的学习的方式,以最为复杂的、最为常用的AbstractCommand
为例,先了解其主要框架、主要组件的功能,这样便能对它有个更全面的掌握,使用起来也更加的得心应手。
本文将要介绍贯穿其中两个非常重要的组件:事件计数器EventCounts和执行结果ExecutionResult。
每一个command
命令执行都会有一个结果,这个结果便是使用的ExecutionResult
进行表示。因为Hystrix的源码均基于RxJava来书写,因此再次之前先“复习”一些RxJava的一个小知识点。
理解RxJava写的代码,最重要的便是理解它各方法的执行顺序,这样看起Hystrix源码来就能做大几乎没有障碍了。
下面通过一个具体示例,来说明一切:
@Test
public void fun3() {
Observable.just(1, 2, 3, 4)
.doOnNext((d) -> System.out.println("我是doOnNext" + d))
.doOnCompleted(() -> System.out.println("我是doOnCompleted"))
// onErrorResumeNext只有发射数据是抛错了才会执行它。比如这里牛可以fallabck了
.onErrorResumeNext(Observable.just(6, 7, 8, 9))
.doOnTerminate(() -> System.out.println("我是doOnTerminate"))
.doOnUnsubscribe(() -> System.out.println("我是doOnUnsubscribe"))
.subscribe((d) -> System.out.println(d));
System.out.println("===============模拟发射数据过程中出现异常===============");
// 模拟发射数据时发生错误了(若正常执行,效果完全同上)
Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
subscriber.onStart();
subscriber.onNext(1);
subscriber.onNext(2);
System.out.println(1 / 0); // 中途出现异常
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted(); // 请别忘了告诉结束。否则doOnCompleted不会执行的
}).doOnNext((d) -> System.out.println("我是doOnNext" + d))
.doOnCompleted(() -> System.out.println("我是doOnCompleted"))
.onErrorResumeNext(Observable.just(6, 7, 8, 9))
.doOnTerminate(() -> System.out.println("我是doOnTerminate"))
.doOnUnsubscribe(() -> System.out.println("我是doOnUnsubscribe"))
.subscribe((d) -> System.out.println(d));
}
运行程序,控制台输出:
我是doOnNext1
1
我是doOnNext2
2
我是doOnNext3
3
我是doOnNext4
4
我是doOnCompleted
我是doOnTerminate
我是doOnUnsubscribe
===============模拟发射数据过程中出现异常===============
我是doOnNext1
1
我是doOnNext2
2
6
7
8
9
我是doOnTerminate
我是doOnUnsubscribe
doOnNext
:在每次发射数据之前执行,但一旦发生异常后,后续的他将不再执行onErrorResumeNext
:发生错误时用于恢复Next俩生产数据,但是它并不再触发doOnNext
动作 rx.exceptions.OnErrorNotImplementedException: / by zero
。但是doOnTerminate/doOnUnsubscribe
还是会执行的(doOnCompleted
不执行)doOnCompleted
:只有在不抛出异常正常结束subscriber.onCompleted()
调用时它才会被执行(执行一次)本例列出的是Hystrix源码中最常使用的一些方法的执行顺序,该顺序需要礼节性记忆才能更好的阅读后续源码哦。
事件计数器。它是ExecutionResult
的静态内部类,内部使用一个BitSet
(其index和事件枚举的ordinal()
对应)来维护着它所记录的事件们,并且通过相应字段维护其事件总数:
public static class EventCounts {
private final BitSet events;
private final int numEmissions;
private final int numFallbackEmissions;
private final int numCollapsed;
}
需要注意的是这三个字段分别对应着“所属”的事件类型:
numEmissions
:对应HystrixEventType.EMIT
总数 doOnNext
时发送此事件numFallbackEmissions
:对应HystrixEventType.FALLBACK_EMIT
总数 doOnNext
里发送此事件numCollapsed
:对应HystrixEventType.COLLAPSED
总数 markAsCollapsedCommand
时候发送此事件也就是说,只有这三种事件可被识别,对于其它类型的事件,一律执行newBitSet.set(eventType.ordinal());
,也就说仅仅只记录值关心此事件发生过与否,而并不关心触发次数。
EventCounts
所有的构造器都是非public的,其中最为核心的逻辑为下面这段代码:
EventCounts:
...
switch (eventType) {
case EMIT:
newBitSet.set(HystrixEventType.EMIT.ordinal());
localNumEmits++;
break;
case FALLBACK_EMIT:
newBitSet.set(HystrixEventType.FALLBACK_EMIT.ordinal());
localNumFallbackEmits++;
break;
case COLLAPSED:
newBitSet.set(HystrixEventType.COLLAPSED.ordinal());
localNumCollapsed++;
break;
default:
newBitSet.set(eventType.ordinal());
break;
}
...
这段代码完美解释了上面的文字描述:事件计数器仅关心那3种事件的次数。所有关于事件计数的方法均基于此逻辑:
EventCounts:
// 为指定事件增加一次计数
EventCounts plus(HystrixEventType eventType) {
return plus(eventType, 1);
}
// 增加指定次数(一次性可增加N次)
EventCounts plus(HystrixEventType eventType, int count) {
... // 以上核心逻辑
return new EventCounts(newBitSet, ... );
}
这两个方法是事件计数方法,但是非public,仅ExecutionResult
内会有调用。事件次数记录下来后,下面便是提供的public访问方法:
EventCounts:
// 用于判断:事件计数器里是否记录有此事件(非常重要)
// isSuccessfulExecution -> 看是否记录SUCCESS事件
// isFailedExecution -> 看是否记录了FAILURE事件
// isResponseFromFallback -> 看是否记录了FALLBACK_SUCCESS事件
// isResponseTimedOut -> 看是否记录了TIMEOUT事件
// isResponseShortCircuited -> 看是否记录了SHORT_CIRCUITED事件
public boolean contains(HystrixEventType eventType) {
return events.get(eventType.ordinal());
}
// 只要包含other里的任何一个事件类型,就会返回true
// 使用:比如异常类型(BAD_REQUEST/FALLBACK_FAILURE)只要有一种就算异常状态呗
// 还有TERMINAL类型,如SUCCESS/BAD_REQUEST/FALLBACK_SUCCESS/FALLBACK_FAILURE均属于结束类型
public boolean containsAnyOf(BitSet other) {
return events.intersects(other);
}
// 拿到指定事件类型的**次数**
public int getCount(HystrixEventType eventType) {
switch (eventType) {
case EMIT: return numEmissions;
case FALLBACK_EMIT: return numFallbackEmissions;
case EXCEPTION_THROWN: return containsAnyOf(EXCEPTION_PRODUCING_EVENTS) ? 1 : 0;
case COLLAPSED: return numCollapsed;
default: return contains(eventType) ? 1 : 0;
}
}
针对getCount()
方法,做如下补充说明:
EXCEPTION_THROWN
异常类型记数做了分类处理:BAD_REQUEST、FALLBACK_FAILURE、FALLBACK_MISSING、FALLBACK_REJECTION
四种类型均数据异常抛出类型。为嘛自己这种类型不算?是因为该种类型系统不会发射出来,所以没必要算作里面~这就是事件计数器EventCounts
的所有内容。它对公并未曝露public
的事件记录方法,这种动作是被ExecutionResult
所代劳。
xxxCommand
系列的执行结果,它是Immutable不可变的,其实可以简单粗暴的理解为它就是一个POJO。
public class ExecutionResult {
// 事件计数器
private final EventCounts eventCounts;
// 执行时异常。通过setException设置进来值
// 唯一放置地:`AbstractCommand#handleFailureViaFallback`
private final Exception failedExecutionException;
// 通过`setExecutionException`放进来。只要是执行失败(线程池拒绝,timeout等等都会设置)
private final Exception executionException;
// 准备执行目标方法的时候,标记一下时刻
private final long startTimestamp;
//time spent in run() method
private final int executionLatency;
//time elapsed between caller thread submitting request and response being visible to it
private final int userThreadLatency;
// 只要目标方法执行了就标记为true(不管成功or失败)
private final boolean executionOccurred;
// 是否在线程池隔离中执行的
private final boolean isExecutedInThread;
private final HystrixCollapserKey collapserKey;
// EXCEPTION_PRODUCING_EVENTS和TERMINAL_EVENTS对事件类型进行了归类
private static final BitSet EXCEPTION_PRODUCING_EVENTS = new BitSet(NUM_EVENT_TYPES);
private static final BitSet TERMINAL_EVENTS = new BitSet(NUM_EVENT_TYPES);
static {
for (HystrixEventType eventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
EXCEPTION_PRODUCING_EVENTS.set(eventType.ordinal());
}
for (HystrixEventType eventType: HystrixEventType.TERMINAL_EVENT_TYPES) {
TERMINAL_EVENTS.set(eventType.ordinal());
}
}
}
以上是该"POJO"的所有属性,它提供一些"set"方法用于给各个字段赋值,提供get方法可以从执行结果里面获取所需信息:
ExecutionResult:
// 内部调用new EventCounts(eventTypes)用于记录事件(对应事件次数+1)
public static ExecutionResult from(HystrixEventType... eventTypes) {
...
return new ExecutionResult(new EventCounts(eventTypes), -1L, -1, -1, ... );
}
// ===各种set方法:每个set方法都返回一个新的ExecutionResult 实例===
public ExecutionResult setExecutionOccurred() {
return new ExecutionResult(... true, ...);
}
public ExecutionResult setExecutionLatency(int executionLatency) {
return new ExecutionResult(...,executionLatency,...);
}
...
// 注意:这方法里写死的事件类型:COLLAPSED
public ExecutionResult markCollapsed(HystrixCollapserKey collapserKey, int sizeOfBatch) {
return new ExecutionResult(eventCounts.plus(HystrixEventType.COLLAPSED, sizeOfBatch), startTimestamp, ... , collapserKey)
}
...
// 记录事件:记录事件,对应事件+1
// 内部依赖于事件计数器的plus方法
public ExecutionResult addEvent(HystrixEventType eventType) {
return new ExecutionResult(eventCounts.plus(eventType), ... );
}
// ======读方法:从结果中获取各个维度的数据======
...
// 纳秒的表现形式
public long getCommandRunStartTimeInNanos() {
return startTimestamp * 1000 * 1000;
}
...
// 是否被信号量拒绝:只需看是否记录了此事件即可
public boolean isResponseSemaphoreRejected() {
return eventCounts.contains(HystrixEventType.SEMAPHORE_REJECTED);
}
public boolean isResponseThreadPoolRejected() {
return eventCounts.contains(HystrixEventType.THREAD_POOL_REJECTED);
}
public boolean isResponseRejected() {
return isResponseThreadPoolRejected() || isResponseSemaphoreRejected();
}
...
// 结果里是否包含有终止时间:
//SUCCESS/BAD_REQUEST/FALLBACK_SUCCESS/FALLBACK_FAILURE/FALLBACK_REJECTION
// FALLBACK_MISSING/RESPONSE_FROM_CACHE/CANCELLED等这些都算终止喽
public boolean containsTerminalEvent() {
return eventCounts.containsAnyOf(TERMINAL_EVENTS);
}
事件计数器EventCounts
并不直接供以去访问记录事件,ExecutionResult
它代表着命令的执行结果,记录着执行过程中的各种“参数”,并且提供相关访问方法。
关于Hystrix事件计数器EventCounts和执行结果ExecutionResult就介绍到这了,本篇内容比较简单,更像是介绍了两个POJO
。
ExecutionResult
用于收集、记录事件,以及记录执行事件、异常等信息,存储起来并且提供相关访问方法。它所收集的信息大多数和下面即将讲述的HystrixInvokableInfo
吻合,他俩具有形如接口和实现类的意思,该接口在下文进行解释。