前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 二十二、Hystrix事件源与事件流:HystrixEvent和HystrixEventStream

[享学Netflix] 二十二、Hystrix事件源与事件流:HystrixEvent和HystrixEventStream

作者头像
YourBatman
发布2020-03-19 10:05:14
9040
发布2020-03-19 10:05:14
举报

入一行,先别惦记着挣钱,而是要先让自己值钱

代码下载地址:https://github.com/f641385712/netflix-learning

前言

HystrixCommand在执行过程中会持续不断的产生数据,Hystrix会收集到这些数据后做运算,进而按策略做出仲裁。数据的发送方为命令执行方,接收方为指标数据收集方,那么这些数据的传输通道是什么?以什么样的方式进行传输呢?这将是本文讲解的主要内容。

Hystrix自从1.5版本开始便通过数据流HystrixEventStream的方式持续不断的收集数据,而该数据流里面的内容便用HystrixEvent数据源俩表示。

说明:事件源与事件流 语义上等价于 数据源与数据流


正文

本文遵照本系列,使用的Hystrix版本是“最新的”1.5.18版本,因此接下来主要关心了解它如下两个API:

  • HystrixEvent:数据源/事件源
  • HystrixEventStream:数据流/事件流

HystrixEvent

Hystrix的指标收集都是基于事件驱动的,所以这个接口就是代表着数据源/事件源。但它是个标记接口而已,具体内容由实现类去指定。

在这里插入图片描述
在这里插入图片描述

HystrixCommandEvent

用于执行HystrixCommand事件流的数据类。

public abstract class HystrixCommandEvent implements HystrixEvent {

    private final HystrixCommandKey commandKey;
    private final HystrixThreadPoolKey threadPoolKey;
    ... // 省略构造器个属性的get方法

	// command是否开始
    public abstract boolean isExecutionStart();
    // 是否是在隔离线程里执行的(先线程池模式隔离)
    public abstract boolean isExecutedInThread();
    // 响应被线程池拒绝
    public abstract boolean isResponseThreadPoolRejected();
    // command是否执行完成(包括成功、错误等)
    public abstract boolean isCommandCompletion();
    // command命令是否执行过
    public abstract boolean didCommandExecute();

	// 提供两个public的功能函数,过滤
	// 过滤只需要已经完成了的HystrixCommandEvent数据
	// 过滤只需要执行过的数据	
	public static final Func1<HystrixCommandEvent, Boolean> filterCompletionsOnly = (commandEvent) -> commandEvent.isCommandCompletion();
	public static final Func1<HystrixCommandEvent, Boolean> filterActualExecutions = (commandEvent) -> commandEvent.didCommandExecute();
}

它是个抽象类,具体实现还得交给子类们。


HystrixCommandExecutionStarted

命令开始执行时,将其提供给事件流(发送事件)。

public class HystrixCommandExecutionStarted extends HystrixCommandEvent {
	// 开始执行的时候必须告诉我隔离策略,以及当前并发数
	private final HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy;
	private final int currentConcurrency;
	... // 省略构造器

	//=====实现抽象类方法======
    @Override
    public boolean isExecutionStart() {
        return true;
    }
    @Override
    public boolean isExecutedInThread() {
        return isolationStrategy == HystrixCommandProperties.ExecutionIsolationStrategy.THREAD;
    }
    ... // 其它几个方法很明显,都是返回固定值:false
}

当命令开始执行时,发送的数据包括了隔离类型(线程池or信号量),以及它当前的并发数。


HystrixCommandCompletion

它在命令完成时发送数据(几乎每一个HystrixEventType都会有一个结果)。

public class HystrixCommandCompletion extends HystrixCommandEvent {
	
	// 执行结果封装
    protected final ExecutionResult executionResult;
    // Hystrix的请求上下文。上上篇文章详细描述过
    // 默认会取值:HystrixRequestContext.getContextForCurrentThread()
    protected final HystrixRequestContext requestContext;
    // 所有的源生事件类型
	private final static HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
	... // 省略构造方法
	
