前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[享学Netflix] 三十三、Hystrix执行目标方法时,如何调用线程池资源?

[享学Netflix] 三十三、Hystrix执行目标方法时,如何调用线程池资源?

作者头像
YourBatman
发布2020-03-18 19:40:16
1.3K0
发布2020-03-18 19:40:16
举报
文章被收录于专栏:BAT的乌托邦

我能接受失败,但无法接受放弃。

代码下载地址:https://github.com/f641385712/netflix-learning

目录
  • 前言
  • 正文
    • RxJava相关概念
      • 使用示例
    • Hystrix里的Worker
      • ThreadPoolWorker
      • HystrixContextSchedulerWorker
    • Hystrix里的Scheduler
      • ThreadPoolScheduler
      • HystrixContextScheduler
    • Hystrix如何调用线程池资源?
  • 总结
    • 声明

前言

我们知道Hystrix每个请求都对应有一个Command命令对象,在线程池隔离模式下,每个命令对象在初始化时都会对应有一个“自己的”线程池HystrixThreadPool,当然这由threadPoolKey来决定(理论上多个CommandKey是可以共用同一个线程池资源的),并且有全局缓存(不会每次都执行初始化动作)。

Hystrix使用RxJava来编程,那么你是否知道它在执行目标方法时(发射数据时),是如何调用线程池资源的呢?换句话说,Hystrix是如何把自己的线程池“输入”到RxJava里让其调度的呢?


正文

Hystrix的源码是使用RxJava来实现的,所以在文首得先认识认识RxJava里的一些相关概念。


RxJava相关概念

  • rx.Scheduler.Worker:它是Scheduler的一个静态内部类,用于在单个线程或事件循环上执行操作的顺序调度程序,简单的说它就是真正干活的
    • Hystrix里提供ThreadPoolWorkerHystrixContextSchedulerWorker的扩展实现
代码语言:javascript
复制
public abstract static class Worker implements Subscription {
	// 调度一个要执行的动作
	public abstract Subscription schedule(Action0 action);
	// 在将来的某个时间点安排一个动作执行
	public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
	// 定期调度要执行的可取消操作
	public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { ... }
}
  • rx.Scheduler:是一个调度工作单元的对象,简单的说就是决定你在哪里执行(使用哪个线程去执行)。对它部署,但是它有个工具类rx.schedulers.Schedulers应该非常熟悉,专门用于产生Scheduler实例。
    • Hystrix提供ThreadPoolSchedulerHystrixContextScheduler的扩展实现
  • 关于Observable上的一些动作解释:
    • doOnSubscribe:是事件被订阅之前(也就是事件源发起之前)会调用的方法,这个方法一般用于修改、添加或者删除事件源的数据流。
    • doOnNext:观察者被回调之前的调用。这个方法一般做的事件类似于观察者做的事情,只是自己不是最终的回调者(观察者即最终回调者)
    • doOnUnSubscribe:取消订阅时的监听
    • doOnCompleted:Observable正常终止时的监听(注意:正常终止才会执行哦)
    • doOnError:出错时的监听
    • doOnTerminate:订阅即将被终止时的监听,无论是正常终止还是异常终止
    • observeOn:语义为:观察者在哪个Scheduler观察?作用范围:该操作符之后的所有操作符,直到出现新的observeOn操作符出现位置(所以它是可以写多个的)。
    • subscribeOn:语义为:发布者在哪里发布数据?作用范围:该操作符之前的数据创建动作和doOnSubscribe动作。

