RxJava 线程模型分析

RxJava的被观察者在使用操作符时可以利用线程调度器--Scheduler来切换线程,例如

        Observable.just("aaa","bbb")
                .observeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {

                        return s.toUpperCase();
                    }
                })
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {

                        System.out.println(s);
                    }
                });

被观察者(Observable、Flowable...)发射数据流之后,其操作符可以在不同的线程中加工数据流,最后被观察者在前台线程中接受并响应数据。

下图不同的箭头颜色表示不同的线程。

schedulers.png

一. 线程调度器

Schedulers 是一个静态工厂类,通过分析Schedulers的源码可以看到它有多种不同类型的Scheduler。下面是Schedulers的各个工厂方法。

computation()用于CPU密集型的计算任务,但并不适合于IO操作。

    @NonNull
    public static Scheduler computation() {
        return RxJavaPlugins.onComputationScheduler(COMPUTATION);
    }

io()用于IO密集型任务,支持异步阻塞IO操作,这个调度器的线程池会根据需要增长。对于普通的计算任务,请使用Schedulers.computation()。

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

trampoline()在RxJava2中跟RxJava1的作用是不同的。在RxJava2中表示立即执行,如果当前线程有任务在执行,则会将其暂停,等插入进来的新任务执行完之后,再将原先未完成的任务接着执行。在RxJava1中表示在当前线程中等待其他任务完成之后,再执行新的任务。

    @NonNull
    public static Scheduler trampoline() {
        return TRAMPOLINE;
    }

newThread()为每个任务创建一个新线程。

    @NonNull
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

single()拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,它的任务们将会按照先进先出的顺序依次执行。

    @NonNull
    public static Scheduler single() {
        return RxJavaPlugins.onSingleScheduler(SINGLE);
    }

除此之外,还支持自定义的Executor来作为调度器。

    @NonNull
    public static Scheduler from(@NonNull Executor executor) {
        return new ExecutorScheduler(executor);
    }

RxJava 线程模型.png

Scheduler是RxJava的线程任务调度器,Worker是线程任务的具体执行者。从Scheduler源码可以看到,Scheduler在scheduleDirect()、schedulePeriodicallyDirect()方法中创建了Worker,然后会分别调用worker的schedule()、schedulePeriodically()来执行任务。

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }

        return periodicTask;
    }

Worker也是一个抽象类,从上图可以看到每一种Scheduler会对应一种具体的Worker。

    public abstract static class Worker implements Disposable {

        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            final SequentialDisposable first = new SequentialDisposable();

            final SequentialDisposable sd = new SequentialDisposable(first);

            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            final long periodInNanoseconds = unit.toNanos(period);
            final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
            final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

            Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                    periodInNanoseconds), initialDelay, unit);

            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
            first.replace(d);

            return sd;
        }

        public long now(@NonNull TimeUnit unit) {
            return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        ...
        
    }

1.1 SingleScheduler

SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一个属性叫executor,它是使用AtomicReference包装的ScheduledExecutorService。

final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

在SingleScheduler构造函数中,executor会调用lazySet()。

    public SingleScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        executor.lazySet(createExecutor(threadFactory));
    }

它的createExecutor()用于创建工作线程,可以看到通过SchedulerPoolFactory来创建ScheduledExecutorService。

    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
        return SchedulerPoolFactory.create(threadFactory);
    }

在SchedulerPoolFactory类的create(ThreadFactory factory) 中,使用newScheduledThreadPool线程池定义定时器,最大允许线程数为1。

    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }

在SingleScheduler中每次使用ScheduledExecutorService,其实是使用executor.get()。所以说,single拥有一个线程单例。

SingleScheduler会创建一个ScheduledWorker,ScheduledWorker使用jdk的ScheduledExecutorService作为executor。

下面是ScheduledWorker的schedule()方法。使用ScheduledExecutorService的submit()或schedule()来执行runnable。

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }

            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
            tasks.add(sr);

            try {
                Future<?> f;
                if (delay <= 0L) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delay, unit);
                }

                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                dispose();
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }

            return sr;
        }

1.2 ComputationScheduler

ComputationScheduler使用FixedSchedulerPool作为线程池,并且FixedSchedulerPool被AtomicReference包装了一下。

从ComputationScheduler的源码中可以看出,MAX_THREADS是CPU的数目。FixedSchedulerPool可以理解为拥有固定数量的线程池,数量为MAX_THREADS。

static { 
     MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
     ......
}

static int cap(int cpuCount, int paramThreads) {
     return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}