    @Override
    public boolean isExecutionStart() {
        return false;
    }
    ...
    @Override
    public boolean didCommandExecute() {
        return executionResult.executionOccurred();
    }
    ... // 生路其它get方法
    public HystrixRequestContext getRequestContext() {
        return this.requestContext;
    }
}

该数据类主要包含一个执行结果对象:ExecutionResult,它封装有很多结果数据。


ExecutionResult

它代表执行的结果,是一个Immutable不可变对象。

这个对象可以被父线程和子线程引用和“修改”,也可以被HystrixCommand的不同实例引用和“修改”1个实例可以创建一个ExecutionResult,缓存一个引用它的Future,然后第二个实例执行检索一个Future 来自缓存,并希望将RESPONSE_FROM_CACHE追加到第一个命令执行的任何ExecutionResult中。

在请求合并、请求缓存时还会再遇见它

public class ExecutionResult {

    private final EventCounts eventCounts;
    private final Exception failedExecutionException;
    private final Exception executionException;
    private final long startTimestamp;
    private final int executionLatency;
    private final int userThreadLatency;
    private final boolean executionOccurred;
    private final boolean isExecutedInThread;
    private final HystrixCollapserKey collapserKey;
    ... 
}

它是一个普通的POJO,因此仅需对这些属性进行解释即可:

  • EventCounts eventCounts:它是一个POJO,维护着如下字段
    • BitSet eventsHystrixEventType的所有可能值
    • int numEmissions:发射数。发送HystrixEventType.EMIT事件时,+1
    • int numFallbackEmissions:降级的发射数。发送FALLBACK_EMIT事件时,+1
    • int numCollapsed:合并数。发送HystrixEventType.COLLAPSED事件时,+1
    • 其它事件,不记录…
  • Exception failedExecutionException
  • executionException:在检查fallabck之前发出的异常(也就是执行run时就抛出异常了)。当是timeout/short-circuit/rejection/bad request等情况时,值同上
  • startTimestamp:命令开始执行的时刻
  • executionLatency:time spent in run() method
  • userThreadLatency:使用线程方式提交任务到得到resposne之间的时间间隔
  • executionOccurred:是否执行
  • isExecutedInThread:是否在线程隔离里执行的

HystrixCollapserEvent

请求合并数据类,暂略。


针对以上数据源,均有对应的数据流xxxStream:

HystrixEventStream

它是一个接口,代表数据流/事件流。它会得到一个数据发射器:类型为 RxJava 中的 Observable,即观察者模式中的 Publisher,会源源不断地产生事件/数据,数据源便是HystrixEvent

// Observable表示数据发射器
public interface HystrixEventStream<E extends HystrixEvent> {
    Observable<E> observe();
}

HystrixEvent文上已介绍,那么相对应的它都有对应的数据流来发送。HystrixEventStream的实现类如下:

在这里插入图片描述
在这里插入图片描述

HystrixCommandStartStream

它发送的数据是HystrixCommandExecutionStarted事件(由HystrixThreadEventStream负责写入),事件在执行命令的同一线程中同步发出

public class HystrixCommandStartStream implements HystrixEventStream<HystrixCommandExecutionStarted> {

	private final HystrixCommandKey commandKey;
	// Subject它既能发送,又能监听
	// 发送和接受的数据类型是一样,均是HystrixCommandExecutionStarted类型
	private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlySubject;
	private final Observable<HystrixCommandExecutionStarted> readOnlyStream;

	// 缓存:每个commandKey对应同一个HystrixCommandStartStream实例
	// 这样发送数据流也方便统计
	private static final ConcurrentMap<String, HystrixCommandStartStream> streams = new ConcurrentHashMap<>();

