前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringCloud - Hystrix的执行流程

SpringCloud - Hystrix的执行流程

作者头像
JavaEdge
发布2022-11-30 15:13:47
3640
发布2022-11-30 15:13:47
举报
文章被收录于专栏:JavaEdge

0 Hystrix执行原理图

1 创建HystrixCommand/HystrixObservableCommand

一个HystrixCommandHystrixObservableCommand对象,代表对某个依赖服务发起的一次请求或者调用 构造的时候,可在构造器中传入任何需要的参数。

  • HystrixCommand仅返回一个结果的调用。
  • HystrixObservableCommand可能会返回多条结果的调用。

直接继承HystrixCommand并实现run方法即可。

代码语言:javascript
复制
public class GetUserAccountCommand extends HystrixCommand<UserAccount> {
	...
    @Override
    protected UserAccount run() {
        /* 模拟执行网络调用以检索用户信息 */
        try {
            Thread.sleep((int) (Math.random() * 10) + 2);
        } catch (InterruptedException e) {
        }

        /* 5%的时间失败来说明fallback的工作原理 */
        if (Math.random() > 0.95) {
            throw new RuntimeException("random failure processing UserAccount network response");
        }

        /* 延迟会增加5%的时间,因此有时会触发超时 */
        if (Math.random() > 0.95) {
            // 随机等待时间尖峰
            try {
                Thread.sleep((int) (Math.random() * 300) + 25);
            } catch (InterruptedException e) {
            }
        }

        /* 成功...使用远程服务响应的数据创建UserAccount */
        return new UserAccount(86975, "John James", 2, true, false, true);
    }
	...
}

2 调用command的执行方法

执行Command就可以发起一次对依赖服务的调用

要执行Command,需要在4个方法中选择其中的一个

  • 前两种是HystrixCommand独有的哦

2.1 execute()

代码语言:javascript
复制
    /**
     * 用于同步执行 command.
     * 
     * @return R
     *         如果command由于任何原因失败,则执行 #run 或从 #getFallback() fallback的结果.
     *
     * @throws HystrixRuntimeException
     *             如果发生故障并且无法检索fallback
     * @throws HystrixBadRequestException
     *             如果使用了无效的参数或状态来表示用户故障,而不是系统故障
     *
     * @throws IllegalStateException
     *             如果多次调用
     */
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

调用后直接阻塞,直到依赖服务返回单条结果,或抛异常

2.2 queue()

代码语言:javascript
复制
    /**
     * 用于异步执行命令 command.
     *
     * 这将使该command在线程池上排队,并在完成后返回一个 Future 以获取结果.
     * 注意:如果配置为不在单独的线程中运行,则其效果与 #execute() 相同,并会阻塞.
     * 不会抛出异常,而只是切换为同步执行,因此无需更改代码即可 将command从运行在单独的线程切换到调用线程.
     * (switch a command from running on a separate thread to the calling thread.)
     * 
     * @return {@code Future <R>}执行 #run() 的结果,或者如果command由于任何原因失败,则返回 #getFallback() 的结果.
     * @throws HystrixRuntimeException
     *             如果不存在fallback
     *             如果通过 ExecutionException#getCause() 中的{@code Future.get(), 如果不存在失败发生的话
     *             或者如果无法将命令排队(如,短路,线程池/信号被拒绝),则立即返回
     * @throws HystrixBadRequestException
     *         通过 ExecutionException#getCause() 中的 Future.get() 如果使用了无效的参数或状态来表示用户故障而不是系统故障
     * @throws IllegalStateException
     *             如果多次调用
     */
    public Future<R> queue() {
        /*
         * 当Future.cancel(boolean)的“ mayInterrupt”标志设为true时
         * 由Observable.toBlocking().toFuture() 返回的Future不实现执行线程的中断
         * 因此,为了遵守Future的约定,我们必须围绕它.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
    	
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }

                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                     * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                     * than that interruption request cannot be taken back.
                     * 这里唯一有效的转换是false -> true.
                     * 如果存在由该命令创建(这很奇怪,但从未禁止过)的两个futures,例如f1和f2,
                     * 并且对f1.cancel(true)和f2.cancel(false)的调用是由不同的线程发起,
                     * 尚不清楚在检查mayInterruptOnCancel时将使用什么值.
                     * 处理这种情况的最一致的方法是说,如果在中断的情况下调用了任何cancellation,则无法撤回该中断请求.
                     */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
        		}

                final boolean res = delegate.cancel(interruptOnFutureCancel.get());

                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
			}

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
			}

            @Override
            public boolean isDone() {
                return delegate.isDone();
			}

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        	
        };

        /* 对立即抛出的错误状态的特殊处理 */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
					case COMMAND_EXCEPTION:
					case TIMEOUT:
                        // 不会仅从 queue().get() 中将这些类型从 queue() 中抛出, 因为它们是执行错误
						return f;
					default:
						// these are errors we throw from queue() as they as rejection type errors
                        // 这些是从 queue() 抛出的错误,因为它们是拒绝类型错误
						throw hre;
					}
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }

