前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【一起学源码-微服务】Hystrix 源码三:Hystrix核心流程:Hystix降级、熔断等原理剖析

【一起学源码-微服务】Hystrix 源码三:Hystrix核心流程:Hystix降级、熔断等原理剖析

作者头像
一枝花算不算浪漫
发布2020-02-14 11:59:32
1.1K0
发布2020-02-14 11:59:32
举报

前言

前情回顾

上一讲我们讲解了Hystrix在配合feign的过程中,一个正常的请求逻辑该怎样处理,这里涉及到线程池的创建、HystrixCommand的执行等逻辑。

如图所示:

Hystrix线程池创建过程及线程调用原理.jpg
Hystrix线程池创建过程及线程调用原理.jpg

高清大图:https://www.processon.com/view/link/5e1c128ce4b0169fb51ce77e

本讲目录

这一讲开始讲解Hystrix的看家本领:熔断+降级。 熔断功能是Hystrix最核心的组件,当然也是最复杂的一块。 源码中细节太多,本讲我们主要还是专注于它的设计思想去学习。

目录如下:

  1. HystrixCircuitBreaker初始化过程
  2. Hystrix熔断机制(CLOSED/OPEN/HALF_OPEN)
  3. fallback降级机制

源码分析

HystrixCircuitBreaker初始化过程

我们还是会以AbstractCommand为突破口,这里继续看它的构造函数,其中里面有初始化熔断器initCircuitBreaker()的过程,具体代码如下:

代码语言:javascript
复制
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // 构建默认的HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }
}


public interface HystrixCircuitBreaker {
    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        // circuitBreakersByCommand是一个map,key为commandKey,也就是FeignClient中定义的方法名
        // 类似于ServiceAFeignClient.sayHello(String)
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // 每个commandKey都对应着自己的熔断器,如果没有则会构造一个HystrixCircuitBreaker
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            return circuitBreakersByCommand.get(key.name());
        } else {
            return cbForCommand;
        }
    }

    class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        private Subscription subscribeToStream() {
            // 对HealthCounts进行订阅
            // HealthCounts中包含 总请求次数、总失败次数、失败率
            // HealthCounts 统计数据有变化则会回调到这里来
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        // 判断是否要降级的核心逻辑
                        @Override
                        public void onNext(HealthCounts hc) {
                            // 一个时间窗口(默认10s钟)总请求次数是否大于circuitBreakerRequestVolumeThreshold 默认为20s
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                
                            } else {
                                // 错误率(总错误次数/总请求次数)小于circuitBreakerErrorThresholdPercentage(默认50%)
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

                                } else {
                                    // 反之,熔断状态将从CLOSED变为OPEN,且circuitOpened==>当前时间戳
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }
    }
}

上面就是熔断器初始化过程,这里面做了几件事:

  1. 每个commandKey都有自己的一个熔断器 commandKey表现形式为:ServiceAFeignClient#sayHello(String)
  2. 如果commandKey不存在熔断器,则构建默认熔断器 默认熔断器会对HealthCounts进行订阅。HealthCounts中包含时间窗口内(默认10s钟)请求的总次数、失败次数、失败率
  3. HealthCounts中统计数据有变化则会回调subscribe.onNext()方法进行熔断开启判断
  4. 熔断开启条件:
  • 时间窗口内(默认10s钟)总请求次数大于20次
  • 时间窗口内(默认10s钟)失败率大于50%
  • 满足上述两个条件后熔断器状态从CLOSED变成OPEN

熔断器在第一次请求时会初始化AbtractCommand,同时也会创建对应commandKey的熔断器 ,熔断器默认都是关闭的(可配置为强制开启),只有满足触发条件才会被开启。下面就一起来看下熔断、半开等状态是如何触发的吧。

Hystrix熔断机制(CLOSED/OPEN/HALF_OPEN)

这里我们以AbstractCommand.applyHystrixSemantics() 为入口,一步步往下探究,这个方法在上一讲已经提到过,一个正常的Feign请求都会调用此方法。

代码语言:javascript
复制
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // 如果熔断了,这这里返回为false
        // 这里也包含HALF_OPEN逻辑
        if (circuitBreaker.attemptExecution()) {
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
}