	// 获取commandKey对应的Stream发射器实例
	public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) {
		... // 先查缓存,没有就构造一个放进缓存
	}
    HystrixCommandStartStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;

        this.writeOnlySubject = new SerializedSubject(PublishSubject.create());
        // 这个只读的(共享的)流非常有意思,下面有介绍
        this.readOnlyStream = writeOnlySubject.share();
    }
    public static void reset() {
        streams.clear();
    }


	// 重要。提供写方法:把该event写到发射器里面去,这样订阅者就能读啦
	// 该方法的唯一调用处是:HystrixThreadEventStream
    public void write(HystrixCommandExecutionStarted event) {
        writeOnlySubject.onNext(event);
    }

	// 获取Observable对象.它是只读的,并不能发射数据哦
	// 但是你可以对它做流式处理,如.window.flatMap.share()...
    @Override
    public Observable<HystrixCommandExecutionStarted> observe() {
        return readOnlyStream;
    }
    ...
}

该数据流用于不断的传送HystrixCommandExecutionStarted事件内容,也就是在线程池开始执行的时候会发送数据、指标信息,对应的监听者会监听该事件。

readOnlyStream是只读的、可以被共享消费的流。是 writeOnlySubject 的只读版本,它是通过 share 操作符产生的。share 操作符产生一种特殊的 Observable:当有一个订阅者去消费事件流时它就开始产生事件,可以有多个订阅者去订阅,同一时刻收到的事件是一致的;直到最后一个订阅者取消订阅以后,事件流才停止产生事件。

它的底层实现非常有意思,很创建也稍费脑,属于RxJava的核心思想内容(对发布-订阅、生产者、消费者、背压的处理)


其它实现类

截图中所有其它实现类实现逻辑完全同上,唯一不同的就是事件源不一样:

  • HystrixCommandCompletionStream:HystrixCommandCompletion
  • HystrixThreadPoolStartStream:HystrixCommandExecutionStarted
  • HystrixThreadPoolCompletionStream:HystrixCommandCompletion
  • HystrixCollapserEventStream:HystrixCollapserEvent

注意:xxxThreadPoolxxxStream监听的事件同xxxCommandxxxStream哦~


使用示例

@Test
public void fun1() {
    HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("demo");
    HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("demoThreadPool");
    HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy = HystrixCommandProperties.ExecutionIsolationStrategy.THREAD;

    HystrixCommandStartStream startStream = HystrixCommandStartStream.getInstance(commandKey);

    // 注册监听者
    startStream.observe()
            // .subscribeOn(Schedulers.io())
            // .observeOn(Schedulers.io())
            .observeOn(Schedulers.immediate())
            .subscribe(new Subscriber<HystrixCommandExecutionStarted>() {

                @Override
                public void onCompleted() {
                    System.out.println("数据发射完成啦");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("数据发射出错啦:" + e.getMessage());
                }

                @Override
                public void onNext(HystrixCommandExecutionStarted hystrixCommand) {
                    System.out.printf("线程[%s] 数据发射start:%s %s %s %s",
                            Thread.currentThread().getName(),
                            hystrixCommand.getCommandKey(),
                            hystrixCommand.getThreadPoolKey(),
                            hystrixCommand.isExecutedInThread(),
                            hystrixCommand.getCurrentConcurrency());
                }
            });


    // 写数据:会马上发射出去
    startStream.write(new HystrixCommandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, 6));
}

运行程序,控制台打印:

线程[main] 数据发射start:demo demoThreadPool true 6

这是一个简单示例,HystrixEventStream对RxJava的封装、使用…


总结

关于Netflix Hystrix事件源与事件流就介绍到这了,简单的理解它就是对ReJava的包装,完全基于发布-订阅去实现,Hystrix的数据收集方式恰好非常匹配这种方案。

这篇文章依旧是为更好的了解Hystrix指标信息的收集、滑动窗口收集数据打下基础,了解了数据源和流通的通道,接下来只需重点关注两端了。

分隔线
分隔线
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
    • HystrixEvent
      • HystrixCommandEvent
      • HystrixCollapserEvent
    • HystrixEventStream
      • HystrixCommandStartStream
      • 其它实现类
    • 使用示例
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档