ComputationScheduler会创建一个EventLoopWorker。

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get().getEventLoop());
    }

其中,getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一个PoolWorker。

        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            return eventLoops[(int)(n++ % c)];
        }

PoolWorker继承自NewThreadWorker,它也是线程数为1的ScheduledExecutorService。

1.3 IoScheduler

IoScheduler使用CachedWorkerPool作为线程池,并且CachedWorkerPool也是被AtomicReference包装了一下。

CachedWorkerPool是基于RxThreadFactory这个ThreadFactory来创建的。

static {
        ......
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
        ......
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
       ......
}

在RxThreadFactory中,由 prefix 和 incrementAndGet() 来创建新线程的名称。

    @Override
    public Thread newThread(Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());

        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }

IoScheduler创建的线程数是不固定的,可以通过IoScheduler 的 size() 来获得当前的线程数。而ComputationScheduler的线程数一般情况等于CPU的数目。

    public int size() {
        return pool.get().allWorkers.size();
    }

特别需要的是 ComputationScheduler 和 IoScheduler 都是依赖线程池来维护线程的,区别就是 IoScheduler 线程池中的个数是无限的,由 prefix 和 incrementAndGet() 产生的递增值来决定线程的名字;而 ComputationScheduler 中则是一个固定线程数量的线程池,数据为CPU的数目,并且不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

同样,IoScheduler也会创建EventLoopWorker。

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

但是这个EventLoopWorker是IoScheduler的内部类,跟ComputationScheduler创建的EventLoopWorker是不一样的,只是二者的名称相同罢了。

1.4 NewThreadScheduler

NewThreadScheduler会创建NewThreadWorker。我们看到NewThreadWorker的构造函数也是使用SchedulerPoolFactory。

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

跟SingleScheduler不同的是,SingleScheduler的executor是使用AtomicReference包装的ScheduledExecutorService。每次使用时,会调用executor.get()。

然而,NewThreadScheduler每次都会创建一个新的线程。

1.5 TrampolineScheduler

TrampolineScheduler会创建TrampolineWorker,在TrampolineWorker内部维护着一个PriorityBlockingQueue。任务进入该队列之前,会先用TimedRunnable封装一下。

    static final class TimedRunnable implements Comparable<TimedRunnable> {
        final Runnable run;
        final long execTime;
        final int count; // In case if time between enqueueing took less than 1ms

        volatile boolean disposed;

        TimedRunnable(Runnable run, Long execTime, int count) {
            this.run = run;
            this.execTime = execTime;
            this.count = count;
        }

        @Override
        public int compareTo(TimedRunnable that) {
            int result = ObjectHelper.compare(execTime, that.execTime);
            if (result == 0) {
                return ObjectHelper.compare(count, that.count);
            }
            return result;
        }
    }

我们可以看到TimedRunnable实现了Comparable接口,会比较任务的execTime和count。

任务在进入queue之前,count每次都会+1。

final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
queue.add(timedRunnable);

所以,使用TrampolineScheduler时,每次新的任务都会优先执行。

二. 线程调度

在默认情况下不做任何线程处理,Observable和Observer是处于同一线程中的。如果想要切换线程的话,可以使用subscribeOn()和observeOn()。

2.1 线程调度subscribeOn

subscribeOn通过接收一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。

若多次执行subscribeOn,则只有一次起作用。

点击subscribeOn()的源码可以看到,每次调用subscribeOn()都会创建一个ObservableSubscribeOn对象。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

ObservableSubscribeOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

其中,SubscribeOnObserver是下游的Observer通过装饰器模式生成的。它实现了Observer、Disposable接口。

接下来,在上游的线程中执行下游Observer的onSubscribe(Disposable disposabel)方法。

s.onSubscribe(parent);

然后,将子线程的操作加入Disposable管理中,加入Disposable后可以方便上下游的统一管理。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

在这里,已经调用对应scheduler的scheduleDirect()方法。scheduleDirect() 传入的是一个Runnable,也就是下面的SubscribeTask。

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

SubscribeTask会执行run()对上游的Observable进行订阅。

此时,已经在对应的Scheduler线程中运行了。

source.subscribe(parent);

在RxJava的链式操作中,数据的处理是自下而上,这点跟数据发射正好相反。如果多次调用subscribeOn,最上面的线程切换最晚执行,所以变成了只有第一次切换线程才有效。

2.2 线程调度observeOn

observeOn同样接收一个Scheduler参数,用来指定下游操作运行在特定的线程调度器Scheduler上。

