源码阅读--RxJava(一)

所有有关RxJava,RxAndroid的介绍性文章在这里贴出一二: http://blog.csdn.net/caroline_wendy/article/details/50444461 http://frodoking.github.io/2015/09/08/reactivex/

RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据。。。)

一个Observable可以发出零个或者多个事件,知道结束或者出错。每发出一个事件,就会调用它的Subscriber的onNext方法,最后调用Subscriber.onNext()或者Subscriber.onError()结束。

Rxjava的看起来很想设计模式中的观察者模式,但是有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。

用法:

创建一个Observable对象

Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);

这里定义的Observable对象仅仅发出一个Hello World字符串,然后就结束了。接着我们创建一个Subscriber来处理Observable对象发出的字符串。

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};

这里subscriber仅仅就是打印observable发出的字符串。通过subscribe函数就可以将我们定义的myObservable对象和mySubscriber对象关联起来,这样就完成了subscriber对observable的订阅。

myObservable.subscribe(mySubscriber);

一旦mySubscriber订阅了myObservable,myObservable就是调用mySubscriber对象的onNext和onComplete方法,mySubscriber就会打印出Hello World!

源代码

那么我们先看Observable.create这个方法:

    public final static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

先解释一下hook

private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();//相当于一个单例

再看hook.onCreate(f):

    public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;
    }

注意看,就是return f;也就是说Observable.create返回的就是构造函数的参数对应的一个实例

然后再看myObservable.subscribe(mySubscriber)函数

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

再看subscribe函数

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        ......

        // new Subscriber so onStart it    该函数的实现为空
        subscriber.onStart();

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            //***************************相当于后面的参数.call,也就是执行构造函数中实例化的call函数
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            ......
            return Subscriptions.unsubscribed();
        }
    }

    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass-thru by default
        return onSubscribe;
    }

    //这是个interface,在create中实例化
    public interface Action1<T> extends Action {
        void call(T t);
    }

操作符:map,flatMap,take

(1)map

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override//***********当调用subscribe函数时调用这个call函数
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    //******************onLift返回一个lift。这个call又是一个接口,需要自己实现
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();//空
                        onSubscribe.call(st);//subscribe一个Action,map一个Func
                    } catch (Throwable e) {
                        ......
                    }
                } catch (Throwable e) {
                    ......
                }
            }
        });
    }

    /**
     * Operator function for lifting into an Observable.
     */
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

    //自己实现
    public interface Func1<T, R> extends Function {
        R call(T t);
    }

(2)flatMap

    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));//我们只需要看merge函数了
    }

    public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        //map返回一个Observable,相当于还是调用Observable.lift函数
        return source.lift(OperatorMerge.<T>instance(false));
    }

(3)take

    public final Observable<T> take(final int count) {
        return lift(new OperatorTake<T>(count));
    }

你会发现所有的操作符都会调用lift函数,只是参数不一样。你进入各个参数类查看,它们都继承自Operator

总结:

create实例化new Observable.OnSubscribe(),在subscribe函数中调用这个类的call函数,在这个call函数中调用什么函数,就在相对应的接收者函数中实现。 所有的操作符都会调用lift这个函数,只是参数不同,这可以通过多态来实现 简而言之,这个库就是由观察者模式+多态来实现的。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券