在回顾一下hystrix
的执行流程:
执行 Hystrix 命令需要集成 HystrixCommand
, 有四种调用方式:
toObservable
: 返回 Observable 对象observe
: 在调用 toObservable
的基础上, 向 Observable
上注册 rx.subjects.ReplaySubject
(这些都是 rxJava 的概念)queue
: 在调用 toObservable
的基础上:Observable.toBlocking
, 将 Observable 转换成阻塞的 rx.observables.BlockingObservable
BlockingObservable.toFuture
, 返回 run 方法的执行结果的 Future 对象execute
: 调用 queue
的基础上,调用 Future.get
,同步返回 run 的执行结果Hystrix 底层使用了大量的 RxJava, 这里就不把源代码贴出来了, 包括上面的执行方式也可以看出来 Hystrix 是依赖于 RxJava 的 Observable 实现的。
结合执行流程图再次全局的分析一下
HystrixCommand.run(),HystrixObservableCommand.construct()
Hystrix 处理失败的逻辑:
如果我们在 Command 中实现了 HystrixCommand.getFallback()
(或 HystrixObservableCommand.resumeWithFallback()
,Hystrix会返回对应方法的结果。如果没有实现这些方法的话,从底层看Hystrix将会返回一个空的Observable对象,并且可以通过onError来终止并处理错误。 如果从应用层来看:
1). execute()
方法将会抛出异常
2). queue()
将会返回一个失败状态的 Future
对象
3). observe()
和 toObservable()
都会返回上述的空 Observable
对象
执行命令入口和获取缓存的逻辑都需要结合 RxJava 来看源码, 这里就只挑断路器的部分来分析一下(基于 1.4.x 版本来分析, 1.5.x 都转换成了基于订阅的流式操作)
Hystrix 中的 Circuit Breaker 的实现比较清晰,整个 HystrixCircuitBreaker
接口一共有三个方法和三个静态类:
Factory 是用来获取 HystrixCircuitBreaker
的静态工厂, 实现如下:
public static class Factory {
// key 是 HystrixCommandKey 的 hashcode
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// 先从缓存中获取
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
// 如果缓存中为空, 就初始化并放入缓存
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
// this means the putIfAbsent step just created a new one so let's retrieve and return it
return circuitBreakersByCommand.get(key.name());
} else {
// this means a race occurred and while attempting to 'put' another one got there before
// and we instead retrieved it and will now return it
return cbForCommand;
}
}
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
return circuitBreakersByCommand.get(key.name());
}
/* package */static void reset() {
circuitBreakersByCommand.clear();
}
}
这里的代码很简单。Factory 中维护了一个 ConcurrentHashMap
用于存储与每一个 HystrixCommandKey
相对应的 HystrixCircuitBreaker
。每当我们通过 getInstance
从中获取 HystrixCircuitBreaker
的时候,首先会检查ConcurrentHashMap中有没有对应的缓存的断路器,如果有的话直接返回。如果没有的话就会新创建一个 HystrixCircuitBreaker
实例,将其添加到缓存中并且返回。
HystrixCircuitBreakerImpl
静态类是 HystrixCircuitBreaker
接口的实现。
HystrixCircuitBreakerImpl
中有四个成员变量:properties
是对应 HystrixCommand 的属性类metrics
是对应 HystrixCommand 的度量数据类circuitOpen
代表断路器的状态(默认是false代表关闭,这里没有特意实现Half-Open这个状态),考虑并发使用了 AtomicBoolean
修饰circuitOpenedOrLastTestedTime
记录着断路恢复计时器的初始时间,用于Open状态向Close状态的转换。使用了一个 AtomicLong
类型的变量allowRequest()
表示是否允许指令执行isOpen()
表示断路器是否为开启状态markSuccess()
用于将断路器关闭public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// 如果断路器强制开启,直接返回 false
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// 就算是强制关闭了, 也要调用 isOpen 来记录一些统计数据
isOpen();
// 因为是强制关闭, 不用在意 isOpen 的执行结果
return true;
}
// 判断断路器是否关闭或断路器恢复计时器是否到达时间
return !isOpen() || allowSingleTest();
}
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// 如果是 open 状态, 直接返回 true
return true;
}
// 如果是 cloes 状态, 就需要获取 healthCounts 来判断错误率是否达到需要打开断路器
HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// 请求数如果小于判断阈值则返回 false
return false;
}
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
// 判断错误比例是否小于设定的值, 默认是 50%
return false;
} else {
// 错误比例太高, cas 开启断路器
if (circuitOpen.compareAndSet(false, true)) {
// 将当前系统时间设置为短路定时器初始时间
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// 走到这里代表 cas 失败, 就是有其他的线程开启了断路器, 也返回 true
return true;
}
}
}
circuitOpen.get()
获取断路器的状态,如果是开启状态(true)则返回true。Metrics
数据中获取 HealthCounts
对象,然后检查对应的请求总数(totalCount)是否小于属性中的请求容量阈值(circuitBreakerRequestVolumeThreshold
),如果是的话表示断路器可以保持关闭状态,返回false。public boolean allowSingleTest() {
// 获取断路器定时器的初始时间
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) 断路器状态是 open
// 2) 当前时间与定时器初始时间的差大于 circuitBreakerSleepWindowInMilliseconds(睡眠窗口时间,默认 5s)
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// cas 将断路定时器初始时间设置为当前时间
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// 如果设置成功则返回 true
return true;
}
}
// 返回 false 代表有其他线程放行了请求
return false;
}
这里的逻辑也很清晰, 当过了一个窗口时间后就试着放行一个请求