前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hystrix工作原理及源码解析

hystrix工作原理及源码解析

作者头像
山行AI
发布2019-06-28 11:59:27
1.6K0
发布2019-06-28 11:59:27
举报
文章被收录于专栏:山行AI山行AI

1. 工作流程图

2. 源码执行流程

1. 先构造一个HystrixCommand或者HystrixObservableCommand类型的对象

如果你想返回一个Observable对象则使用HystrixObservableCommand,这里使用的是HystrixCommand:

代码语言:javascript
复制
 DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
        return command.execute();  
        
  new DubboHystrixCommand初始化执行过程中,会有下面这些操作:
   
  protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
		   HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
		   HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
		   HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

	   this.commandGroup = initGroupKey(group);
	   this.commandKey = initCommandKey(key, getClass());
	   this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
	   this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
	   //初始化metrics
	   this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
	   //初始化熔断器
	   this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
	   //初始化线程池
	   this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

	   //Strategies from plugins
	   this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
	   this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
	   HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
	   this.executionHook = initExecutionHook(executionHook);

	   this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
	   this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

	   /* fallback semaphore override if applicable */
	   this.fallbackSemaphoreOverride = fallbackSemaphore;

	   /* execution semaphore override if applicable */
	   this.executionSemaphoreOverride = executionSemaphore;
   }
   
   初始化metrics部分:
   com.netflix.hystrix.HystrixCommandMetrics#getInstance(com.netflix.hystrix.HystrixCommandKey, com.netflix.hystrix.HystrixCommandGroupKey, com.netflix.hystrix.HystrixThreadPoolKey, com.netflix.hystrix.HystrixCommandProperties):
   
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
           // attempt to retrieve from cache first
           //每个command key维护唯一一个实例
           HystrixCommandMetrics commandMetrics = metrics.get(key.name());
           if (commandMetrics != null) {
               return commandMetrics;
           } else {
               synchronized (HystrixCommandMetrics.class) {
                   HystrixCommandMetrics existingMetrics = metrics.get(key.name());
                   if (existingMetrics != null) {
                       return existingMetrics;
                   } else {
                       HystrixThreadPoolKey nonNullThreadPoolKey;
                       if (threadPoolKey == null) {
                           nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
                       } else {
                           nonNullThreadPoolKey = threadPoolKey;
                       }
                       HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
                       metrics.putIfAbsent(key.name(), newCommandMetrics);
                       return newCommandMetrics;
                   }
               }
           }
       }
       构造方法:
      /* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
             super(null);
             this.key = key;
             this.group = commandGroup;
             this.threadPoolKey = threadPoolKey;
             this.properties = properties;
     		//每个command key维护唯一一个实例
             healthCountsStream = HealthCountsStream.getInstance(key, properties);
             //每个command key维护唯一一个实例
             rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
             //每个command key维护唯一一个实例
             cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
             //每个command key维护唯一一个实例
             rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
             //每个command key维护唯一一个实例
             rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
             //每个command key维护唯一一个实例
             rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
         }  
     
     在这里关注下:com.netflix.hystrix.HystrixCommandMetrics#getHealthCounts
     
     com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl中维护着HystrixCommandMetrics中统计的信息:
     com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#isOpen的代码如下:
     @Override
	 public boolean isOpen() {
		 if (circuitOpen.get()) {
			 // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
			 return true;
		 }

		 // we're closed, so let's see if errors have made us so we should trip the circuit open
		 HealthCounts health = metrics.getHealthCounts();

		 // check if we are past the statisticalWindowVolumeThreshold
		 if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
			 // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
			 return false;
		 }

		 if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
			 return false;
		 } else {
			 // our failure rate is too high, trip the circuit
			 if (circuitOpen.compareAndSet(false, true)) {
				 // if the previousValue was false then we want to set the currentTime
				 circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
				 return true;
			 } else {
				 // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
				 // caused another thread to set it to true already even though we were in the process of doing the same
				 // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
				 return true;
			 }
		 }
	 }
     
     com.netflix.hystrix.HystrixCommandMetrics#getHealthCounts方法:
     public HealthCounts getHealthCounts() {
             return healthCountsStream.getLatest();
         }
         
     com.netflix.hystrix.metric.consumer.BucketedCounterStream#getLatest:       
      public Output getLatest() {
             startCachingStreamValuesIfUnstarted();
             if (counterSubject.hasValue()) {
                 return counterSubject.getValue();
             } else {
                 return getEmptyOutputValue();
             }
         }
      com.netflix.hystrix.metric.consumer.BucketedCounterStream#startCachingStreamValuesIfUnstarted:
      public void startCachingStreamValuesIfUnstarted() {
              if (subscription.get() == null) {
                  //the stream is not yet started
                  //这里的observe返回的是sourceStream 见com.netflix.hystrix.metric.consumer.BucketedRollingCounterStream#observe
                  Subscription candidateSubscription = observe().subscribe(counterSubject);
                  if (subscription.compareAndSet(null, candidateSubscription)) {
                      //won the race to set the subscription
                  } else {
                      //lost the race to set the subscription, so we need to cancel this one
                      candidateSubscription.unsubscribe();
                  }
              }
          }   
          
      com.netflix.hystrix.metric.consumer.BucketedRollingCounterStream#observe:
       @Override
          public Observable<Output> observe() {
              return sourceStream;
          }

2. 执行command

  • com.netflix.hystrix.HystrixCommand#execute: 阻塞,并返回被保护的依赖部分执行的结果,代码如下:
代码语言:javascript
复制
 /**
     * Used for synchronous execution of command.
     * 
     * @return R
     *         Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
     * @throws HystrixRuntimeException
     *             if a failure occurs and a fallback cannot be retrieved
     * @throws HystrixBadRequestException
     *             if invalid arguments or state were used representing a user failure, not a system failure
     * @throws IllegalStateException
     *             if invoked more than once
     */
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw decomposeException(e);
        }
    }
  • com.netflix.hystrix.HystrixCommand#queue:返加一个Future,并且从这个Future中异步地获得响应的结果:
代码语言:javascript
复制
public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
    	
        final Future<R> f = new Future<R>() {
      		--------------省略了部分---------------------
        }

        return f;
    }
  • com.netflix.hystrix.AbstractCommand#observe(hot observable): 用来订阅依赖部分的代表响应的Observable,并且返回一个新的复制了源Observable的Observable:
代码语言:javascript
复制
public Observable<R> observe() {
        // us a ReplaySubject to buffer the eagerly subscribed-to Observable
        ReplaySubject<R> subject = ReplaySubject.create();
        // eagerly kick off subscription
        final Subscription sourceSubscription = toObservable().subscribe(subject);
        // return the subject that can be subscribed to later while the execution has already started
        return subject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                sourceSubscription.unsubscribe();
            }
        });
    }
  • com.netflix.hystrix.AbstractCommand#toObservable(cold observable):返回一个Observable,当你订阅它的时候,它就会执行hystrix command并且发出响应信息。
代码语言:javascript
复制
    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;

        //doOnCompleted handler already did all of the SUCCESS work
        //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
        final Action0 terminateCommandCleanup = new Action0() {

            @Override
            public void call() {
            //CAS修改command执行状态
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                    handleCommandEnd(false); //user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                    handleCommandEnd(true); //user code did run
                }
            }
        };

        //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
        final Action0 unsubscribeCommandCleanup = new Action0() {
            @Override
            public void call() {
            //CAS修改command执行状态
                if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                    if (!_cmd.executionResult.containsTerminalEvent()) {
                        _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                        _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                    }
                    handleCommandEnd(false); //user code never ran
                } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                    if (!_cmd.executionResult.containsTerminalEvent()) {
                        _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                        _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                    }
                    handleCommandEnd(true); //user code did run
                }
            }
        };
		
		//核心执行逻辑
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                //执行command核心部分
                return applyHystrixSemantics(_cmd);
            }
        };

      ------------省略部分代码---------------------

        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                 /* this is a stateful object so can only be used once */
                if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                    IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                    //TODO make a new error type for this
                    throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                }

                commandStartTimestamp = System.currentTimeMillis();

                if (properties.requestLogEnabled().get()) {
                    // log this command execution regardless of what happened
                    if (currentRequestLog != null) {
                        currentRequestLog.addExecutedCommand(_cmd);
                    }
                }
				//请求缓存部分
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                /* try from cache first */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
				//产生源observable的部分,它执行的核心部分是applyHystrixSemantics
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // put in cache
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap it for caching
                    //对源Observable进行包装之后进行缓存
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // another thread beat us so we'll use the cached value instead
                        //说明有其他线程在此之前执行了上面的操作
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // we just created an ObservableCommand so we cast and return it
                        //这个返回的Observale是HystrixCachedObservable.from执行后封装过的
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
  1. 缓存部分需要关注的是: HystrixCachedObservable.from(hystrixObservable, _cmd);