调用,返回一个Future,后面可以通过Future获取单条结果

2.3 observe()

订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的Observable对象的拷贝对象

  • toObservable() 返回一个Observable对象,如果我们订阅这个对象,就会执行command并且获取返回结果

其中execute()和queue()仅对HystrixCommand适用

代码语言:javascript
复制
K             value   = command.execute();
Future<K>     fValue  = command.queue();
Observable<K> ohValue = command.observe();         
Observable<K> ocValue = command.toObservable();    
  • execute()实际上会调用queue().get()
  • 在 queue() 方法中,会调用toObservable().toBlocking().toFuture()

即,无论是哪种执行command的方式,最终都是依赖toObservable() 也就是说同步的HystrixCommand最终都会依赖Observable,尽管HystrixCommand是用来发射单个事件的

3 检查是否开启缓存

如果这个command开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果 否则,继续往后

代码语言:javascript
复制
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                 /* 这是一个有状态的对象,因此只能使用一次 */
                if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                    IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                    // TODO 为此创建新的错误类型
                    throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                }

                commandStartTimestamp = System.currentTimeMillis();

                if (properties.requestLogEnabled().get()) {
                    // 记录此命令执行,无论发生什么情况
                    if (currentRequestLog != null) {
                        currentRequestLog.addExecutedCommand(_cmd);
                    }
                }

                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                /* 首先尝试从缓存 */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放入缓存
                if (requestCacheEnabled && cacheKey != null) {
                    // 包装以缓存
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // 另一个线程击败了我们,因此使用缓存值
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // 我们刚刚创建了一个ObservableCommand,所以我们进行了强制转换并返回了它
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // 进行一次清理(在正常终端状态(此行)或退订(下一行))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // 进行一次清理
                        .doOnCompleted(fireOnCompletedHook);
            }
        });

上述代码中有个判断final boolean requestCacheEnabled = isRequestCachingEnabled();可以看到如果我们的Command实现了getCacheKey方法,并且requestCacheEnabled(这个属性默认是true,可以通过调用HystrixCommand的构造方法传入一个setter对象修改默认属性)这样就不会执行后续的run方法,就会直接返回一个缓存的Observable。(必须是同一个request context里面的两个command才能用到缓存)

4 检查是否开启短路器

检查这个command对应的依赖服务是否开启短路器

如果断路器被打开了,那么hystrix就不会执行这个command,而是直接执行fallback降级

5 检查线程池/队列/semaphore是否已满

如果command对应的线程池/队列/semaphore已满,那么也不会执行command,而是直接去调用fallback降级机制,同时发送 reject 信息给断路器统计

6 执行command

调用HystrixObservableCommand.construct()或HystrixCommand.run()来实际执行这个command

  • HystrixCommand.run() 返回一个单条结果,或者抛出一个异常
  • HystrixObservableCommand.construct() 返回一个Observable对象,可以获取多条结果

