专栏首页振兴的Android修炼手册关于RxJava的基础心法解析

关于RxJava的基础心法解析

前言

我接触Rxjava是在2015年底,已经过去4年的时间了。

2016年学习过一阵子RxJava的操作符也做过一些笔记,我们项目的网络请求框架也替换成了Okhttp+Retrofit,所以使用RxJava做线程间切换就非常好用。

一开始接触RxJava感觉除了线程切换之外很能发现其实际的作用,因为我感觉自己响应式编程的思想,很难实际运用到开发需求当中去。但我身边有一位前辈使用Rxjava非常溜,他一般做需求的时候写的都是流式的代码。

2017年Kotlin语言Google举行的I/O开发者大会上宣布,将Kotlin语言作为安卓开发的一级编程语言,所以自己又看了看了Kotlin语言。

RxJava在我们项目中还是静静的躺着,因为自己懒的思考,懒的在代码结构上做更新,懒的对RxJava做研究。有时候感觉自己就算会了RxJava也不会将其使用在项目当中,因为自己什么业务场景之下使用Rxjava更加方便。

2018就这么有一下没一下的使用RxJava,最近在做需求开发的时候用的RxJava比较多了,一些业务场景也逐渐思考使用响应式编程。思考这样写的好处,以及怎么将之前的代码结构转化为流式结构。

感觉有时候思维观念的转变是一个漫长的过程,但有时候又会很快。凡事都可以熟能生巧,我们使用RxJava多了之后再笨也会思考。之前想不到RxJava的使用场景是因为自己见的、写的代码还不够多。

今天回过头来从代码的角度看看一次RxJava 的基础操作,事件订阅到触发的过程。

这里推荐一篇RxJava的入门的文章 给 Android 开发者的 RxJava 详解

读完本篇文章希望所有读者能明白RxJava的观察者与java的观察者模式有什么不同,以及Rxjava的观察者模式的代码运行过程。至于怎么具体的使用 Rxjava 那么就需要更多学习和实践了。

Java的观察者模式

观察者:Observer

被观察者:Observable

被观察者提供添加(注册)观察者的方法; 被观察者更新的同时可以主动通知注册它观察者更新;

观察者模式面向的需求是:收音机听广播,电台是被观察者,收音机是观察者。收音机调频到广播的波段(注册),广播发送信息(被观察者更新数据,通知所有的观察者)收音机接受信息从而播放声音(观察者数据更新)。

RxJava的观察者模式

可观察者(被观察者):Observalbe

观察者:Observer

订阅操作:subscribe()

订阅:Subscription

订阅者:Subscriber ,实现 ObserverSubscription

发布: OnSubscribe

ObservableSubscriber 通过 subscribe() 方法实现订阅关系,从而 Observable 在被订阅之后就会触发 OnSubscribe.call 进行发布事件来通知 Subscriber

RxJavaObservable.png

分析源代码

我们先看一个简单的打印数据的例子:

Observable.just(1)
    .subscribe(new Subscriber<Object>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }
        @Override
        public void onError(Throwable e) {
            System.out.println("onError");
        }
        @Override
        public void onNext(Object o) {
            System.out.println("onNext");
            System.out.println(Integer.valueOf(o.toString()));
        }
    });

RxJava 的版本已经更新到2了,我们现在还用的是版本1。版本1中1.0和1.3这两个版本用的比较多。但这两个RxJava 版本之前改动不是很大,我们来分析分析最初始的版本,主要看看其中的设计思想啥的~!

谁触发了被观察者

我们进行了 subscribe 之后就会触发 Observable 的执行动作,然后将执行结果传输给订阅它的 Subscriber

