前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 三十六、Hystrix请求命令:HystrixCommand和HystrixObservableCommand

[享学Netflix] 三十六、Hystrix请求命令:HystrixCommand和HystrixObservableCommand

作者头像
YourBatman
发布2020-03-18 19:39:01
3.3K0
发布2020-03-18 19:39:01
举报
文章被收录于专栏:BAT的乌托邦BAT的乌托邦

穷则思变,既要变,更要实干。 代码下载地址:https://github.com/f641385712/netflix-learning

目录
  • 前言
  • 正文
    • HystrixCommand
      • 使用示例
    • HystrixObservableCommand
      • 使用示例
    • Hystrix的四种调用方法有何异同?
      • observe() vs toObservable()
  • 总结
    • 声明

前言

Hystrix内部使用了大量的RxJava代码来书写,使得把其代码精简到了极致,性能也提升了很多。虽说Hystrix的源代码难啃,但是它面向使用者提供的API是较为友好的,特别是HystrixCommand它非常暖心的提供了同步执行的execute()方法以及异步执行的queue()方法,使得你无论同步/异步均无需同RxJava直接打交道,大大降低了使用门槛。

前面几篇文章“啃”遍了源码,知道了AbstractCommand它是如何控制一个command命令执行的,以及如何降级、熔断。本文将介绍面向使用者的两个API:HystrixCommandHystrixObservableCommand以及分别给出使用参考示例。


正文

AbstractCommand实现了Command命令执行的几乎所有的逻辑,所以它的两个子类HystrixCommandHystrixObservableCommand是比较简单的。


HystrixCommand

HystrixCommand用在依赖服务返回单个操作结果的时候,99.99%的情况下我们使用的是它。

代码语言:javascript
复制
// 在父类的基础上,多实现了HystrixExecutable接口,从而新增提供了execute()/queue()方法
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R> {

	... // 省略其所有构造器
	... // 省略静态配置内部类Setter

	// 执行线程Thread
	private final AtomicReference<Thread> executionThread = new AtomicReference<>();
	// 用于接收executionIsolationThreadInterruptOnFutureCancel这个属性值
	// 用于:Future.cancel(interruptOnFutureCancel )
	private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);


	// 最为重要的一个抽象方法:你的目标逻辑写在这里面
	// execute()和queue()会调用方法
	// 小细节:此方法特意设计为protected,警告你执行时候不要直接调用此方法哦~
	protected abstract R run() throws Exception;
	// 详细你对`No fallback available.`这几个字无比熟悉吧,在项目中经常碰见。
	// 也就是当execute()任何情况失败了,都会尝试来调用这个方法给你个回滚的机会
	// 说明:这里应该做的工作是:不需要网络产生,也就是JVM内的调用
	// 换句话说:这个返回最好是一个常量值,或者是缓存值是最好的,不要耗时
	// 默认是抛出异常,建议你实现此方法,当然喽,具体需要具体分析~~~~~比较有些方法不能回退...
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

	// ============实现父类抽象方法============
    @Deprecated // 这个方法过期了请一定不要用,不管它就成
    @Override
    protected String getFallbackMethodName() {
        return "getFallback";
    }
    // 父类方法:判断你是否提供了fallabck方法(看你是否复写了getFallback方法)
    @Override
    protected boolean isFallbackUserDefined() {
    	... // 查缓存
    	// 这里使用的是getDeclaredMethod,所以只有你实现类本类里有名为`getFallback`的方法才算的哦
    	getClass().getDeclaredMethod("getFallback");
    	... // 放缓存
    }
	
	// command是否是标量
	// 当commandIsScalar为true时,在markEmits就会circuitBreaker.markSuccess();
	// commandIsScalar为false时,在markOnCompleted动作时调用circuitBreaker.markSuccess()
	// 本处为true,另一实现HystrixObservableCommand为false
    @Override
    protected boolean commandIsScalar() {
        return true;
    }
	
	// ============实现接口方法============

	// 可以看到execute()的实现其实就是queue的实现,只是立马get了而已
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
    // 可以看到,Futrue的实现实际上还是Observable
    // 只不过它在其基础上对cancle方法特殊处理了一下:中断线程与否
    public Future<R> queue() {
		Future<R> delegate = toObservable().toBlocking().toFuture();
		Future<R> f = new Future<R>() { ... };
		
		// 对立即抛出的错误状态的特殊处理
		// 我在想,什么情况下这么快就isDone了呢?delegate Futrue刚创建就抛错了?
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) { ... }
		}
		return f;
    }
}

它也是抽象类,有且仅有一个抽象方法:run()方法,它会执行你的目标逻辑。需要注意的是:它的访问权限是protected,所以它的调用是交给Hystrix的。另外,回退方法getFallback()它并不是抽象方法,也就是说书写一个Command命令,fallback回退实现并不是必须的。

在父类AbstractCommand的基础上,它提供了使用更为频繁的public R execute()以及public Future<R> queue()方法供以使用者调用。


使用示例
代码语言:javascript
复制
@Test
public void fun1(){
    MyCommand command = new MyCommand();
    Integer result = command.execute();
    System.out.println(result);
}


private static class MyCommand extends HystrixCommand<Integer>{

    protected MyCommand() {
        super(HystrixCommandGroupKey.Factory.asKey("MyCommandGroup"));
    }

    // 目标方法
    @Override
    protected Integer run() throws Exception {
        return 1;
    }

    // fallback方法逻辑,并不是必须提供的哦
    @Override
    protected Integer getFallback() {
        return -1;
    }
}

运行程序,控制台打印:

代码语言:javascript
复制
1

HystrixObservableCommand

用在依赖服务返回多个操作结果的时候,其实可以认为返回的是个事件发射器,可以持续发射数据