使用示例
代码语言:javascript
复制
@Test
public void fun4() throws InterruptedException {
    Integer[] data = {1, 2, 3, 4, 5};
    Subscription subscribe = Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
        String threadName = Thread.currentThread().getName();

        subscriber.onStart();
        for (int i = 0; i < 6; i++) {
            subscriber.onNext(i);
            System.out.printf("[%s]发送数据:%s\n", threadName, i);
        }
        subscriber.onCompleted();

    }).doOnSubscribe(() -> data[4] = 10) // 发射之前执行。可用于更改数据源
            // .delay(1, TimeUnit.SECONDS)
            // .doOnNext()
            .subscribeOn(Schedulers.io()) //创建/发射数据使用的是IO线程
            .observeOn(Schedulers.newThread()) // 后面的观察者统一在新的线程上观察
            .doOnUnsubscribe(() -> System.out.println(Thread.currentThread().getName() + "取消订阅喽~~~~~"))
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    String threadName = Thread.currentThread().getName();
                    System.out.printf("[%s]监听结束\n", threadName);
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onNext(Integer i) {
                    String threadName = Thread.currentThread().getName();
                    System.out.printf("[%s]监听到数据:%s\n", threadName, i);
                }
            });

    TimeUnit.SECONDS.sleep(2);
    if (subscribe.isUnsubscribed()) {
        subscribe.unsubscribe();
    }

}

运行程序,输出:

代码语言:javascript
复制
[RxIoScheduler-2]发送数据:0
[RxNewThreadScheduler-1]监听到数据:0
[RxIoScheduler-2]发送数据:1
[RxIoScheduler-2]发送数据:2
[RxIoScheduler-2]发送数据:3
[RxNewThreadScheduler-1]监听到数据:1
[RxIoScheduler-2]发送数据:4
[RxNewThreadScheduler-1]监听到数据:2
[RxIoScheduler-2]发送数据:5
[RxNewThreadScheduler-1]监听到数据:3
[RxNewThreadScheduler-1]监听到数据:4
[RxNewThreadScheduler-1]监听到数据:5
[RxNewThreadScheduler-1]监听结束
RxNewThreadScheduler-1取消订阅喽~~~~~

因为发射和观察使用的不同线程,所以执行顺序是乱的。但是,肯定的顺序是:先发送,再消费


针对RxJava里的rx.Scheduler.WorkerScheduler在Hystrix都提供了其扩展实现。提前说明:以下四个类均为HystrixContextScheduler的静态内部类。

Hystrix里的Worker


ThreadPoolWorker

纯粹用于调度线程池让其工作。

代码语言:javascript
复制
HystrixContextScheduler:

	private static class ThreadPoolWorker extends Worker {
		
		// Hystrix的线程池接口。提供
		// ExecutorService getExecutor();
		// Scheduler getScheduler();
		// Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
		// markThreadExecution();
		// isQueueSpaceAvailable(); // 队列是否允许向其添加项(线程池是否还有资源)
        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        // 是否需要中断线程(比如你取消、中断线程时)
        private final Func0<Boolean> shouldInterruptThread;

		// 实现抽象方法:干活的方法
        @Override
        public Subscription schedule(final Action0 action) {
        	// 如果已经取消了,那就不管了喽
            if (subscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            // 它是一个Runnable,且是一个Subscription
            ScheduledAction sa = new ScheduledAction(action);
            subscription.add(sa);
            sa.addParent(subscription);

			// 重点:拿到配置的线程池ExecutorService(ThreadPoolExecutor)
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // 提交任务sa,然后得到一个FutureTask
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            // 在增加一个订阅者:用于响应取消....
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }


		// 不支持延迟执行~~~~~~~~~
        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            throw new IllegalStateException("Hystrix does not support delayed scheduling");
        }
	}

该worker的核心可用一句话总结:使用自定义配置的线程池HystrixThreadPoolThreadPoolExecutor)去执行任务threadPool.getExecutor()

说明:HystrixThreadPool实现是跟HystrixThreadPoolKey绑定的哦~


HystrixContextSchedulerWorker

它就是个代理worker,和上下文关联。

代码语言:javascript
复制
HystrixContextScheduler:

	private class HystrixContextSchedulerWorker extends Worker {
		private final Worker worker;
		... 

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
        }
        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }
	}

HystrixContexSchedulerAction它的实现做了一件事:在Callable执行前绑定上HystrixRequestContext上下文,从而可以保证子线程里也可以获取到父线程的内容。

