本文主要基于 Hystrix 1.5.X 版本
本文主要分享 Hystrix 命令执行(二)之执行隔离策略。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
Hystrix 提供两种执行隔离策略( ExecutionIsolationStrategy ) :
SEMAPHORE
:信号量,命令在调用线程执行。在《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 已经详细解析。THREAD
:线程池,命令在线程池执行。在《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「5. #executeCommandWithSpecifiedIsolation(...)」 的 #executeCommandWithSpecifiedIsolation(...)
方法中,调用 Observable#subscribeOn(Scheduler)
方法,指定在 RxJava Scheduler 执行。Observable#subscribeOn(Scheduler)
,可以阅读 《RxJava 源码解析 —— Observable#subscribeOn(Scheduler)》 。两种方式的优缺点比较,推荐阅读 《【翻译】Hystrix文档-实现原理》「依赖隔离」。
com.netflix.hystrix.HystrixThreadPoolProperties
,Hystrix 线程池属性配置抽象类,点击 链接 查看,已添加中文注释说明。
com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault
,Hystrix 线程池配置实现类,点击 链接 查看。实际上没什么内容,官方如是说 :
Default implementation of {@link HystrixThreadPoolProperties} using Archaius (https://github.com/Netflix/archaius)
com.netflix.hystrix.HystrixThreadPoolKey
,Hystrix 线程池标识接口。
FROM HystrixThreadPoolKey 接口注释 A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses. This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.
name
( 标识 ) 获得同 HystrixThreadPoolKey 对象。通过在内部维持一个 name
与 HystrixThreadPoolKey 对象的映射,以达到枚举的效果。HystrixThreadPoolKey 代码如下 :
1: public interface HystrixThreadPoolKey extends HystrixKey {
2: class Factory {
3: private Factory() {
4: }
5:
6: // used to intern instances so we don't keep re-creating them millions of times for the same key
7: private static final InternMap<String, HystrixThreadPoolKey> intern
8: = new InternMap<String, HystrixThreadPoolKey>(
9: new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
10: @Override
11: public HystrixThreadPoolKey create(String key) {
12: return new HystrixThreadPoolKeyDefault(key);
13: }
14: });
15:
16: public static HystrixThreadPoolKey asKey(String name) {
17: return intern.interned(name);
18: }
19:
20: private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey {
21: public HystrixThreadPoolKeyDefault(String name) {
22: super(name);
23: }
24: }
25:
26: /* package-private */ static int getThreadPoolCount() {
27: return intern.size();
28: }
29: }
30: }
com.netflix.hystrix.HystrixKey
接口,点击 链接 查看。该接口定义的 #name()
方法,即是上文我们所说的标识( Key )。intern
属性, name
与 HystrixThreadPoolKey 对象的映射,以达到枚举的效果。com.netflix.hystrix.util.InternMap
,点击 链接 查看带中文注释的代码。#asKey(name)
方法,从 intern
获得 HystrixThreadPoolKey 对象。#getThreadPoolCount()
方法,获得 HystrixThreadPoolKey 数量。在 AbstractCommand 构造方法里,初始化命令的 threadPoolKey
属性,代码如下 :
protected final HystrixThreadPoolKey threadPoolKey;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略无关代码
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
}
#initThreadPoolKey(...)
方法,创建最终的 threadPoolKey
属性。代码如下 :
threadPoolKeyOverride
> threadPoolKey
> groupKey
private
static
HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey,
HystrixCommandGroupKey groupKey,
String threadPoolKeyOverride)
{
if
(threadPoolKeyOverride ==
null)
{
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
if
(threadPoolKey ==
null)
{
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
return
HystrixThreadPoolKey.Factory.asKey(groupKey.name());
}
else
{
return threadPoolKey;
}
}
else
{
// threadPoolKeyOverride 可覆盖属性
// we have a property defining the thread-pool so use it instead
return
HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
,Hystrix 并发策略抽象类。
HystrixConcurrencyStrategy#getThreadPool(...)
方法,代码如下 :
1: public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
2: final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
3:
4: final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
5: final int dynamicCoreSize = threadPoolProperties.coreSize().get();
6: final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
7: final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
8:
9: final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
10:
11: if (allowMaximumSizeToDivergeFromCoreSize) {
12: final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
13: if (dynamicCoreSize > dynamicMaximumSize) {
14: logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
15: dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
16: dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
17: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
18: } else {
19: return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
20: }
21: } else {
22: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
23: }
24: }
#getThreadFactory(...)
方法,获得 ThreadFactory 。点击 链接 查看方法代码。PlatformSpecific#getAppEngineThreadFactory()
方法,无需细看,适用于 Google App Engine 场景。#getBlockingQueue()
方法,获得线程池的阻塞队列。点击 链接 查看方法代码。maxQueueSize<=0
时( 默认值 : -1
) 时,使用 SynchronousQueue 。超过线程池的 maximumPoolSize
时,提交任务被拒绝。SynchronousQueue>0
时,使用 LinkedBlockingQueue 。超过线程池的 maximumPoolSize
时,任务被拒绝。超过线程池的 maximumPoolSize
+ 线程池队列的 maxQueueSize
时,提交任务被阻塞等待。allowMaximumSizeToDivergeFromCoreSize
的情况,计算线程池的 maximumPoolSize
属性。计算的方式和 HystrixThreadPoolProperties#actualMaximumSize()
方法是一致的。com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault
,Hystrix 并发策略实现类。代码如下( 基本没做啥 ) :
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {
/**
* 单例
*/
private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();
public static HystrixConcurrencyStrategy getInstance() {
return INSTANCE;
}
private HystrixConcurrencyStrategyDefault() {
}
}
在 AbstractCommand 构造方法里,初始化命令的 threadPoolKey
属性,代码如下 :
protected final HystrixConcurrencyStrategy concurrencyStrategy;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略无关代码
// 初始化 并发策略
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
}
HystrixPlugins#getConcurrencyStrategy()
获得 HystrixConcurrencyStrategy 对象。默认情况下,使用 HystrixConcurrencyStrategyDefault 。当然你也可以参考 Hystrix 插件体系,实现自定义的 HystrixConcurrencyStrategy 实现,以达到覆写 #getThreadPool()
, #getBlockingQueue()
等方法。点击 链接 查看该方法代码。com.netflix.hystrix.HystrixThreadPool
,Hystrix 线程池接口。当 Hystrix 命令使用 THREAD
执行隔离策略时, HystrixCommand#run()
方法在线程池执行。点击 链接 查看。HystrixThreadPool 定义接口如下 :
#getExecutor()
:获得 ExecutorService 。#getScheduler()
/ #getScheduler(Func0<Boolean>)
:获得 RxJava Scheduler 。#isQueueSpaceAvailable()
:线程池队列是否有空余。#markThreadExecution()
/ #markThreadCompletion()
/ #markThreadRejection()
:TODO 【2002】【metrics】com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault
,Hystrix 线程池实现类。
构造方法,代码如下 :
1: private final HystrixThreadPoolProperties properties;
2: private final BlockingQueue<Runnable> queue;
3: private final ThreadPoolExecutor threadPool;
4: private final HystrixThreadPoolMetrics metrics;
5: private final int queueSize;
6:
7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
8: // 初始化 HystrixThreadPoolProperties
9: this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10: // 获得 HystrixConcurrencyStrategy
11: HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12: // 队列大小
13: this.queueSize = properties.maxQueueSize().get();
14:
15: // TODO 【2002】【metrics】
16: this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17: concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 初始化 ThreadPoolExecutor
18: properties);
19:
20: // 获得 ThreadPoolExecutor
21: this.threadPool = this.metrics.getThreadPool();
22: this.queue = this.threadPool.getQueue(); // 队列
23:
24: // TODO 【2002】【metrics】
25: /* strategy: HystrixMetricsPublisherThreadPool */
26: HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
27: }
queueSize
。HystrixConcurrencyStrategy#getThreadPool(...)
方法,初始化 ThreadPoolExecutor 。#getExecutor()
方法,代码如下 :
@Override
public ThreadPoolExecutor getExecutor() {
touchConfig();
return threadPool;
}
#touchConfig()
方法,动态调整 threadPool
的 coreSize
/ maximumSize
/ keepAliveTime
参数。点击 链接 查看该方法。#getScheduler()
/ #getScheduler(Func0<Boolean>)
方法,代码如下 :
@Override
public Scheduler getScheduler() {
//by default, interrupt underlying threads on timeout
return getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return true;
}
});
}
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
shouldInterruptThread
都在 「6. HystrixContextScheduler」 详细解析。#isQueueSpaceAvailable()
方法,代码如下 :
@Override
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
HystrixThreadPoolProperties.queueSizeRejectionThreshold
属性控制。queueSize
属性,决定了线程池的队列类型。queueSize<=0
时, #isQueueSpaceAvailable()
都返回 true
的原因是,线程池使用 SynchronousQueue 作为队列,不支持新任务排队,任务超过线程池的 maximumPoolSize
时,新任务被拒绝。queueSize>0
时, #isQueueSpaceAvailable()
根据情况 true
/ false
的原因是,线程池使用 LinkedBlockingQueue 作为队列,支持一定数量的阻塞排队,但是这个数量无法调整。通过 #isQueueSpaceAvailable()
方法的判断,动态调整。另外,初始配置的 queueSize
要相对大,否则即使 queueSizeRejectionThreshold
配置的大于 queueSize
,实际提交任务到线程池,也会被拒绝。com.netflix.hystrix.HystrixThreadPool.Factory
,HystrixThreadPool 工厂类,不仅限于 HystrixThreadPool 的创建,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。
threadPools
属性,维护创建的 HystrixThreadPool 对应的映射,代码如下 :
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
HystrixThreadPoolKey#name()
,每个 HystrixThreadPoolKey 对应一个 HystrixThreadPool 对象。#getInstance(...)
方法,获得 HystrixThreadPool 对象,代码如下 :
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
threadPoolKey
先从 threadPool
获取已创建的 HystrixThreadPool ;获取不到,创建对应的 HystrixThreadPool 返回,并添加到 threadPool
。#shutdown()
/ #shutdown(timeout, unit)
方法,比较易懂,点击 链接 查看。
在 AbstractCommand 构造方法里,初始化命令的 threadPool
属性,代码如下 :
protected final HystrixThreadPool threadPool;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略其他代码
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
// 初始化 threadPool
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
}
#initThreadPool(...)
方法,获得 HystrixThreadPool ,点击 链接 查看。Hystrix 实现了自定义的 RxJava Scheduler ,整体类图如下 :
actualScheduler
属性。worker
属性。构造方法,代码如下 :
public class HystrixContextScheduler extends Scheduler {
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scheduler actualScheduler;
private final HystrixThreadPool threadPool;
// ... 省略无关代码
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
}
actualScheduler
属性,类型为 ThreadPoolScheduler 。#createWorker()
方法,代码如下 :
@Override
public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
actualScheduler
创建 ThreadPoolWorker ,传参给 HystrixContextSchedulerWorker 。构造方法,代码如下 :
private class HystrixContextSchedulerWorker extends Worker {
private final Worker worker;
// ... 省略无关代码
private HystrixContextSchedulerWorker(Worker actualWorker) {
this.worker = actualWorker;
}
}
worker
属性,类型为 ThreadPoolWorker 。#schedule(Action0)
方法,代码如下 :
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
ThreadPool#isQueueSpaceAvailable()
方法,判断线程池队列是否有空余。这个就是 HystrixContextScheduler 的实际用途。#unsubscribe()
/ #isUnsubscribed()
方法,使用 worker
判断,点击 链接查看。
ThreadPoolScheduler 比较简单,点击 链接 查看。
构造方法,代码如下 :
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
// ... 省略无关代码
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
}
subscription
属性,订阅信息。#schedule(Action0)
方法,代码如下 :
1: @Override
2: public Subscription schedule(final Action0 action) {
3: // 未订阅,返回
4: if (subscription.isUnsubscribed()) {
5: // don't schedule, we are unsubscribed
6: return Subscriptions.unsubscribed();
7: }
8:
9: // 创建 ScheduledAction
10: // This is internal RxJava API but it is too useful.
11: ScheduledAction sa = new ScheduledAction(action);
12:
13: // 添加到 订阅
14: subscription.add(sa);
15: sa.addParent(subscription);
16:
17: // 提交 任务
18: ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19: FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20: sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21:
22: return sa;
23: }
subscription
)。threadPool
,提交任务,并创建 FutureCompleterWithConfigurableInterrupt 添加到订阅( sa
)。sa
)。整体订阅关系如下 :#unsubscribe()
/ #isUnsubscribed()
方法,使用 subscription
判断,点击 链接查看。
com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt
,实现类似 rx.internal.schedulers.ScheduledAction.FutureCompleter
,在它的基础上,支持配置 FutureTask#cancel(Boolean)
是否可打断运行( mayInterruptIfRunning
)。
构造方法,代码如下 :
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
// ... 省略无关代码
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}
}
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行。
#unsubscribe()
方法,代码如下 :
@Override
public void unsubscribe() {
// 从 线程池 移除 任务
executor.remove(f);
// 根据 shouldInterruptThread 配置,是否强制取消
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
shouldInterruptThread
方法,判断是否强制取消。shouldInterruptThread
对应的方法,实现代码如下 :subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
executionIsolationThreadInterruptOnTimeout=true
时,命令可执行超时。当命令可执行超时时,强制取消。HystrixCommand.queue()
返回的 Future ,可以使用 Future#cancel(Boolean)
取消命令执行。从 shouldInterruptThread
对应的方法可以看到,如果此时不满足命令执行超时的条件,命令执行取消的方式是非强制的。此时当 executionIsolationThreadInterruptOnFutureCancel=true
时,并且调用 Future#cancel(Boolean)
传递 mayInterruptIfRunning=true
,强制取消命令执行。CommandHelloWorld#testAsynchronous3()
HystrixCommand#queue()
:点击 链接 查看 Future#cancel(Boolean)
方法。