我能接受失败,但无法接受放弃。
代码下载地址:https://github.com/f641385712/netflix-learning
我们知道Hystrix每个请求都对应有一个Command命令对象,在线程池隔离模式下,每个命令对象在初始化时都会对应有一个“自己的”线程池HystrixThreadPool
,当然这由threadPoolKey
来决定(理论上多个CommandKey是可以共用同一个线程池资源的),并且有全局缓存(不会每次都执行初始化动作)。
Hystrix使用RxJava来编程,那么你是否知道它在执行目标方法时(发射数据时),是如何调用线程池资源的呢?换句话说,Hystrix是如何把自己的线程池“输入”到RxJava里让其调度的呢?
Hystrix
的源码是使用RxJava来实现的,所以在文首得先认识认识RxJava里的一些相关概念。
rx.Scheduler.Worker
:它是Scheduler
的一个静态内部类,用于在单个线程或事件循环上执行操作的顺序调度程序,简单的说它就是真正干活的 ThreadPoolWorker
和HystrixContextSchedulerWorker
的扩展实现public abstract static class Worker implements Subscription {
// 调度一个要执行的动作
public abstract Subscription schedule(Action0 action);
// 在将来的某个时间点安排一个动作执行
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
// 定期调度要执行的可取消操作
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { ... }
}
rx.Scheduler
:是一个调度工作单元的对象,简单的说就是决定你在哪里执行(使用哪个线程去执行)。对它部署,但是它有个工具类rx.schedulers.Schedulers
应该非常熟悉,专门用于产生Scheduler
实例。 ThreadPoolScheduler
和HystrixContextScheduler
的扩展实现Observable
上的一些动作解释: doOnSubscribe
:是事件被订阅之前(也就是事件源发起之前)会调用的方法,这个方法一般用于修改、添加或者删除事件源的数据流。doOnNext
:观察者被回调之前的调用。这个方法一般做的事件类似于观察者做的事情,只是自己不是最终的回调者(观察者即最终回调者)doOnUnSubscribe
:取消订阅时的监听doOnCompleted
:Observable正常终止时的监听(注意:正常终止才会执行哦)doOnError
:出错时的监听doOnTerminate
:订阅即将被终止时的监听,无论是正常终止还是异常终止observeOn
:语义为:观察者在哪个Scheduler
观察?作用范围:该操作符之后的所有操作符,直到出现新的observeOn操作符出现位置(所以它是可以写多个的)。subscribeOn
:语义为:发布者在哪里发布数据?作用范围:该操作符之前的数据创建动作和doOnSubscribe
动作。@Test
public void fun4() throws InterruptedException {
Integer[] data = {1, 2, 3, 4, 5};
Subscription subscribe = Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
String threadName = Thread.currentThread().getName();
subscriber.onStart();
for (int i = 0; i < 6; i++) {
subscriber.onNext(i);
System.out.printf("[%s]发送数据:%s\n", threadName, i);
}
subscriber.onCompleted();
}).doOnSubscribe(() -> data[4] = 10) // 发射之前执行。可用于更改数据源
// .delay(1, TimeUnit.SECONDS)
// .doOnNext()
.subscribeOn(Schedulers.io()) //创建/发射数据使用的是IO线程
.observeOn(Schedulers.newThread()) // 后面的观察者统一在新的线程上观察
.doOnUnsubscribe(() -> System.out.println(Thread.currentThread().getName() + "取消订阅喽~~~~~"))
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
String threadName = Thread.currentThread().getName();
System.out.printf("[%s]监听结束\n", threadName);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer i) {
String threadName = Thread.currentThread().getName();
System.out.printf("[%s]监听到数据:%s\n", threadName, i);
}
});
TimeUnit.SECONDS.sleep(2);
if (subscribe.isUnsubscribed()) {
subscribe.unsubscribe();
}
}
运行程序,输出:
[RxIoScheduler-2]发送数据:0
[RxNewThreadScheduler-1]监听到数据:0
[RxIoScheduler-2]发送数据:1
[RxIoScheduler-2]发送数据:2
[RxIoScheduler-2]发送数据:3
[RxNewThreadScheduler-1]监听到数据:1
[RxIoScheduler-2]发送数据:4
[RxNewThreadScheduler-1]监听到数据:2
[RxIoScheduler-2]发送数据:5
[RxNewThreadScheduler-1]监听到数据:3
[RxNewThreadScheduler-1]监听到数据:4
[RxNewThreadScheduler-1]监听到数据:5
[RxNewThreadScheduler-1]监听结束
RxNewThreadScheduler-1取消订阅喽~~~~~
因为发射和观察使用的不同线程,所以执行顺序是乱的。但是,肯定的顺序是:先发送,再消费。
针对RxJava里的rx.Scheduler.Worker
和Scheduler
在Hystrix都提供了其扩展实现。提前说明:以下四个类均为HystrixContextScheduler
的静态内部类。
纯粹用于调度线程池让其工作。
HystrixContextScheduler:
private static class ThreadPoolWorker extends Worker {
// Hystrix的线程池接口。提供
// ExecutorService getExecutor();
// Scheduler getScheduler();
// Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
// markThreadExecution();
// isQueueSpaceAvailable(); // 队列是否允许向其添加项(线程池是否还有资源)
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
// 是否需要中断线程(比如你取消、中断线程时)
private final Func0<Boolean> shouldInterruptThread;
// 实现抽象方法:干活的方法
@Override
public Subscription schedule(final Action0 action) {
// 如果已经取消了,那就不管了喽
if (subscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
// 它是一个Runnable,且是一个Subscription
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
// 重点:拿到配置的线程池ExecutorService(ThreadPoolExecutor)
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
// 提交任务sa,然后得到一个FutureTask
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
// 在增加一个订阅者:用于响应取消....
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
// 不支持延迟执行~~~~~~~~~
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
throw new IllegalStateException("Hystrix does not support delayed scheduling");
}
}
该worker的核心可用一句话总结:使用自定义配置的线程池HystrixThreadPool
(ThreadPoolExecutor
)去执行任务threadPool.getExecutor()
。
说明:
HystrixThreadPool
实现是跟HystrixThreadPoolKey
绑定的哦~
它就是个代理worker,和上下文关联。
HystrixContextScheduler:
private class HystrixContextSchedulerWorker extends Worker {
private final Worker worker;
...
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
}
HystrixContexSchedulerAction
它的实现做了一件事:在Callable
执行前绑定上HystrixRequestContext
上下文,从而可以保证子线程里也可以获取到父线程的内容。
而HystrixContextSchedulerWorker
做的事也可以总结为一句话:执行任务之前检查一下threadPool
是否有资源可用threadPool.isQueueSpaceAvailable()
,若没资源了就抛出RejectedExecutionException
异常。
说明:实际干活的work是
ThreadPoolWorker
,这个work只是在其外包一层上下文,确保数据的传递。 但是,但是,但是:它并不要求你一定是用线程池执行哦,所以你看threadPool
是做了判空处理的,毕竟它不要求你的Worker必须是ThreadPoolWorker
。
rx.Scheduler
有且仅有一个抽象方法:public abstract Worker createWorker();
用于产生一个work去工作,所以它的实现类均非常的简单。
HystrixContextScheduler:
private static class ThreadPoolScheduler extends Scheduler {
private final HystrixThreadPool threadPool;
private final Func0<Boolean> shouldInterruptThread;
...
@Override
public Worker createWorker() {
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}
}
该Scheduler
创建的ThreadPoolWorker
实例。
同样的它包装了一个实际"干活的"Scheduler
。
public class HystrixContextScheduler extends Scheduler {
// SPI
private final HystrixConcurrencyStrategy concurrencyStrategy;
// 真正干活的
private final Scheduler actualScheduler;
// 线程池
private final HystrixThreadPool threadPool;
.... // 省略构造器,为属性们赋值
// worker由实际的actualScheduler负责创建
@Override
public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
}
此类需要关注三个属性:
concurrencyStrategy
:可以构造器传入,默认取值HystrixPlugins.getInstance().getConcurrencyStrategy()
actualScheduler
:可构造器传入,默认取值new ThreadPoolScheduler(threadPool, shouldInterruptThread)
threadPool
:可以为null。该接口只有一个实现类HystrixThreadPoolDefault
,同于根据HystrixThreadPoolProperties
配置来生成线程池以及得到一个getScheduler():HystrixContextScheduler
文首介绍RxJava时候说到了,被观察对象Observable
在哪个线程发射数据是由subscribeOn()
方法指定的Scheduler
来调度的。所以Hystrix在执行目标方法时的线程资源也是由它指定:
AbstractCommand#executeCommandWithSpecifiedIsolation
// 说明:shouldInterruptThread函数并不是很关键,建议初学者忽略
xxx.subscribeOn(threadPool.getScheduler(shouldInterruptThread));
可以看到数据发送的线程资源是由threadPool
来决定的,而它的唯一实现类是HystrixThreadPoolDefault
:
HystrixThreadPoolDefault:
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig(); // touch一下的目的是:每次get都使用最新的线程池配置
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
需要注意的是,这里使用的HystrixContextScheduler
构造器创建的Scheduler
,它内部使用的是new ThreadPoolScheduler(threadPool, shouldInterruptThread);
这个线程池哦~
就这样,该调度器里面包括了使用的线程池信息,subscribeOn()
就会根据当前Observable
和获取到的调度器创建任务,并执行。
关于Hystrix执行目标方法时,如何调用线程池资源?这个话题就介绍到这了,理解本篇内容很大程度还是基于对RxJava一些关键概念的理解:比如worker、调度器Scheduler
、subscribeOn()
方法等。同时本篇文章也解释了:何时会出现线程池拒绝,也就是产生RejectedExecutionException
异常,这和前两篇文章内容是相呼应的,可以对照起来笼统的学习。