如果你想返回一个Observable对象则使用HystrixObservableCommand,这里使用的是HystrixCommand:
DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);
return command.execute();
new DubboHystrixCommand初始化执行过程中,会有下面这些操作:
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
//初始化metrics
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
//初始化熔断器
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
//初始化线程池
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
初始化metrics部分:
com.netflix.hystrix.HystrixCommandMetrics#getInstance(com.netflix.hystrix.HystrixCommandKey, com.netflix.hystrix.HystrixCommandGroupKey, com.netflix.hystrix.HystrixThreadPoolKey, com.netflix.hystrix.HystrixCommandProperties):
public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
// attempt to retrieve from cache first
//每个command key维护唯一一个实例
HystrixCommandMetrics commandMetrics = metrics.get(key.name());
if (commandMetrics != null) {
return commandMetrics;
} else {
synchronized (HystrixCommandMetrics.class) {
HystrixCommandMetrics existingMetrics = metrics.get(key.name());
if (existingMetrics != null) {
return existingMetrics;
} else {
HystrixThreadPoolKey nonNullThreadPoolKey;
if (threadPoolKey == null) {
nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
} else {
nonNullThreadPoolKey = threadPoolKey;
}
HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
metrics.putIfAbsent(key.name(), newCommandMetrics);
return newCommandMetrics;
}
}
}
}
构造方法:
/* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
super(null);
this.key = key;
this.group = commandGroup;
this.threadPoolKey = threadPoolKey;
this.properties = properties;
//每个command key维护唯一一个实例
healthCountsStream = HealthCountsStream.getInstance(key, properties);
//每个command key维护唯一一个实例
rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
//每个command key维护唯一一个实例
cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);
//每个command key维护唯一一个实例
rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
//每个command key维护唯一一个实例
rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
//每个command key维护唯一一个实例
rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}
在这里关注下:com.netflix.hystrix.HystrixCommandMetrics#getHealthCounts
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl中维护着HystrixCommandMetrics中统计的信息:
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#isOpen的代码如下:
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}
com.netflix.hystrix.HystrixCommandMetrics#getHealthCounts方法:
public HealthCounts getHealthCounts() {
return healthCountsStream.getLatest();
}
com.netflix.hystrix.metric.consumer.BucketedCounterStream#getLatest:
public Output getLatest() {
startCachingStreamValuesIfUnstarted();
if (counterSubject.hasValue()) {
return counterSubject.getValue();
} else {
return getEmptyOutputValue();
}
}
com.netflix.hystrix.metric.consumer.BucketedCounterStream#startCachingStreamValuesIfUnstarted:
public void startCachingStreamValuesIfUnstarted() {
if (subscription.get() == null) {
//the stream is not yet started
//这里的observe返回的是sourceStream 见com.netflix.hystrix.metric.consumer.BucketedRollingCounterStream#observe
Subscription candidateSubscription = observe().subscribe(counterSubject);
if (subscription.compareAndSet(null, candidateSubscription)) {
//won the race to set the subscription
} else {
//lost the race to set the subscription, so we need to cancel this one
candidateSubscription.unsubscribe();
}
}
}
com.netflix.hystrix.metric.consumer.BucketedRollingCounterStream#observe:
@Override
public Observable<Output> observe() {
return sourceStream;
}
/**
* Used for synchronous execution of command.
*
* @return R
* Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a failure occurs and a fallback cannot be retrieved
* @throws HystrixBadRequestException
* if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw decomposeException(e);
}
}
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
--------------省略了部分---------------------
}
return f;
}
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
sourceSubscription.unsubscribe();
}
});
}
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
//doOnCompleted handler already did all of the SUCCESS work
//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
final Action0 terminateCommandCleanup = new Action0() {
@Override
public void call() {
//CAS修改command执行状态
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(true); //user code did run
}
}
};
//mark the command as CANCELLED and store the latency (in addition to standard cleanup)
final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
//CAS修改command执行状态
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(true); //user code did run
}
}
};
//核心执行逻辑
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
//执行command核心部分
return applyHystrixSemantics(_cmd);
}
};
------------省略部分代码---------------------
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
//请求缓存部分
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//产生源observable的部分,它执行的核心部分是applyHystrixSemantics
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
//对源Observable进行包装之后进行缓存
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
//说明有其他线程在此之前执行了上面的操作
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
//这个返回的Observale是HystrixCachedObservable.from执行后封装过的
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
public static <R> HystrixCachedObservable<R> from(Observable<R> o, AbstractCommand<R> originalCommand) {
return new HystrixCommandResponseFromCache<R>(o, originalCommand);
}
HystrixCommandResponseFromCache(Observable<R> originalObservable, final AbstractCommand<R> originalCommand) {
super(originalObservable);
this.originalCommand = originalCommand;
}
protected HystrixCachedObservable(final Observable<R> originalObservable) {
ReplaySubject<R> replaySubject = ReplaySubject.create();
//用replaySubject去订阅源Observalbe
this.originalSubscription = originalObservable
.subscribe(replaySubject);
this.cachedObservable = replaySubject
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions--;
if (outstandingSubscriptions == 0) {
originalSubscription.unsubscribe();
}
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions++;
}
});
}
上面的toCache.toObservale()返回的就是,它订阅了源Observable:
public Observable<R> toObservable() {
return cachedObservable;
}
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
// circuitBreaker打开状态后
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//执行部分
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
executeCommandAndObserve部分的代码为:
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {//如果是线程池隔离模式
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
//com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault#markThreadExecution会执行metrics.markThreadExecution();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
//在这里会调用com.netflix.hystrix.AbstractCommand#getUserExecutionObservable
//然后执行com.netflix.hystrix.HystrixCommand#getExecutionObservable返回的是Observable.just(run())
//其中该run()为自定义的command实现的run方法,即业务执行部分的代码
//看代码之前需要熟悉下rxjava的lift和just操作符
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
------------------省略了部分代码----------------------
} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
线程池执行部分:
com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.ThreadPoolWorker#schedule(rx.functions.Action0):
其中使用了com.netflix.hystrix.HystrixThreadPool中的executor
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
this.sourceStream = bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
熔断器部分主要依赖两点,状态变化和滑动窗口+bucket的统计机制
基于滑动窗口和桶来实现。滑动窗口相当于一个时间窗,在这个时间窗中会有很多请求进入,如果每进入一个请求就统计一次这个时间窗的请求总数会有较低的性能,所以将这个时间窗口分成 十份,每份是一个桶,时间窗滑动到每个桶结束点时就统计一下这个桶内的请求数,就可以统计出整个窗口的请求数了。bucket(桶)一般是窗口的N分之一。
看下官方wiki的图:
线程池隔离:
用户线程与隔离线程示例图关系:
详细流程参考:https://design.codelytics.io/hystrix/how-it-works 讲得十分形象。
https://github.com/Netflix/Hystrix/wiki/How-it-Works
https://design.codelytics.io/hystrix/how-it-works