若多次执行observeOn,则每次均起作用,线程会一直切换。

点击observeOn()的源码可以看到,每次调用observeOn()都会创建一个ObservableObserveOn对象。

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

如果scheduler是TrampolineScheduler,上游事件和下游事件会立即产生订阅。

如果不是的话,scheduler会创建自己的Worker,然后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象包装了下游真正的Observer。

ObserveOnObserver是ObservableObserveOn的内部类,实现了Observer、Runnable接口。跟SubscribeOnObserver不同的是,SubscribeOnObserver实现了Observer、Disposable接口。

在ObserveOnObserver的onNext()中,schedule()执行了具体调度的方法。

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

其中,worker是当前scheduler创建的Worker,this指的是当前的ObserveOnObserver对象,this实现了Runnable接口。

然后,我们看看Runnable接口的实现方法run(),这个方法是在worker对应的线程里执行的。drainNormal()会取出 ObserveOnObserver 的 queue 里的数据进行发送。

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

下游多次调用observeOn()的话,线程会一直切换。每一次切换线程,都会把对应的Observer对象的各个方法的处理执行在指定的线程中。

三. 示例

举一个多次调用subscribeOn、observeOn的例子。

        Observable.just("HELLO WORLD")
                .subscribeOn(Schedulers.single())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {

                        s = s.toLowerCase();
                        L.i("map1",s);
                        return s;
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {

                    @Override
                    public String apply(String s) throws Exception {

                        s = s+" tony.";
                        L.i("map2",s);
                        return s;
                    }
                })
                .subscribeOn(Schedulers.computation())
                .map(new Function<String, String>() {

                    @Override
                    public String apply(String s) throws Exception {

                        s = s+"it is a test.";
                        L.i("map3",s);
                        return s;
                    }
                })
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {

                        L.i("subscribe",s);
                        System.out.println(s);
                    }
                });

执行结果.png

四. 总结

了解RxJava的线程模型、线程调度器、线程调度是非常有意义的。能够帮助我们更合理地使用RxJava。另外,RxJava的线程切换结合链式调用非常方便,比起Java使用线程操作实在是简单太多了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏张善友的专栏

通过XML签名和加密更安全地交换数据

作者:Mike Downen、Shawn Farkas 相关技术:XML、.NET Framework、C#、安全性 [摘要]XML签名和XML加密标准目前被...

80310
来自专栏Linyb极客之路

spring系列之自定义扩展PropertyPlaceHolderConfigurer

一、PropertyPlaceHolderConfigurer介绍 主要用于将一些配置信息移出xml文件,移到至properties文件 二、拓展使用 1、将...

9125
来自专栏听雨堂

C#实现微信AES-128-CBC加密数据的解密

小程序登录时,获得用户的信息,只是昵称,无法用作ID。而有用的数据,都加密着,腾讯给出了解密的方法: 加密数据解密算法 接口如果涉及敏感数据(如wx.getUs...

4749
来自专栏余林丰

利用Java提供的Observer接口和Observable类实现观察者模式

对于观察者模式,其实Java已经为我们提供了已有的接口和类。对于订阅者(Subscribe,观察者)Java为我们提供了一个接口,JDK源码如下: 1 pack...

2348
来自专栏草根专栏

Rx.NET 简介

官网: http://reactivex.io/ 它支持基本所有的主流语言. 这里我简单介绍一下Rx.NET. 基本概念和RxJS是一样的. 下面开始切入正题....

3629
来自专栏我的博客

PHP开发微信公共平台(验证token)ZendFramework

define('TOKEN', '3FC50DEAED1083F162BB3D36FF053709'); //这个是TOKEN,...

3604
来自专栏FreeBuf

从小白变RSA大神,附常用工具使用方法及CTF中RSA典型例题

前言 RSA加解密类题型是ctf题中常见题型,考点比较广泛,涉及各种攻击手法,以前在这栽了不少跟头,这里好好总结一下。包括RSA加密原理,RSA常用工具使用方法...

7256
来自专栏xingoo, 一个梦想做发明家的程序员

【手把手教你全文检索】Apache Lucene初探

PS: 苦学一周全文检索,由原来的搜索小白,到初次涉猎,感觉每门技术都博大精深,其中精髓亦是不可一日而语。那小博猪就简单介绍一下这一周的学习历程,仅供各位程...

26510
来自专栏zcqshine's blog

SpringMVC下获取验证码

5078
来自专栏我是攻城师

Java中4大基本加密算法解析

7935

扫码关注云+社区

领取腾讯云代金券