入一行,先别惦记着挣钱,而是要先让自己值钱
代码下载地址:https://github.com/f641385712/netflix-learning
HystrixCommand
在执行过程中会持续不断的产生数据,Hystrix会收集到这些数据后做运算,进而按策略做出仲裁。数据的发送方为命令执行方,接收方为指标数据收集方,那么这些数据的传输通道是什么?以什么样的方式进行传输呢?这将是本文讲解的主要内容。
Hystrix
自从1.5版本开始便通过数据流HystrixEventStream
的方式持续不断的收集数据,而该数据流里面的内容便用HystrixEvent
数据源俩表示。
说明:事件源与事件流 语义上等价于 数据源与数据流
本文遵照本系列,使用的Hystrix版本是“最新的”1.5.18
版本,因此接下来主要关心了解它如下两个API:
HystrixEvent
:数据源/事件源HystrixEventStream
:数据流/事件流Hystrix的指标收集都是基于事件驱动的,所以这个接口就是代表着数据源/事件源。但它是个标记接口而已,具体内容由实现类去指定。
用于执行HystrixCommand
的事件流的数据类。
public abstract class HystrixCommandEvent implements HystrixEvent {
private final HystrixCommandKey commandKey;
private final HystrixThreadPoolKey threadPoolKey;
... // 省略构造器个属性的get方法
// command是否开始
public abstract boolean isExecutionStart();
// 是否是在隔离线程里执行的(先线程池模式隔离)
public abstract boolean isExecutedInThread();
// 响应被线程池拒绝
public abstract boolean isResponseThreadPoolRejected();
// command是否执行完成(包括成功、错误等)
public abstract boolean isCommandCompletion();
// command命令是否执行过
public abstract boolean didCommandExecute();
// 提供两个public的功能函数,过滤
// 过滤只需要已经完成了的HystrixCommandEvent数据
// 过滤只需要执行过的数据
public static final Func1<HystrixCommandEvent, Boolean> filterCompletionsOnly = (commandEvent) -> commandEvent.isCommandCompletion();
public static final Func1<HystrixCommandEvent, Boolean> filterActualExecutions = (commandEvent) -> commandEvent.didCommandExecute();
}
它是个抽象类,具体实现还得交给子类们。
当命令开始执行时,将其提供给事件流(发送事件)。
public class HystrixCommandExecutionStarted extends HystrixCommandEvent {
// 开始执行的时候必须告诉我隔离策略,以及当前并发数
private final HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy;
private final int currentConcurrency;
... // 省略构造器
//=====实现抽象类方法======
@Override
public boolean isExecutionStart() {
return true;
}
@Override
public boolean isExecutedInThread() {
return isolationStrategy == HystrixCommandProperties.ExecutionIsolationStrategy.THREAD;
}
... // 其它几个方法很明显,都是返回固定值:false
}
当命令开始执行时,发送的数据包括了隔离类型(线程池or信号量),以及它当前的并发数。
它在命令完成时发送数据(几乎每一个HystrixEventType
都会有一个结果)。
public class HystrixCommandCompletion extends HystrixCommandEvent {
// 执行结果封装
protected final ExecutionResult executionResult;
// Hystrix的请求上下文。上上篇文章详细描述过
// 默认会取值:HystrixRequestContext.getContextForCurrentThread()
protected final HystrixRequestContext requestContext;
// 所有的源生事件类型
private final static HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
... // 省略构造方法
@Override
public boolean isExecutionStart() {
return false;
}
...
@Override
public boolean didCommandExecute() {
return executionResult.executionOccurred();
}
... // 生路其它get方法
public HystrixRequestContext getRequestContext() {
return this.requestContext;
}
}
该数据类主要包含一个执行结果对象:ExecutionResult
,它封装有很多结果数据。
它代表执行的结果,是一个Immutable不可变对象。
这个对象可以被父线程和子线程引用和“修改”,也可以被HystrixCommand的不同实例引用和“修改”1个实例可以创建一个ExecutionResult,缓存一个引用它的Future,然后第二个实例执行检索一个Future 来自缓存,并希望将RESPONSE_FROM_CACHE
追加到第一个命令执行的任何ExecutionResult中。
在请求合并、请求缓存时还会再遇见它
public class ExecutionResult {
private final EventCounts eventCounts;
private final Exception failedExecutionException;
private final Exception executionException;
private final long startTimestamp;
private final int executionLatency;
private final int userThreadLatency;
private final boolean executionOccurred;
private final boolean isExecutedInThread;
private final HystrixCollapserKey collapserKey;
...
}
它是一个普通的POJO,因此仅需对这些属性进行解释即可:
EventCounts eventCounts
:它是一个POJO,维护着如下字段 BitSet events
:HystrixEventType
的所有可能值int numEmissions
:发射数。发送HystrixEventType.EMIT
事件时,+1int numFallbackEmissions
:降级的发射数。发送FALLBACK_EMIT
事件时,+1int numCollapsed
:合并数。发送HystrixEventType.COLLAPSED
事件时,+1Exception failedExecutionException
:executionException
:在检查fallabck之前发出的异常(也就是执行run时就抛出异常了)。当是timeout/short-circuit/rejection/bad request
等情况时,值同上startTimestamp
:命令开始执行的时刻executionLatency
:time spent in run() methoduserThreadLatency
:使用线程方式提交任务到得到resposne之间的时间间隔executionOccurred
:是否执行isExecutedInThread
:是否在线程隔离里执行的请求合并数据类,暂略。
针对以上数据源,均有对应的数据流xxxStream:
它是一个接口,代表数据流/事件流。它会得到一个数据发射器:类型为 RxJava 中的 Observable,即观察者模式中的 Publisher,会源源不断地产生事件/数据,数据源便是HystrixEvent
。
// Observable表示数据发射器
public interface HystrixEventStream<E extends HystrixEvent> {
Observable<E> observe();
}
HystrixEvent
文上已介绍,那么相对应的它都有对应的数据流来发送。HystrixEventStream
的实现类如下:
它发送的数据是HystrixCommandExecutionStarted
事件(由HystrixThreadEventStream
负责写入),事件在执行命令的同一线程中同步发出。
public class HystrixCommandStartStream implements HystrixEventStream<HystrixCommandExecutionStarted> {
private final HystrixCommandKey commandKey;
// Subject它既能发送,又能监听
// 发送和接受的数据类型是一样,均是HystrixCommandExecutionStarted类型
private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlySubject;
private final Observable<HystrixCommandExecutionStarted> readOnlyStream;
// 缓存:每个commandKey对应同一个HystrixCommandStartStream实例
// 这样发送数据流也方便统计
private static final ConcurrentMap<String, HystrixCommandStartStream> streams = new ConcurrentHashMap<>();
// 获取commandKey对应的Stream发射器实例
public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) {
... // 先查缓存,没有就构造一个放进缓存
}
HystrixCommandStartStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
this.writeOnlySubject = new SerializedSubject(PublishSubject.create());
// 这个只读的(共享的)流非常有意思,下面有介绍
this.readOnlyStream = writeOnlySubject.share();
}
public static void reset() {
streams.clear();
}
// 重要。提供写方法:把该event写到发射器里面去,这样订阅者就能读啦
// 该方法的唯一调用处是:HystrixThreadEventStream
public void write(HystrixCommandExecutionStarted event) {
writeOnlySubject.onNext(event);
}
// 获取Observable对象.它是只读的,并不能发射数据哦
// 但是你可以对它做流式处理,如.window.flatMap.share()...
@Override
public Observable<HystrixCommandExecutionStarted> observe() {
return readOnlyStream;
}
...
}
该数据流用于不断的传送HystrixCommandExecutionStarted
事件内容,也就是在线程池开始执行的时候会发送数据、指标信息,对应的监听者会监听该事件。
readOnlyStream
是只读的、可以被共享消费的流。是 writeOnlySubject
的只读版本,它是通过 share 操作符产生的。share
操作符产生一种特殊的 Observable
:当有一个订阅者去消费事件流时它就开始产生事件,可以有多个订阅者去订阅,同一时刻收到的事件是一致的;直到最后一个订阅者取消订阅以后,事件流才停止产生事件。
它的底层实现非常有意思,很创建也稍费脑,属于RxJava的核心思想内容(对发布-订阅、生产者、消费者、背压的处理)
截图中所有其它实现类实现逻辑完全同上,唯一不同的就是事件源不一样:
HystrixCommandCompletionStream
:HystrixCommandCompletionHystrixThreadPoolStartStream
:HystrixCommandExecutionStartedHystrixThreadPoolCompletionStream
:HystrixCommandCompletionHystrixCollapserEventStream
:HystrixCollapserEvent注意:xxxThreadPoolxxxStream
监听的事件同xxxCommandxxxStream
哦~
@Test
public void fun1() {
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("demo");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("demoThreadPool");
HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy = HystrixCommandProperties.ExecutionIsolationStrategy.THREAD;
HystrixCommandStartStream startStream = HystrixCommandStartStream.getInstance(commandKey);
// 注册监听者
startStream.observe()
// .subscribeOn(Schedulers.io())
// .observeOn(Schedulers.io())
.observeOn(Schedulers.immediate())
.subscribe(new Subscriber<HystrixCommandExecutionStarted>() {
@Override
public void onCompleted() {
System.out.println("数据发射完成啦");
}
@Override
public void onError(Throwable e) {
System.out.println("数据发射出错啦:" + e.getMessage());
}
@Override
public void onNext(HystrixCommandExecutionStarted hystrixCommand) {
System.out.printf("线程[%s] 数据发射start:%s %s %s %s",
Thread.currentThread().getName(),
hystrixCommand.getCommandKey(),
hystrixCommand.getThreadPoolKey(),
hystrixCommand.isExecutedInThread(),
hystrixCommand.getCurrentConcurrency());
}
});
// 写数据:会马上发射出去
startStream.write(new HystrixCommandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, 6));
}
运行程序,控制台打印:
线程[main] 数据发射start:demo demoThreadPool true 6
这是一个简单示例,HystrixEventStream
对RxJava的封装、使用…
关于Netflix Hystrix事件源与事件流就介绍到这了,简单的理解它就是对ReJava的包装,完全基于发布-订阅去实现,Hystrix的数据收集方式恰好非常匹配这种方案。
这篇文章依旧是为更好的了解Hystrix指标信息的收集、滑动窗口收集数据打下基础,了解了数据源和流通的通道,接下来只需重点关注两端了。