//无参的subscribe
public final Subscription subscribe() {
    return subscribe(new Subscriber<T>() {
        @Override
        public final void onCompleted() {}
        @Override
        public final void onError(Throwable e) {throw new OnErrorNotImplementedException(e);}
        @Override
        public final void onNext(T args) {}
    });
}
/***部分代码省略***/
//onnext、onerror、oncomplete参数
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    if (onComplete == null) {
        throw new IllegalArgumentException("onComplete can not be null");
    }
    return subscribe(new Subscriber<T>() {
        @Override
        public final void onCompleted() {onComplete.call();}
        @Override
        public final void onError(Throwable e) {onError.call(e);}
        @Override
        public final void onNext(T args) {onNext.call(args);}
    });
}
//observer参数
public final Subscription subscribe(final Observer<? super T> observer) {
    return subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {observer.onCompleted();}
        @Override
        public void onError(Throwable e) {observer.onError(e);}
        @Override
        public void onNext(T t) {observer.onNext(t);}
    });
}
//Subscriber参数
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
    //调用订阅者的start方法
    subscriber.onStart();
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    try {
        //Observable的OnSubscribe调用call方法
        hook.onSubscribeStart(this, onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        try {
            //调用订阅者的onError方法
            subscriber.onError(hook.onSubscribeError(e));
        } catch (OnErrorNotImplementedException e2) {
            throw e2;
        } catch (Throwable e2) {
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            hook.onSubscribeError(r);
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}
private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

class RxJavaObservableExecutionHookDefault extends RxJavaObservableExecutionHook {
    private static RxJavaObservableExecutionHookDefault INSTANCE = new RxJavaObservableExecutionHookDefault();
    public static RxJavaObservableExecutionHook getInstance() {
        return INSTANCE;
    }
}
public abstract class RxJavaObservableExecutionHook {
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass-thru by default
        return onSubscribe;
    }
}

RxJavaObservableExecutionHookonSubscribeStart 可以看出 hook.onSubscribeStart(this, onSubscribe).call(subscriber); 实际是 onSubscribe.call(subscirber)

开始你的表演:Observable.OnSubscribe.call

刚才我们了解到通过 subscirbe 可以通知被观察者进行 call 操作。

public class Observable<T> {
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    /***部分代码省略***/
    public final static <T> Observable<T> just(final T value) {
        return ScalarSynchronousObservable.create(value);
    }
}

public final class ScalarSynchronousObservable<T> extends Observable<T> {
    public static final <T> ScalarSynchronousObservable<T> create(T t) {
        return new ScalarSynchronousObservable<T>(t);
    }
    private final T t;
    protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> s) {
                s.onNext(t);
                s.onCompleted();
            }
        });
        this.t = t;
    }
    public T get() {
        return t;
    }
}

我们最后看到 Observable.just(1) 生成的 Observable 实际是 ScalarSynchronousObservable 实例。 Observable.OnSubscribe 实际上是:

new OnSubscribe<T>() {
    @Override
    public void call(Subscriber<? super T> s) {
        s.onNext(t);
        s.onCompleted();
    }
}

所以 onSubscribe.call(subscirber) 最终调用的是了 subscirberonNext和onCompleted 方法。

总结

对于Android开发人员开发来说 RxJava 是一个很好用的库,但是需要我们转化平时的对代码结构设计的思想,能很好的去使用到大部分的业务场景之中。只有对 RxJava 有了足够的了解我们才能灵活、熟练的使用。

本篇文章只是一个 RxJava 简单的基础开篇,观察者:Observer订阅操作:subscribe()订阅:Subscription订阅者:Subscriber 以及 ObserverSubscription 的订阅关系,之后我会慢慢的学习和分享关于 RxJava 更多的知识。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Multidex记录二:缺陷&解决

    为什么要用记录呢,因为我从开始接触Android时我们的项目就在65535的边缘。不久Google就出了multidex的解决方案。我们也已经接入multide...

    静默加载
  • Android网络之Retrofit2.0使用和解析

    javacompile 'com.squareup.retrofit2:retrofit:2.0.1'

    静默加载
  • 解决6.5.16及以上版本微信内部M页不能唤起APP

    最近微信唤起app的数据急速下降,产品同学告诉我们大事来了,微信不能唤起Android的App了!!

    静默加载
  • SVD在推荐系统中的应用

    参考自:http://www.igvita.com/2007/01/15/svd-recommendation-system-in-ruby/

    AIHGF
  • 观察者模式

    tanoak
  • 其他流---基本数据处理流

    shimeath
  • WCF入门(10)

    公司是做乙方的,工资还凑合,主要是项目基本都已完成,进去就是干维护,体会不到那种从头彻尾的成就感。项目中具体用了EF+Ado.net+WCF+WPF+(VB.n...

    _淡定_
  • 微信小程序开发入门篇

    本文档将带你一步步创建完成一个微信小程序,并可以在手机上体验该小程序的实际效果。 开发准备工作 获取微信小程序的 AppID 登录 https://mp.wei...

    xiangzhihong
  • 从一道面试题简单谈谈发布订阅和观察者模式

    今天的话题是javascript中常被提及的「发布订阅模式和观察者模式」,提到这,我不由得想起了一次面试。记得在去年的一次求职面试过程中,面试官问我,“你在项目...

    Tusi
  • 原 荐 SpringBoot 2.0 系列0

    石奈子

扫码关注云+社区

领取腾讯云代金券