如果HystrixCommand.run()或HystrixObservableCommand.construct()的执行,超过了timeout时长的话,那么command所在的线程就会抛出一个TimeoutException 如果timeout了,也会去执行fallback降级机制,而且就不会管run()或construct()返回的值

我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个TimeoutException,但是还是可能会因为严重延迟的调用线程占满整个线程池的 即使这个时候新来的流量都被限流了。。。

如果没有timeout的话,那么就会拿到一些调用依赖服务获取到的结果,然后hystrix会做一些logging记录和metric统计

7 短路健康检查

Hystrix会将每一个依赖服务的调用成功,失败,拒绝,超时,等事件,都会发送给circuit breaker断路器

短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计

短路器会根据这些统计次数来决定,是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路,然后如果在之后第一次检查发现调用成功了,就关闭断路器

8 调用fallback降级机制

failfast

在 run 方法中直接抛异常快速失败。

fail silent

run 里有个降级方法,内部返回一个空

代码语言:javascript
复制
return null ;
return new Option<T>();
return Collections . emptyList();
return Collections . emptyMap( );

static fallback

  • 降级方法中返回一个默认值。
代码语言:javascript
复制
return true;
return DEFAULT_OBJECT;

fallback by network

  • 主服务挂了,调用辅助服务的一个降级方法(需通过网络传输请求了)

primary + sencondary with fallback

主次降级。将新功能封装在老功能后面。

请求合并

将请求通过时间窗口都合并在一个队列中。

请求缓存

在以下几种情况中,hystrix会调用fallback降级机制

  • run()或construct()抛出一个异常
  • 短路器打开
  • 线程池/队列/semaphore满了
  • command执行超时了

一般在降级机制中,都建议给出一些默认的返回值,比如静态的一些代码逻辑,或者从内存中的缓存中提取一些数据,尽量在这里不要再进行网络请求了 即使在降级中,一定要进行网络调用,也应该将那个调用放在一个HystrixCommand中,进行隔离

  • 在HystrixCommand中,实现getFallback()方法,可以提供降级机制
  • 在HystirxObservableCommand中,实现一个resumeWithFallback()方法,返回一个Observable对象,可以提供降级结果

如果fallback返回了结果,那么hystrix就会返回这个结果

  • 对于HystrixCommand,会返回一个Observable对象,其中会发返回对应的结果
  • 对于HystrixObservableCommand,会返回一个原始的Observable对象

如果没有实现fallback,或者是fallback抛出了异常,Hystrix会返回一个Observable,但是不会返回任何数据

不同的command执行方式,其fallback为空或者异常时的返回结果不同

  • 对于execute(),直接抛出异常
  • 对于queue(),返回一个Future,调用get()时抛出异常
  • 对于observe(),返回一个Observable对象,但是调用subscribe()方法订阅它时,理解抛出调用者的onError方法
  • 对于toObservable(),返回一个Observable对象,但是调用subscribe()方法订阅它时,理解抛出调用者的onError方法

9 不同的执行方式

  • execute(),获取一个Future.get(),然后拿到单个结果
  • queue(),返回一个Future
  • observer(),立即订阅Observable,然后启动8大执行步骤,返回一个拷贝的Observable,订阅时理解回调给你结果
  • toObservable(),返回一个原始的Observable,必须手动订阅才会去执行8大步骤
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-07-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0 Hystrix执行原理图
  • 1 创建HystrixCommand/HystrixObservableCommand
  • 2 调用command的执行方法
    • 2.1 execute()
      • 2.2 queue()
        • 2.3 observe()
        • 3 检查是否开启缓存
        • 4 检查是否开启短路器
        • 5 检查线程池/队列/semaphore是否已满
        • 6 执行command
        • 7 短路健康检查
        • 8 调用fallback降级机制
          • failfast
            • fail silent
              • static fallback
                • fallback by network
                  • primary + sencondary with fallback
                    • 请求合并
                      • 请求缓存
                      • 9 不同的执行方式
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档