前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码阅读--RxJava(二)

源码阅读--RxJava(二)

作者头像
提莫队长
发布2019-02-21 17:43:48
6320
发布2019-02-21 17:43:48
举报
文章被收录于专栏:刘晓杰刘晓杰

这次研究一下rx是如何实现线程切换的 参考: http://gank.io/post/560e15be2dca930e00da1083 http://www.jianshu.com/p/d149043d103a http://www.jianshu.com/p/310726a75045 http://www.jianshu.com/p/1f4867ce3c01

先说明一下几个概念: 1.Scheduler(abstract) 用来执行任务的,子类在io.reactivex.internal.schedulers 有三个内部类:

public abstract static class Worker implements Disposable
static class PeriodicDirectTask implements Runnable, Disposable
static final class DisposeTask implements Runnable, Disposable

大体的调度流程如下: 1.createWorker(这是个abstract方法,需要自己实现) 2.RxJavaPlugins.onSchedule获取要调度的Runnable 3.生成对应的Task(PeriodicDirectTask或者DisposeTask)并执行

举例:子类IoScheduler里面,createWorker的实现如下:

    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    static final class EventLoopWorker extends Scheduler.Worker

2.Disposable(interface)—-ReferenceDisposable—–io.reactivex.disposables中的其他Disposable 通过Disposables来管理。Disposables通过调用fromXXX来生成对应的Disposable Disposable中dispose方法的具体实现:(说白了就是设置的一种状态)

    public final void dispose() {
        T value = get();
        if (value != null) {
            value = getAndSet(null);
            if (value != null) {
                onDisposed(value);
            }
        }
    }

一、代码示例

先来看上次展示过的代码:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d("所在的线程:",Thread.currentThread().getName());
                Log.d("发送的数据:", 1+"");
                e.onNext(1);----------------ObservableEmitter.onNext=LambdaObserver.onNext=onNext.accept(t);就是最后subscribe设置的Consumer.accept
            }
        })----------------------------这里生成ObservableCreate
        .subscribeOn(Schedulers.io())----------------------------这里生成ObservableSubscribeOn
        .observeOn(AndroidSchedulers.mainThread())----------------------------这里生成ObservableObserveOn
        .subscribe(new Consumer<Integer>() { ----------------------------这里生成LambdaObserver,并且subscribeActual(observer);(这里的observer就是LambdaObserver)
                                            ------------------这里郑重说明subscribe可以实现onNext,onError,onComplete,onSubscribe,但是这里只有实现了onNext,其他全是默认的空
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("所在的线程:",Thread.currentThread().getName());
                Log.d("接收到的数据:", "integer:" + integer);
            }
        });

二、相关类的说明:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;-------------------这里的source就是Observable.create中生成的实例

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {---------------observer=LambdaObserver
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);-----------默认为空

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {--------------相当于LambdaObserver.onNext
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    }
}







public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {---------------observer=LambdaObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);-----------默认为空

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));-------------------在IoScheduler中调度执行线程,并保存结果,用于后面的判断
    }
}














public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;-------------------HandlerScheduler(里面有Handler)
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {---------------observer=LambdaObserver
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker(); -----------------------HandlerWorker

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));------调用subscribeActual
        }
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }


        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    }
}

















public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    --------------------本来可以实现onNext,onError,onComplete,onSubscribe,但是这里只有实现了onNext,其他全是默认的空
    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }
}

三、没有线程切换时的调用过程

这里写图片描述
这里写图片描述

同步的调用过程较为简单,主要就是注意LambdaObserver在中间起的作用就行,这里我就不配图了

四、异步线程切换的调用过程

这个就比较复杂了,我先解释一下,然后配张过程图

这里写图片描述
这里写图片描述

过程图:

这里写图片描述
这里写图片描述

五、疑问

04-23 21:52:07.397 29549-29549/com.company.rxjavacode E/ObservableObserveOn: subscribeActual
04-23 21:52:08.422 29549-29549/com.company.rxjavacode E/ObservableSubscribeOn: subscribeActual
04-23 21:52:08.452 29549-29751/com.company.rxjavacode E/ObservableCreate: subscribeActual

04-23 21:52:08.453 29549-29751/com.company.rxjavacode E/subscribe: subscribe
04-23 21:52:08.453 29549-29751/com.company.rxjavacode D/所在的线程:: RxCachedThreadScheduler-1
04-23 21:52:08.453 29549-29751/com.company.rxjavacode D/发送的数据:: 1
04-23 21:52:08.453 29549-29751/com.company.rxjavacode E/ObserveOnObserver: onNext
04-23 21:52:08.523 29549-29549/com.company.rxjavacode E/LambdaObserver: onNext
04-23 21:52:08.523 29549-29549/com.company.rxjavacode D/所在的线程:: main
04-23 21:52:08.523 29549-29549/com.company.rxjavacode D/接收到的数据:: integer:1

为什么三个subscribeActual函数是这样的调用顺序?异步的时候一共有两次subscribe,为什么又是这样的调用顺序? 我给出的解释在上面的过程图中,具体对不对也不知道。求路过的大神指点

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年04月24日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、代码示例
  • 二、相关类的说明:
  • 三、没有线程切换时的调用过程
  • 四、异步线程切换的调用过程
  • 五、疑问
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档