一个HystrixCommand
或HystrixObservableCommand
对象,代表对某个依赖服务发起的一次请求或者调用
构造的时候,可在构造器中传入任何需要的参数。
直接继承HystrixCommand并实现run方法即可。
public class GetUserAccountCommand extends HystrixCommand<UserAccount> {
...
@Override
protected UserAccount run() {
/* 模拟执行网络调用以检索用户信息 */
try {
Thread.sleep((int) (Math.random() * 10) + 2);
} catch (InterruptedException e) {
}
/* 5%的时间失败来说明fallback的工作原理 */
if (Math.random() > 0.95) {
throw new RuntimeException("random failure processing UserAccount network response");
}
/* 延迟会增加5%的时间,因此有时会触发超时 */
if (Math.random() > 0.95) {
// 随机等待时间尖峰
try {
Thread.sleep((int) (Math.random() * 300) + 25);
} catch (InterruptedException e) {
}
}
/* 成功...使用远程服务响应的数据创建UserAccount */
return new UserAccount(86975, "John James", 2, true, false, true);
}
...
}
执行Command就可以发起一次对依赖服务的调用
要执行Command,需要在4个方法中选择其中的一个
/**
* 用于同步执行 command.
*
* @return R
* 如果command由于任何原因失败,则执行 #run 或从 #getFallback() fallback的结果.
*
* @throws HystrixRuntimeException
* 如果发生故障并且无法检索fallback
* @throws HystrixBadRequestException
* 如果使用了无效的参数或状态来表示用户故障,而不是系统故障
*
* @throws IllegalStateException
* 如果多次调用
*/
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
调用后直接阻塞,直到依赖服务返回单条结果,或抛异常
/**
* 用于异步执行命令 command.
*
* 这将使该command在线程池上排队,并在完成后返回一个 Future 以获取结果.
* 注意:如果配置为不在单独的线程中运行,则其效果与 #execute() 相同,并会阻塞.
* 不会抛出异常,而只是切换为同步执行,因此无需更改代码即可 将command从运行在单独的线程切换到调用线程.
* (switch a command from running on a separate thread to the calling thread.)
*
* @return {@code Future <R>}执行 #run() 的结果,或者如果command由于任何原因失败,则返回 #getFallback() 的结果.
* @throws HystrixRuntimeException
* 如果不存在fallback
* 如果通过 ExecutionException#getCause() 中的{@code Future.get(), 如果不存在失败发生的话
* 或者如果无法将命令排队(如,短路,线程池/信号被拒绝),则立即返回
* @throws HystrixBadRequestException
* 通过 ExecutionException#getCause() 中的 Future.get() 如果使用了无效的参数或状态来表示用户故障而不是系统故障
* @throws IllegalStateException
* 如果多次调用
*/
public Future<R> queue() {
/*
* 当Future.cancel(boolean)的“ mayInterrupt”标志设为true时
* 由Observable.toBlocking().toFuture() 返回的Future不实现执行线程的中断
* 因此,为了遵守Future的约定,我们必须围绕它.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
/*
* The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
* than that interruption request cannot be taken back.
* 这里唯一有效的转换是false -> true.
* 如果存在由该命令创建(这很奇怪,但从未禁止过)的两个futures,例如f1和f2,
* 并且对f1.cancel(true)和f2.cancel(false)的调用是由不同的线程发起,
* 尚不清楚在检查mayInterruptOnCancel时将使用什么值.
* 处理这种情况的最一致的方法是说,如果在中断的情况下调用了任何cancellation,则无法撤回该中断请求.
*/
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
/* 对立即抛出的错误状态的特殊处理 */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// 不会仅从 queue().get() 中将这些类型从 queue() 中抛出, 因为它们是执行错误
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
// 这些是从 queue() 抛出的错误,因为它们是拒绝类型错误
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
调用,返回一个Future,后面可以通过Future获取单条结果
订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的Observable对象的拷贝对象
其中execute()和queue()仅对HystrixCommand适用
K value = command.execute();
Future<K> fValue = command.queue();
Observable<K> ohValue = command.observe();
Observable<K> ocValue = command.toObservable();
即,无论是哪种执行command的方式,最终都是依赖toObservable()
也就是说同步的HystrixCommand最终都会依赖Observable,尽管HystrixCommand是用来发射单个事件的
如果这个command开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果 否则,继续往后
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* 这是一个有状态的对象,因此只能使用一次 */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
// TODO 为此创建新的错误类型
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
// 记录此命令执行,无论发生什么情况
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
/* 首先尝试从缓存 */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 放入缓存
if (requestCacheEnabled && cacheKey != null) {
// 包装以缓存
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// 另一个线程击败了我们,因此使用缓存值
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// 我们刚刚创建了一个ObservableCommand,所以我们进行了强制转换并返回了它
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // 进行一次清理(在正常终端状态(此行)或退订(下一行))
.doOnUnsubscribe(unsubscribeCommandCleanup) // 进行一次清理
.doOnCompleted(fireOnCompletedHook);
}
});
上述代码中有个判断final boolean requestCacheEnabled = isRequestCachingEnabled();可以看到如果我们的Command实现了getCacheKey方法,并且requestCacheEnabled(这个属性默认是true,可以通过调用HystrixCommand的构造方法传入一个setter对象修改默认属性)这样就不会执行后续的run方法,就会直接返回一个缓存的Observable。(必须是同一个request context里面的两个command才能用到缓存)
检查这个command对应的依赖服务是否开启短路器
如果断路器被打开了,那么hystrix就不会执行这个command,而是直接执行fallback降级
如果command对应的线程池/队列/semaphore已满,那么也不会执行command,而是直接去调用fallback降级机制,同时发送 reject 信息给断路器统计
调用HystrixObservableCommand.construct(
)或HystrixCommand.run()
来实际执行这个command
如果HystrixCommand.run()或HystrixObservableCommand.construct()的执行,超过了timeout时长的话,那么command所在的线程就会抛出一个TimeoutException 如果timeout了,也会去执行fallback降级机制,而且就不会管run()或construct()返回的值
我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个TimeoutException,但是还是可能会因为严重延迟的调用线程占满整个线程池的 即使这个时候新来的流量都被限流了。。。
如果没有timeout的话,那么就会拿到一些调用依赖服务获取到的结果,然后hystrix会做一些logging记录和metric统计
Hystrix会将每一个依赖服务的调用成功,失败,拒绝,超时,等事件,都会发送给circuit breaker断路器
短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计
短路器会根据这些统计次数来决定,是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路,然后如果在之后第一次检查发现调用成功了,就关闭断路器
在 run 方法中直接抛异常快速失败。
run 里有个降级方法,内部返回一个空
return null ;
return new Option<T>();
return Collections . emptyList();
return Collections . emptyMap( );
return true;
return DEFAULT_OBJECT;
主次降级。将新功能封装在老功能后面。
将请求通过时间窗口都合并在一个队列中。
在以下几种情况中,hystrix会调用fallback降级机制
一般在降级机制中,都建议给出一些默认的返回值,比如静态的一些代码逻辑,或者从内存中的缓存中提取一些数据,尽量在这里不要再进行网络请求了 即使在降级中,一定要进行网络调用,也应该将那个调用放在一个HystrixCommand中,进行隔离
如果fallback返回了结果,那么hystrix就会返回这个结果
如果没有实现fallback,或者是fallback抛出了异常,Hystrix会返回一个Observable,但是不会返回任何数据
不同的command执行方式,其fallback为空或者异常时的返回结果不同