说明:类似于数据流的形式可持续发射数据,一般使用较少

代码语言:javascript
复制
// 它比HystrixCommand少实现了一个`HystrixExecutable`,所以它仅有异步的能力
public abstract class HystrixObservableCommand<R> extends AbstractCommand<R> implements HystrixObservable<R> {
	
	... // 省略所有构造器
	... // 省略Setter

	// ============实现父类抽象方法============
	// 如果有下一个事件应该报告
    @Override
    protected boolean shouldOutputOnNextEvents() {
        return true;
    }
    @Override // 标记为过期了,禁用
    protected String getFallbackMethodName() {
        return "resumeWithFallback";
    }
    // 它的方法名改为了resumeWithFallback,请务必注意哦
    @Override
    protected boolean isFallbackUserDefined() {
    	... // 缓存
    	getClass().getDeclaredMethod("resumeWithFallback");
    	... // 缓存
    }
    // 若你想fallback,请复写此方法
    protected Observable<R> resumeWithFallback() {
        return Observable.error(new UnsupportedOperationException("No fallback available."));
    }

    @Override
    protected boolean commandIsScalar() {
        return false;
    }


    @Override
    final protected Observable<R> getExecutionObservable() {
        return construct();
    }
    // 目标方法。你的逻辑写在这里面
    // HystrixCommand的叫run方法嘛,这个叫construct方法~~~~
	protected abstract Observable<R> construct();
	// ============实现接口方法============
	... // 父类均已实现
}

相应的,本类的construct()抽象方法作用同上的run()抽象方法;本类的resumeWithFallback()回退方法同上的getFallback()方法。


使用示例
代码语言:javascript
复制
@Test
public void fun2() {
    MyObservableCommand command = new MyObservableCommand();
    Observable<Integer> observe = command.observe();
    // Observable<Integer> observe = command.toObservable();

    System.out.println("==========订阅者之前的一句话==========");
    observe.subscribe(d -> System.out.println(d));
    System.out.println("==========订阅者之后的一句话==========");
}


private static class MyObservableCommand extends HystrixObservableCommand<Integer> {

    protected MyObservableCommand() {
        super(HystrixCommandGroupKey.Factory.asKey("MyCommandGroup"));
    }

    // 一次性发送5个返回值,持续不断
    @Override
    protected Observable<Integer> construct() {
        // return Observable.just(1, 2, 3, 4);
        return Observable.create(subscriber -> {
            subscriber.onStart();
            for (int i = 1; i <= 4; i++) {
                System.out.println("开始发射数据:" + i);
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        });
    }

    @Override
    protected Observable<Integer> resumeWithFallback() {
        return Observable.just(-1);
    }
}

运行程序,控制台打印:

代码语言:javascript
复制
// 使用observe()方法结果:发射数据动作立马执行
开始发射数据:1
开始发射数据:2
开始发射数据:3
开始发射数据:4
==========订阅者之前的一句话==========
1
2
3
4
==========订阅者之后的一句话==========

// 使用toObservable()方法结果:发射数据动作明显是木有立马开始的
==========订阅者之前的一句话==========
开始发射数据:1
1
开始发射数据:2
2
开始发射数据:3
3
开始发射数据:4
4
==========订阅者之后的一句话==========

Hystrix的四种调用方法有何异同?

众所周知,Hystrix一共提供了4种调用方法供以使用:

  • toObservable() :未做订阅,只是返回一个Observable
  • observe():调用 #toObservable() 方法,并向 Observable 注册,rx.subjects.ReplaySubject发起订阅,因此它具有回放的能力
    • observe() 方法使用了ReplaySubject缓存了toObservable的消息,使得执行后再监听也可以收到所有消息。新订阅者连历史数据也能够监听到(1分钟内)
  • queue():调用toObservable().toBlocking().toFuture()返回 Future 对象
  • execute():调用#queue() 方法的基础上,马上调用 Future#get() 方法,同步返回 #run() 的执行结果。

observe() vs toObservable()

四种调用方法中,最难区分的当属observe() 和oObservable()了,这里做进一步的解释说明和对比。

observe()和toObservable()虽然都返回了Observable对象,但是observe()返回的是Hot Observable,该命令会在observe()调用的时候立即执行,当Observable每次被订阅的时候会重放他的行为; 而toObservable()返回的是Cold Observable,toObservable()执行之后,命令不会被立即执行,只有当所有订阅者都订阅它之后才会执行。

execute()、queue()也都使用了RxJava来实现,并且queue()是通过toObservable()来获得一个Cold Observable(不会立马执行),并且通过toBlocking()将该Observable转换成BlockingObservable,它可以把数据以阻塞的方式发出来,而toFuture方法则是把BlockingObservable转换成一个Future,该方法只是创建一个Future返回,并不会阻塞,这使得消费者可以自己决定如何处理异步操作

说明:Future实例出来后,目标方法是立马执行的,只是它不会阻塞主线程,并且执行结果你可以在其它地方get获取(若还没执行完成时去get,会阻碍)


总结

关于Hystrix请求命令:HystrixCommand和HystrixObservableCommand就介绍到这了,到本篇文章截止,关于Hystrix的核心内容(也就是hystrix-core)就基本介绍完了。后面会再有基于@HystrixCommand注解的开发、和Servlet容器的整合以及监控大盘hystrix-dashboard的“制作”(当然更少不了和Spring、Spring Cloud的整合),欢迎你的持续关注。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 前言
  • 正文
    • HystrixCommand
      • 使用示例
    • HystrixObservableCommand
      • 使用示例
    • Hystrix的四种调用方法有何异同?
      • observe() vs toObservable()
  • 总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档