circuitBreaker.attemptExecution() 这个逻辑就是判断,如果熔断了,那么返回false。而且这里还包含HALF_OPEN的逻辑,我们先看如何触发熔断的,这个后面再接着看。

接着往下跟进executeCommandAndObserve() 方法:

代码语言:javascript
复制
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        // 省略部分代码...

        // 运行过程中,出现异常等都会进入此回调函数
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            // 这里创建一个 HystrixObservableTimeoutOperator
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
}

当我们服务调用中出现异常都会进入handleFallback()中,里面的方法我们就不继续跟入了,猜测里面会有更新HealthCounts中的属性,然后触发 HystrixCircuitBreaker中的onNext()方法,当满足熔断条件时 则会将熔断状态从CLOSED变成OPEN

这里我们会跟进下HystrixObservableTimeoutOperator 代码,这个是对我们执行过程中判断是否超时。 上面代码中,执行executeCommandWithSpecifiedIsolation() 方法时也会创建一个超时监视器:

代码语言:javascript
复制
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        TimerListener listener = new TimerListener() {

            @Override
            public void tick() {
                // 判断command的timeOut状态,如果是未执行状态,则更新为已超时
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    s.unsubscribe();

                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });


                    timeoutRunnable.run();
                }
            }

            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        originalCommand.timeoutTimer.set(tl);

        // 省略部分代码...
        s.add(parent);

        return parent;
    }
}

public class HystrixTimer {
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    // 执行上面的tick方法,改变command timeout状态
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        // 执行调度任务,延迟加载,延迟时间和调度时间默认都为1s钟
        // 这里使用线程池,coreSize=cpu核心数 maxSize为Integer.Max
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
}

这里面核心业务是起一个调度任务,默认每秒钟执行一次,然后调用tick()方法,如果当前command状态还是NOT_EXECUTED状态,那么将command状态改为TIMED_OUT 。此时会进入到之前的handleFallback回调函数中,这里又会更新HealthCounts中的数据,对应的触发之前熔断的判断条件:

代码语言:javascript
复制
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
  this.properties = properties;
  this.metrics = metrics;

  //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
  Subscription s = subscribeToStream();
  activeSubscription.set(s);
}

private Subscription subscribeToStream() {
  //这里会在每次执行onNext()事件的时候来评估是否需要打开或者关闭断路器
  return metrics.getHealthCountsStream()
    .observe()
    .subscribe(new Subscriber<HealthCounts>() {
      @Override
      public void onCompleted() {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onNext(HealthCounts hc) {
        //首先校验的时在时间窗范围内的请求次数,如果低于阈值(默认是20),不做处理,如果高于阈值,则去判断接口请求的错误率
        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {           // 如果没有超过统计阈值的最低窗口值,就没有必要去改变断路器的状态
          // 当前如果断路器是关闭的,那么就保持关闭状态无需更改;
          // 如果断路器状态为半开状态,需要等待直到有成功的命令执行;
          // 如果断路器是打开状态,需要等待休眠窗口过期。
        } else {
          //判断接口请求的错误率(阈值默认是50),如果高于这个值,则断路器打开
          if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
    
            // 如果当前请求的错误率小于断路器设置的容错率百分比,也不会拦截请求
          } else {
            // 如果当前错误率太高则打开断路器
            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
              circuitOpened.set(System.currentTimeMillis());
            }
          }
        }
      }
    });
}

如果符合熔断条件,那么command熔断状态就会变为OPEN,此时熔断器打开。

如果我们command执行成功,那么就会清理掉这个timeout timer schedule任务。

代码语言:javascript
复制
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        // 如果timeOutTimer不为空,这里则clear一下
        // clear会关闭启动的调度任务
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            // metrics统计数据
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }
}

如上所属,我们已经知道了熔断开启的触发时机,那么如果一个commandKey开启了熔断,下次的请求是该如何直接降级呢?我们来看下代码:

代码语言:javascript
复制
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

        // 这个if条件就代表是否开启熔断
        if (circuitBreaker.attemptExecution()) {
            // 执行业务逻辑代码...
        } else {
            return handleShortCircuitViaFallback();
        }
    }
}

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    public boolean attemptExecution() {
            // 如果熔断配置的为强制开启,那么直接返回false执行熔断逻辑
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            // 如果熔断配置为强制关闭,那么永远不走熔断逻辑
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            // 熔断开启时 circuitOpened设置为当前时间戳
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                // 如果当前时间距离熔断小于5s钟,那么将熔断状态从OPEN改为HALF_OPEN
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        // circuitBreakerSleepWindowInMilliseconds 默认为5s钟
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        // 当前熔断距离熔断是否超过5s钟
        return currentTime > circuitOpenTime + sleepWindowTime;
    }
}

我们可以看到,在applyHystrixSemantics()这个核心的方法中,先判断是否熔断,如果熔断则直接走fallback逻辑。

attemptExecution()判断条件中还涉及到HALF_OPEN的逻辑,如果熔断开启,下一次请求的时候,会判断当前时间距离上一次时间是否超过了5s钟,如果没有超过,则会将熔断状态从OPEN变为HALF_OPEN,此时会放一个请求按照正常逻辑去执行:

  1. 执行失败,熔断状态又会从HALF_OPEN变成OPEN
  2. 执行成功,熔断状态从HALF_OPEN变成CLOSED,并清除熔断相关设置

执行成功后代码:

代码语言:javascript
复制
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    public void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            //This thread wins the race to close the circuit - it resets the stream to start it over from 0
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            circuitOpened.set(-1L);
        }
    }
}

上面对整个熔断的状态:CLOSED、OPEN、HALF_OPEN梳理的已经很清楚了,下面看看降级是该如何处理的吧。

fallback降级机制

上面已经讲解了Hystrix 熔断开启的机制等内容,这里主要是说如果一个请求失败(线程池拒绝、超时、badRequest等),那么Hystrix是如何执行降级的呢?

还是回到我们最初的代码 HystrixInvocationHandler类中,看看其invoke()方法中的getFallback回调函数:

代码语言:javascript
复制
protected Object getFallback() {
    if (fallbackFactory == null) {
      return super.getFallback();
    }
    try {
      // 通过我们配置好的fallbackFactory找到对应的FeignClient,这里是获取ServiceAFeignClient
      Object fallback = fallbackFactory.create(getExecutionException());
      // fallbackMap中key为ServiceAFeignClient.sayHello(Integer)
      // 获取具体的降级method方法
      Object result = fallbackMethodMap.get(method).invoke(fallback, args);
      if (isReturnsHystrixCommand(method)) {
        return ((HystrixCommand) result).execute();
      } else if (isReturnsObservable(method)) {
        // Create a cold Observable
        return ((Observable) result).toBlocking().first();
      } else if (isReturnsSingle(method)) {
        // Create a cold Observable as a Single
        return ((Single) result).toObservable().toBlocking().first();
      } else if (isReturnsCompletable(method)) {
        ((Completable) result).await();
        return null;
      } else {
        return result;
      }
    } catch (IllegalAccessException e) {
      // shouldn't happen as method is public due to being an interface
      throw new AssertionError(e);
    } catch (InvocationTargetException e) {
      // Exceptions on fallback are tossed by Hystrix
      throw new AssertionError(e.getCause());
    }
  }
};

这里很简单,其实就是先获取到我们自己在FallbackFactory中配置的的降级方法,然后执行降级逻辑。

总结

这一讲核心逻辑主要是Hystrix熔断状态的变化,主要是CLOSED、OPEN、HALF_OPEN几种状态触发的时间,互相转变的流程,以及执行降级逻辑的原理。

我们仍然是用一个流程图来总结一下:

Hystrix熔断_降级机制原理.jpg
Hystrix熔断_降级机制原理.jpg

高清大图链接: https://www.processon.com/view/link/5e1ee0afe4b0c62462aae684

(点击原文可以直接查看大图哦?)

申明

本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-01-17 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 前情回顾
      • 本讲目录
      • 源码分析
        • HystrixCircuitBreaker初始化过程
          • Hystrix熔断机制(CLOSED/OPEN/HALF_OPEN)
            • fallback降级机制
            • 总结
            • 申明
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档