HystrixContextSchedulerWorker做的事也可以总结为一句话:执行任务之前检查一下threadPool是否有资源可用threadPool.isQueueSpaceAvailable(),若没资源了就抛出RejectedExecutionException异常。

说明:实际干活的work是ThreadPoolWorker,这个work只是在其外包一层上下文,确保数据的传递。 但是,但是,但是:它并不要求你一定是用线程池执行哦,所以你看threadPool是做了判空处理的,毕竟它不要求你的Worker必须是ThreadPoolWorker


Hystrix里的Scheduler

rx.Scheduler有且仅有一个抽象方法:public abstract Worker createWorker();用于产生一个work去工作,所以它的实现类均非常的简单。


ThreadPoolScheduler
代码语言:javascript
复制
HystrixContextScheduler:

	private static class ThreadPoolScheduler extends Scheduler {
        private final HystrixThreadPool threadPool;
        private final Func0<Boolean> shouldInterruptThread;
		...
        @Override
        public Worker createWorker() {
            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
        }
	}

Scheduler创建的ThreadPoolWorker实例。


HystrixContextScheduler

同样的它包装了一个实际"干活的"Scheduler

代码语言:javascript
复制
public class HystrixContextScheduler extends Scheduler {

	// SPI
    private final HystrixConcurrencyStrategy concurrencyStrategy;
    // 真正干活的
    private final Scheduler actualScheduler;
    // 线程池
    private final HystrixThreadPool threadPool;
    
	.... // 省略构造器,为属性们赋值

	// worker由实际的actualScheduler负责创建
    @Override
    public Worker createWorker() {
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }
}

此类需要关注三个属性:

  • concurrencyStrategy:可以构造器传入,默认取值HystrixPlugins.getInstance().getConcurrencyStrategy()
  • actualScheduler:可构造器传入,默认取值new ThreadPoolScheduler(threadPool, shouldInterruptThread)
  • threadPool可以为null。该接口只有一个实现类HystrixThreadPoolDefault,同于根据HystrixThreadPoolProperties配置来生成线程池以及得到一个getScheduler():HystrixContextScheduler

Hystrix如何调用线程池资源?

文首介绍RxJava时候说到了,被观察对象Observable在哪个线程发射数据是由subscribeOn()方法指定的Scheduler来调度的。所以Hystrix在执行目标方法时的线程资源也是由它指定:

代码语言:javascript
复制
AbstractCommand#executeCommandWithSpecifiedIsolation

	// 说明:shouldInterruptThread函数并不是很关键,建议初学者忽略
	xxx.subscribeOn(threadPool.getScheduler(shouldInterruptThread));

可以看到数据发送的线程资源是由threadPool来决定的,而它的唯一实现类是HystrixThreadPoolDefault

代码语言:javascript
复制
HystrixThreadPoolDefault:

    @Override
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
        touchConfig(); // touch一下的目的是:每次get都使用最新的线程池配置
        return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
    }

需要注意的是,这里使用的HystrixContextScheduler构造器创建的Scheduler,它内部使用的是new ThreadPoolScheduler(threadPool, shouldInterruptThread);这个线程池哦~

就这样,该调度器里面包括了使用的线程池信息subscribeOn()就会根据当前Observable和获取到的调度器创建任务,并执行。


总结

关于Hystrix执行目标方法时,如何调用线程池资源?这个话题就介绍到这了,理解本篇内容很大程度还是基于对RxJava一些关键概念的理解:比如worker、调度器SchedulersubscribeOn()方法等。同时本篇文章也解释了:何时会出现线程池拒绝,也就是产生RejectedExecutionException异常,这和前两篇文章内容是相呼应的,可以对照起来笼统的学习。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 前言
  • 正文
    • RxJava相关概念
      • 使用示例
    • Hystrix里的Worker
      • ThreadPoolWorker
      • HystrixContextSchedulerWorker
    • Hystrix里的Scheduler
      • ThreadPoolScheduler
      • HystrixContextScheduler
    • Hystrix如何调用线程池资源?
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档