This article is based on Hystrix 1.5.X.
This article covers the circuit breaker, HystrixCircuitBreaker.
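HystrixCircuitBreaker has three states:
- CLOSED: closed
- OPEN: open
- HALF_OPEN: half-open
When the circuit breaker is OPEN, the link is considered unhealthy. A command that executes in this state skips the normal logic and calls the fallback logic directly.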
The state transitions of HystrixCircuitBreaker are shown in the figure below:
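- In the CLOSED state, the link is healthy. The breaker goes from CLOSED to OPEN when both of the following conditions are met:
  - within a rolling window (configurable, HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms), the total number of requests reaches a threshold (configurable, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20);
  - the percentage of failed requests reaches a threshold (configurable, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50%).
- In the OPEN state, a command may execute after a configurable interval has passed since the breaker opened (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds=5000ms). When that happens, the breaker moves to HALF_OPEN and tries the normal logic. If the trial succeeds, the breaker closes. If it fails, the breaker opens again (the blue line in the figure).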
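The two CLOSED=>OPEN conditions fit into a few lines of plain Java. The sketch below is hypothetical: TripRuleSketch and shouldTrip are illustrative names, not Hystrix API. It hard-codes the default thresholds quoted above. It also follows the comparisons in #subscribeToStream() later in this article, so reaching a threshold (>=) is enough to trip.

```java
// Hypothetical sketch of the CLOSED -> OPEN trip rule; names are illustrative, not Hystrix API.
final class TripRuleSketch {
    static final int REQUEST_VOLUME_THRESHOLD = 20;   // default circuitBreakerRequestVolumeThreshold
    static final int ERROR_THRESHOLD_PERCENTAGE = 50; // default circuitBreakerErrorThresholdPercentage

    /** Returns true when a CLOSED breaker should trip, given the counts of the current rolling window. */
    static boolean shouldTrip(long totalRequests, long errorRequests) {
        if (totalRequests < REQUEST_VOLUME_THRESHOLD) {
            return false; // too little traffic in the window to judge health
        }
        long errorPercentage = errorRequests * 100 / totalRequests; // integer percentage, truncated
        return errorPercentage >= ERROR_THRESHOLD_PERCENTAGE;
    }

    public static void main(String[] args) {
        System.out.println(shouldTrip(19, 19)); // false: below the volume threshold, even at 100% errors
        System.out.println(shouldTrip(25, 13)); // true: 52% >= 50%
        System.out.println(shouldTrip(25, 12)); // false: 48% < 50%
    }
}
```

Checking the volume threshold first is deliberate. A handful of failures in a quiet window should not open the breaker.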
com.netflix.hystrix.HystrixCircuitBreaker is the Hystrix circuit breaker interface, defined as follows:
public interface HystrixCircuitBreaker {
/**
* Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does
* not modify any internal state, and takes into account the half-open logic which allows some requests through
* after the circuit has been opened
*
* @return boolean whether a request should be permitted
*/
boolean allowRequest();
/**
* Whether the circuit is currently open (tripped).
*
* @return boolean state of circuit breaker
*/
boolean isOpen();
/**
* Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markSuccess();
/**
* Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markNonSuccess();
/**
* Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal
* state.
*/
boolean attemptExecution();
}
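#allowRequest() and #attemptExecution() serve essentially the same purpose. They differ when the breaker is eligible for a trial (the sleep window has elapsed): the former does not change the breaker's state, while the latter does (OPEN=>HALF_OPEN). HystrixCircuitBreaker has two implementations:
- HystrixCircuitBreakerImpl: the full implementation
- NoOpCircuitBreaker: a no-op implementation, used when the circuit breaker is disabled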
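The simpler of the two implementations, NoOpCircuitBreaker, is used when the breaker is disabled. Given that role, a do-nothing implementation of the interface above could look like the sketch below. NoOpSketch, CircuitBreaker and AlwaysClosedBreaker are hypothetical local names. The interface is redeclared so the file compiles without Hystrix, so check the Hystrix source for the real class.

```java
// Hypothetical sketch, not the Hystrix source: CircuitBreaker redeclares the methods of the
// HystrixCircuitBreaker interface shown above so that this file compiles on its own.
final class NoOpSketch {
    interface CircuitBreaker {
        boolean allowRequest();
        boolean isOpen();
        void markSuccess();
        void markNonSuccess();
        boolean attemptExecution();
    }

    /** Never trips: every request is allowed and execution feedback is ignored. */
    static final class AlwaysClosedBreaker implements CircuitBreaker {
        @Override public boolean allowRequest() { return true; }
        @Override public boolean isOpen() { return false; }
        @Override public void markSuccess() { /* nothing to record */ }
        @Override public void markNonSuccess() { /* nothing to record */ }
        @Override public boolean attemptExecution() { return true; }
    }

    public static void main(String[] args) {
        CircuitBreaker breaker = new AlwaysClosedBreaker();
        for (int i = 0; i < 100; i++) {
            breaker.markNonSuccess(); // even a long run of failures changes nothing
        }
        System.out.println(breaker.attemptExecution() + " " + breaker.isOpen()); // true false
    }
}
```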
When an AbstractCommand is created, it initializes its HystrixCircuitBreaker:
/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
/**
* Circuit breaker
*/
protected final HystrixCircuitBreaker circuitBreaker;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... unrelated code omitted
// Initialize the circuit breaker
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// ... unrelated code omitted
}
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// get the default implementation of HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
}
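When HystrixCommandProperties.circuitBreakerEnabled=true, the circuit breaker is enabled and a HystrixCircuitBreakerImpl instance is obtained from the Factory. This is covered in detail in 「3. HystrixCircuitBreaker.Factory」.
When HystrixCommandProperties.circuitBreakerEnabled=false, the circuit breaker is disabled and a NoOpCircuitBreaker is created. NoOpCircuitBreaker is dead simple; see its source in Hystrix for the implementation.
com.netflix.hystrix.HystrixCircuitBreaker.Factory is the HystrixCircuitBreaker factory. It creates HystrixCircuitBreakerImpl instances and caches one per command key in the following map:
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
The rest of the code is very straightforward; see the Factory source for details.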
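Why cache per key? Every command instance with the same command key must share one breaker. Otherwise each new instance would start with a fresh CLOSED breaker and the OPEN state would never take effect.

Below is a hypothetical sketch of a get-then-putIfAbsent cache in the style of the Factory. BreakerFactorySketch, Breaker and getInstance(String) are illustrative names. The real Factory builds a HystrixCircuitBreakerImpl from the command key, group key, properties and metrics.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a per-command-key breaker cache in the style of HystrixCircuitBreaker.Factory.
final class BreakerFactorySketch {
    /** Stand-in for HystrixCircuitBreakerImpl; it only remembers the key it was built for. */
    static final class Breaker {
        final String commandKey;
        Breaker(String commandKey) { this.commandKey = commandKey; }
    }

    private static final ConcurrentHashMap<String, Breaker> circuitBreakersByCommand =
            new ConcurrentHashMap<String, Breaker>();

    static Breaker getInstance(String commandKey) {
        // Fast path: a breaker for this key was already created.
        Breaker existing = circuitBreakersByCommand.get(commandKey);
        if (existing != null) {
            return existing;
        }
        // Slow path: threads may race here, but putIfAbsent keeps only the first instance stored.
        Breaker created = new Breaker(commandKey);
        Breaker previous = circuitBreakersByCommand.putIfAbsent(commandKey, created);
        return previous == null ? created : previous;
    }

    public static void main(String[] args) {
        Breaker first = getInstance("GetUserCommand");
        Breaker second = getInstance("GetUserCommand");
        Breaker other = getInstance("GetOrderCommand");
        System.out.println((first == second) + " " + (first == other)); // true false
    }
}
```

A losing thread may construct a Breaker that is then thrown away. That is harmless here because construction has no side effects.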
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl is the full circuit breaker implementation.
Let's go through the methods of HystrixCircuitBreakerImpl one by one.
The constructor:
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
enum Status {
CLOSED, OPEN, HALF_OPEN
}
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
}
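- status: the state of the circuit breaker.
- circuitOpened: the time at which the breaker opened, i.e. entered the OPEN state (-1 while the breaker is closed).
- activeSubscription: the subscription to the Hystrix Metrics Observable that counts requests, covered in detail in 「4.2 #subscribeToStream()」.
#subscribeToStream() subscribes to the Hystrix Metrics Observable that counts requests. The code is as follows: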
1: private Subscription subscribeToStream() {
2: /*
3: * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
4: */
5: return metrics.getHealthCountsStream()
6: .observe()
7: .subscribe(new Subscriber<HealthCounts>() {
8: @Override
9: public void onCompleted() {
10:
11: }
12:
13: @Override
14: public void onError(Throwable e) {
15:
16: }
17:
18: @Override
19: public void onNext(HealthCounts hc) {
20: System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿: added for debugging
21: // check if we are past the statisticalWindowVolumeThreshold
22: if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
23: // we are not past the minimum volume threshold for the stat window,
24: // so no change to circuit status.
25: // if it was CLOSED, it stays CLOSED
26: // if it was half-open, we need to wait for a successful command execution
27: // if it was open, we need to wait for sleep window to elapse
28: } else {
29: if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
30: //we are not past the minimum error threshold for the stat window,
31: // so no change to circuit status.
32: // if it was CLOSED, it stays CLOSED
33: // if it was half-open, we need to wait for a successful command execution
34: // if it was open, we need to wait for sleep window to elapse
35: } else {
36: // our failure rate is too high, we need to set the state to OPEN
37: if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
38: circuitOpened.set(System.currentTimeMillis());
39: }
40: }
41: }
42: }
43: });
44: }
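#onNext() is called over and over, and each call recomputes the circuit breaker's state.
- Line 22: checks whether, within the rolling window (configurable, HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms), the total number of requests has reached the threshold (configurable, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20).
  - The window rolls. For example, N requests issued at 00:00 are no longer counted at 00:11.
- Line 29: checks whether the error percentage has reached the threshold (configurable, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50%).
- Lines 37-39: when both conditions hold, the state is changed with CAS (CLOSED=>OPEN) and the open time (circuitOpened) is recorded.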
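The rolling behaviour (requests from 00:00 no longer counting at 00:11) can be reproduced without RxJava. The hypothetical sketch below keeps 500 ms buckets in a deque and sums only the latest 20 of them, i.e. a 10-second window matching the defaults quoted in this article. RollingWindowSketch, record and counts are illustrative names, and the sketch assumes requests are recorded in non-decreasing time order. Hystrix itself builds the same idea from Observable#window operators rather than a deque.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical plain-Java sketch of a bucketed rolling window: 20 buckets x 500 ms = 10 s.
// Assumes record() is called with non-decreasing timestamps.
final class RollingWindowSketch {
    static final long BUCKET_MS = 500; // like metricsHealthSnapshotIntervalInMilliseconds
    static final int NUM_BUCKETS = 20; // 10000 ms window / 500 ms per bucket

    /** One bucket: its position on the time axis plus the counts recorded in it. */
    static final class Bucket {
        final long index;
        long total;
        long errors;

        Bucket(long index) { this.index = index; }
    }

    private final Deque<Bucket> buckets = new ArrayDeque<Bucket>();

    void record(long timeMs, boolean error) {
        long index = timeMs / BUCKET_MS;
        Bucket last = buckets.peekLast();
        if (last == null || last.index != index) {
            last = new Bucket(index); // first event of a new bucket
            buckets.addLast(last);
        }
        last.total++;
        if (error) {
            last.errors++;
        }
    }

    /** Returns {total, errors} summed over the NUM_BUCKETS buckets ending at the one containing nowMs. */
    long[] counts(long nowMs) {
        long newest = nowMs / BUCKET_MS;
        // Evict buckets that have slid out of the window.
        while (!buckets.isEmpty() && buckets.peekFirst().index <= newest - NUM_BUCKETS) {
            buckets.removeFirst();
        }
        long total = 0;
        long errors = 0;
        for (Bucket b : buckets) {
            total += b.total;
            errors += b.errors;
        }
        return new long[] {total, errors};
    }

    public static void main(String[] args) {
        RollingWindowSketch window = new RollingWindowSketch();
        for (int i = 0; i < 30; i++) {
            window.record(0, i % 2 == 0);     // 30 requests at 00:00, 15 of them failing
        }
        long[] at5s = window.counts(5_000);   // 00:05, still inside the 10 s window
        long[] at11s = window.counts(11_000); // 00:11, the 00:00 bucket has slid out
        System.out.println(at5s[0] + "/" + at5s[1] + " " + at11s[0] + "/" + at11s[1]); // 30/15 0/0
    }
}
```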
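The rolling count comes from HealthCountsStream (returned by metrics.getHealthCountsStream()), which is built on RxJava window operators:
- Observable#window(timespan, unit) cuts the event stream into buckets of a fixed period (configurable, HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds=500ms) and emits one Observable window per bucket. See the BucketedCounterStream constructor for the call site.
- Observable#window(count, skip) then slides over those buckets. count is the number of buckets in the rolling window (metricsRollingStatisticalWindowInMilliseconds / metricsHealthSnapshotIntervalInMilliseconds = 10000 / 500 = 20 by default) and skip is 1, so every new bucket produces a fresh summary of the latest 10 seconds. This 20 happens to equal the default circuitBreakerRequestVolumeThreshold=20, but the two are unrelated: the request volume threshold is only checked on line 22. See the BucketedRollingCounterStream constructor for the call site.
#subscribeToStream() is currently called in two places:
- the constructor;
- #markSuccess(), when the breaker closes (HALF_OPEN=>CLOSED).
Below is the code in AbstractCommand#applyHystrixSemantics(_cmd) that calls HystrixCircuitBreakerImpl#attemptExecution: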
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// ... 省略无关代码
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) {
// Run the normal logic
} else {
// Run the fallback logic
}
}
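On the caller side the pattern is short. Ask the breaker first, run the normal logic only if allowed, and report the outcome so that a HALF_OPEN breaker can close or reopen. Below is a hypothetical, synchronous sketch of that pattern. GuardedCallSketch, Breaker, RecordingBreaker and execute are illustrative names. The real AbstractCommand is reactive and also deals with timeouts, thread pools and semaphores.

```java
import java.util.function.Supplier;

// Hypothetical, synchronous sketch of the caller-side pattern: ask the breaker, run the
// normal logic only if allowed, and feed the outcome back. Not AbstractCommand's code.
final class GuardedCallSketch {
    /** The subset of HystrixCircuitBreaker's methods this sketch needs (redeclared locally). */
    interface Breaker {
        boolean attemptExecution();
        void markSuccess();
        void markNonSuccess();
    }

    static <T> T execute(Breaker breaker, Supplier<T> primary, Supplier<T> fallback) {
        if (!breaker.attemptExecution()) {
            return fallback.get();    // short-circuited: the normal logic is skipped entirely
        }
        try {
            T result = primary.get();
            breaker.markSuccess();    // may close a HALF_OPEN breaker
            return result;
        } catch (RuntimeException e) {
            breaker.markNonSuccess(); // may re-open a HALF_OPEN breaker
            return fallback.get();
        }
    }

    /** Test double: a fixed allow/deny answer plus counters for the feedback it receives. */
    static final class RecordingBreaker implements Breaker {
        private final boolean allow;
        int successes;
        int failures;

        RecordingBreaker(boolean allow) { this.allow = allow; }

        @Override public boolean attemptExecution() { return allow; }
        @Override public void markSuccess() { successes++; }
        @Override public void markNonSuccess() { failures++; }
    }

    public static void main(String[] args) {
        RecordingBreaker open = new RecordingBreaker(false);
        String shortCircuited = execute(open, () -> "primary", () -> "fallback");
        System.out.println(shortCircuited); // fallback

        RecordingBreaker closed = new RecordingBreaker(true);
        String ok = execute(closed, () -> "primary", () -> "fallback");
        String failed = execute(closed, () -> { throw new IllegalStateException("boom"); }, () -> "fallback");
        System.out.println(ok + " " + failed + " " + closed.successes + " " + closed.failures); // primary fallback 1 1
    }
}
```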
HystrixCircuitBreakerImpl#attemptExecution determines whether the normal logic may be executed. The code of #attemptExecution is as follows:
1: @Override
2: public boolean attemptExecution() {
3: // Forced open
4: if (properties.circuitBreakerForceOpen().get()) {
5: return false;
6: }
7: // Forced closed
8: if (properties.circuitBreakerForceClosed().get()) {
9: return true;
10: }
11: // No open time recorded: the breaker is closed
12: if (circuitOpened.get() == -1) {
13: return true;
14: } else {
15: // The sleep window has elapsed: a trial may be attempted
16: if (isAfterSleepWindow()) {
17: //only the first request after sleep window should execute
18: //if the executing command succeeds, the status will transition to CLOSED
19: //if the executing command fails, the status will transition to OPEN
20: //if the executing command gets unsubscribed, the status will transition to OPEN
21: if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
22: return true;
23: } else {
24: return false;
25: }
26: } else {
27: return false;
28: }
29: }
30: }
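- Lines 3-6: when HystrixCommandProperties.circuitBreakerForceOpen=true (default: false), the breaker is forced open and false is returned. If this property is managed by a configuration center, the breaker can be tripped dynamically. Why does this property exist? Once a HystrixCircuitBreaker has been created, there is no way to switch dynamically between NoOpCircuitBreaker and HystrixCircuitBreakerImpl, and this property provides a similar effect.
- Lines 7-10: when HystrixCommandProperties.circuitBreakerForceClosed=true (default: false), the breaker is forced closed and true is returned. If this property is managed by a configuration center, the breaker can be closed dynamically, for the same reason as above.
- Lines 11-13: if the open time (circuitOpened) is "empty" (-1), true is returned.
- Lines 15-28: #isAfterSleepWindow() checks whether enough time has passed to try the normal logic again. If it has, the state is changed with CAS (OPEN=>HALF_OPEN), which guarantees that only one thread gets to try the normal logic.
The code of #isAfterSleepWindow() is as follows: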
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
It returns true once the current time is later than the open time plus HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds (default: 5000ms).
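Take concrete numbers: a breaker opened at t = 1000 ms with the default 5000 ms sleep window. It rejects trials up to and including t = 6000 ms and allows one from t = 6001 ms, because the comparison is strict. The sketch below restates the comparison from the #isAfterSleepWindow() code above with the clock passed in as a parameter. This is a hypothetical change (SleepWindowSketch is an illustrative name) that makes the boundary testable without waiting.

```java
// Hypothetical re-statement of the isAfterSleepWindow() comparison with an explicit clock.
final class SleepWindowSketch {
    static boolean isAfterSleepWindow(long circuitOpenTime, long currentTime, long sleepWindowTime) {
        // Strictly greater: at exactly openTime + sleepWindow the breaker still rejects.
        return currentTime > circuitOpenTime + sleepWindowTime;
    }

    public static void main(String[] args) {
        long openedAt = 1_000L; // breaker tripped at t = 1000 ms
        long sleep = 5_000L;    // default circuitBreakerSleepWindowInMilliseconds
        System.out.println(isAfterSleepWindow(openedAt, 6_000L, sleep)); // false
        System.out.println(isAfterSleepWindow(openedAt, 6_001L, sleep)); // true
    }
}
```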
When the trial call of the normal logic succeeds, #markSuccess() is called to close the breaker. The code is as follows:
1: @Override
2: public void markSuccess() {
3: if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
4: // Reset the statistics of the Hystrix Metrics request-count Observable
5: //This thread wins the race to close the circuit - it resets the stream to start it over from 0
6: metrics.resetStream();
7: // Cancel the previous subscription
8: Subscription previousSubscription = activeSubscription.get();
9: if (previousSubscription != null) {
10: previousSubscription.unsubscribe();
11: }
12: // Start a new subscription
13: Subscription newSubscription = subscribeToStream();
14: activeSubscription.set(newSubscription);
15: // Clear the breaker's open time
16: circuitOpened.set(-1L);
17: }
18: }
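Line 3: the state is changed with CAS (HALF_OPEN=>CLOSED). Only the winning thread resets the statistics, replaces the subscription and clears the open time. #markSuccess() is called from the following two places:
- markEmits
- markOnCompleted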
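The CAS on line 3 keeps #markSuccess() safe when several threads report success at the same moment. Only one thread wins HALF_OPEN=>CLOSED, so the reset and resubscription happen exactly once. Below is a hypothetical sketch of that pattern. MarkSuccessRaceSketch and race are illustrative names, and a counter stands in for resetStream() and the resubscription.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "only the CAS winner resets" pattern in markSuccess().
final class MarkSuccessRaceSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    final AtomicReference<Status> status = new AtomicReference<Status>(Status.HALF_OPEN);
    final AtomicInteger resets = new AtomicInteger(); // stands in for resetStream() + resubscribing

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            resets.incrementAndGet(); // reached by exactly one thread
        }
    }

    /** Starts `threads` threads that all call markSuccess() at once; returns how many resets happened. */
    static int race(int threads) {
        MarkSuccessRaceSketch breaker = new MarkSuccessRaceSketch();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before the race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                breaker.markSuccess();
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return breaker.resets.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // 1
    }
}
```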
When the trial call of the normal logic fails, #markNonSuccess() is called to reopen the breaker. The code is as follows:
1: @Override
2: public void markNonSuccess() {
3: if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
4: //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
5: circuitOpened.set(System.currentTimeMillis());
6: }
7: }
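Line 3: the state is changed with CAS (HALF_OPEN=>OPEN) and the open time is reset to now. As a result, #attemptExecution() allows another trial of the normal logic only after a new sleep window has passed. #markNonSuccess() is called from the following two places:
- handleFallback
- unsubscribeCommandCleanup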
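The sketch below puts the pieces together and walks one breaker through a full cycle: trip, sleep window, a failed trial, a second trial and recovery. It is a hypothetical simplification with illustrative names (HalfOpenCycleSketch, trip, currentStatus):
- time is passed in explicitly;
- trip(now) replaces the metrics subscription;
- the force-open/force-closed checks and the metrics reset are left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the OPEN -> HALF_OPEN -> CLOSED/OPEN cycle; names are illustrative.
final class HalfOpenCycleSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final long sleepWindowMs;
    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);

    HalfOpenCycleSketch(long sleepWindowMs) { this.sleepWindowMs = sleepWindowMs; }

    /** What the health-stream subscriber does once both thresholds are reached. */
    void trip(long now) {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(now);
        }
    }

    boolean attemptExecution(long now) {
        if (circuitOpened.get() == -1) {
            return true; // closed: never tripped, or closed again after a successful trial
        }
        if (now > circuitOpened.get() + sleepWindowMs) {
            // Only the caller that wins OPEN -> HALF_OPEN gets the single trial request.
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;
    }

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1L); // the real code also resets the metrics stream here
        }
    }

    void markNonSuccess(long now) {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(now); // restart the sleep window from the failed trial
        }
    }

    Status currentStatus() { return status.get(); }

    public static void main(String[] args) {
        HalfOpenCycleSketch cb = new HalfOpenCycleSketch(5_000);
        cb.trip(0);
        System.out.println(cb.attemptExecution(3_000));  // false: still inside the sleep window
        System.out.println(cb.attemptExecution(5_001));  // true: this caller gets the trial
        System.out.println(cb.attemptExecution(5_002));  // false: the trial is already in flight
        cb.markNonSuccess(5_100);                        // trial failed: OPEN again, window restarts
        System.out.println(cb.attemptExecution(9_000));  // false: 9000 <= 5100 + 5000
        System.out.println(cb.attemptExecution(10_101)); // true: a new trial
        cb.markSuccess();                                // trial succeeded: CLOSED
        System.out.println(cb.currentStatus() + " " + cb.attemptExecution(10_200)); // CLOSED true
    }
}
```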
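#allowRequest() and #attemptExecution() serve essentially the same purpose. They differ when the breaker is eligible for a trial: the former does not change the breaker's state, while the latter does (OPEN=>HALF_OPEN). See the Hystrix source for the implementation of #allowRequest().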
#isOpen() is fairly simple; see the Hystrix source for its implementation.
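The sketch below makes the idempotent/non-idempotent distinction concrete. It contrasts a read-only allowRequest() with the state-changing attemptExecution(), and adds an isOpen() based on whether an open time is recorded. These are hypothetical versions written from the description in this article (force flags omitted, ReadOnlyQueriesSketch is an illustrative name), not copies of the Hystrix source.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch contrasting a read-only allowRequest()/isOpen() with the state-changing
// attemptExecution(). Force-open/force-closed flags are omitted; names are illustrative.
final class ReadOnlyQueriesSketch {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    final AtomicReference<Status> status = new AtomicReference<Status>(Status.OPEN);
    final AtomicLong circuitOpened = new AtomicLong(0); // tripped at t = 0
    final long sleepWindowMs = 5_000;

    /** Idempotent: answers "would a request get through now?" without touching state. */
    boolean allowRequest(long now) {
        if (circuitOpened.get() == -1) {
            return true;
        }
        if (status.get() == Status.HALF_OPEN) {
            return false; // a trial request is already in flight
        }
        return now > circuitOpened.get() + sleepWindowMs;
    }

    /** Non-idempotent: the first caller after the sleep window moves OPEN -> HALF_OPEN. */
    boolean attemptExecution(long now) {
        if (circuitOpened.get() == -1) {
            return true;
        }
        return now > circuitOpened.get() + sleepWindowMs
                && status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    boolean isOpen() {
        return circuitOpened.get() >= 0; // an open time is recorded while OPEN or HALF_OPEN
    }

    public static void main(String[] args) {
        ReadOnlyQueriesSketch cb = new ReadOnlyQueriesSketch();
        System.out.println(cb.allowRequest(6_000) + " " + cb.allowRequest(6_000));         // true true
        System.out.println(cb.attemptExecution(6_000) + " " + cb.attemptExecution(6_000)); // true false
        System.out.println(cb.allowRequest(6_000) + " " + cb.isOpen());                    // false true
    }
}
```

Calling allowRequest() any number of times leaves the breaker untouched, while the very first successful attemptExecution() consumes the single trial.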
Phew, a relatively clean article. Satisfying.