代码语言:javascript
复制
public static <R> HystrixCachedObservable<R> from(Observable<R> o, AbstractCommand<R> originalCommand) {
        return new HystrixCommandResponseFromCache<R>(o, originalCommand);
    }

HystrixCommandResponseFromCache(Observable<R> originalObservable, final AbstractCommand<R> originalCommand) {
        super(originalObservable);
        this.originalCommand = originalCommand;
    }
protected HystrixCachedObservable(final Observable<R> originalObservable) {
        ReplaySubject<R> replaySubject = ReplaySubject.create();
        //用replaySubject去订阅源Observalbe
        this.originalSubscription = originalObservable
                .subscribe(replaySubject);

        this.cachedObservable = replaySubject
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions--;
                        if (outstandingSubscriptions == 0) {
                            originalSubscription.unsubscribe();
                        }
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions++;
                    }
                });
    }    
 
 上面的toCache.toObservale()返回的就是,它订阅了源Observable:
   public Observable<R> toObservable() {
         return cachedObservable;
     }       
  1. 需要关注的核心逻辑部分:
代码语言:javascript
复制
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);
		 //	circuitBreaker打开状态后
        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    //执行部分
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }

executeCommandAndObserve部分的代码为:

代码语言:javascript
复制
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {//如果是线程池隔离模式
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        //com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault#markThreadExecution会执行metrics.markThreadExecution();
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        /**
                         * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                         */
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            //在这里会调用com.netflix.hystrix.AbstractCommand#getUserExecutionObservable
                            //然后执行com.netflix.hystrix.HystrixCommand#getExecutionObservable返回的是Observable.just(run())
                            //其中该run()为自定义的command实现的run方法,即业务执行部分的代码
                            //看代码之前需要熟悉下rxjava的lift和just操作符
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
        ------------------省略了部分代码----------------------
        } else {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    // semaphore isolated
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (Throwable ex) {
                        //If the above hooks throw, then use that as the result of the run method
                        return Observable.error(ex);
                    }
                }
            });
        }
    }
    
    线程池执行部分:
    com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.ThreadPoolWorker#schedule(rx.functions.Action0):
    其中使用了com.netflix.hystrix.HystrixThreadPool中的executor

3. 熔断器原理分析

代码语言:javascript
复制
 protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

熔断器部分主要依赖两点,状态变化和滑动窗口+bucket的统计机制

  1. 通过HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())设置临界值
  2. HystrixCommandProperties.circuitBreakerErrorThresholdPercentage():允许错误超过临界值的百分比
  3. Then the circuit-breaker transitions from CLOSED to OPEN.
  4. While it is open, it short-circuits all requests made against that circuit-breaker.
  5. After some amount of time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next single request is let through (this is the HALF-OPEN state). If the request fails, the circuit-breaker returns to the OPEN state for the duration of the sleep window. If the request succeeds, the circuit-breaker transitions to CLOSED and the logic in 1. takes over again.

基于滑动窗口和桶来实现。滑动窗口相当于一个时间窗,在这个时间窗中会有很多请求进入,如果每进入一个请求就统计一次这个时间窗的请求总数会有较低的性能,所以将这个时间窗口分成 十份,每份是一个桶,时间窗滑动到每个桶结束点时就统计一下这个桶内的请求数,就可以统计出整个窗口的请求数了。bucket(桶)一般是窗口的N分之一。

4. 隔离

看下官方wiki的图:

线程池隔离:

用户线程与隔离线程示例图关系:

详细流程参考:https://design.codelytics.io/hystrix/how-it-works 讲得十分形象。

5. 详细内容参考:

https://github.com/Netflix/Hystrix/wiki/How-it-Works

https://design.codelytics.io/hystrix/how-it-works

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 工作流程图
  • 2. 源码执行流程
    • 1. 先构造一个HystrixCommand或者HystrixObservableCommand类型的对象
      • 2. 执行command
      • 3. 熔断器原理分析
      • 4. 隔离
      • 5. 详细内容参考:
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档