前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxAndroid从零开始学之五(常见操作符与三级缓存)

RxAndroid从零开始学之五(常见操作符与三级缓存)

作者头像
Frank909
发布2019-01-14 17:00:16
5810
发布2019-01-14 17:00:16
举报
文章被收录于专栏:Frank909Frank909

RxAndroid的操作符有很多,本以为写了上一节的一些基本的Operator就可以正常编写代码了,但是后来在github上看googlesample,发现了一些另外的Operator。那么本文就继续介绍这些operator并加上自己的一些理解。

first()

只发射第一个数据项,或者是满足条件的第一个数据项。

这里写图片描述
这里写图片描述
代码语言:javascript
复制
Observable.just(1,2,3,4)
.first()
.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        System.out.println(integer);     
    }
});

它的输出结果是

1

很简单吧?文章最后,我会讲它的妙用之处。

merge()

合并多个Observeable的发射数据。

这里写图片描述
这里写图片描述

特别指出:它可能将数据项交错发射。如果要按顺序发现需要用到另外一个Operator:concat

代码语言:javascript
复制
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

它的输出结果可能为:

Next: 1 Next: 3 Next: 5 Next: 2 Next: 4 Next: 6 Sequence complete.

concat()

concat的字面意思是连接。它的用法如同merge。都是将多个Observable的发射数据合并后再发送,但有一点不同的是,concat()会按原来的顺序发送。

这里写图片描述
这里写图片描述

zip()

zip的字面意思是压缩,那么它压缩了什么呢?

这里写图片描述
这里写图片描述

它也是将多个Observable的发射项结合在一起,而且是真正的结合,严格按照顺序组合发射项,然后一一发送。如1和A组成1A,2和B组成2B。最终发射项的个数等于之前的最少发射项的Observable的发射项个数。

比较抽象,来个例子

代码语言:javascript
复制
Observable a = Observable.just(1,2,3,4,5);
Observable b = Observable.just("a","b","c");
Observable.zip(a,b,
      new Func2<Integer,String,String>(){

          @Override
          public String call(Integer integer, String s) {
              return integer+s;
          }
      }).subscribe(new Action1() {
      @Override
      public void call(Object o) {
          System.out.println(o);
  }
});

输出结果是:

1a 2b 3c

do系列

注册一个动作去监听一系列Observable的生命周期事件,因为对应生命周期,所以有不同的动作。

doOnEach()

注册一个回调,Observable每发送一次数据项,它都将被调用,可以传递Notification参数或者Obsever参数给它的onNext()方法。

doOnNext()

作用同doOnEach(),但是它不接受Notificationr的参数。

代码语言:javascript
复制
Observable.just(1, 2, 3)
      .doOnNext(new Action1<Integer>() {
      @Override
      public void call(Integer item) {
        if( item > 1 ) {
          throw new RuntimeException( "Item exceeds maximum value" );
        }
      }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onNext(Integer item) {
        System.out.println("Next: " + item);
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("Error: " + error.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
});

它的输出结果是:

Next: 1 Error: Item exceeds maximum value

doOnCompleted()

当发射结束时回调。

doOnError()

当发射过程出错时的回调。

doOnSubscribe()

当Observable被订阅时的回调。

实例分析

网上有人写过如何用RxJava去实现三级缓存。 其实 Github上有谷歌开源的Demo 地址是todo-mvp-rxjava 这个demo主要是基于mvp模式下用rxjava的编码方式去实现一个TODO功能的APP. 其中有一段代码很实用,就是所谓的三级缓存功能。 我们都知道所谓三级缓存其实就是

  1. 内存 memorycache
  2. 硬盘 diskcache
  3. 网络 networkdata

程序首先去检索内存中是否缓存数据,如果有则取内存数据。 如果内存无缓存,则去磁盘上查找缓存数据,如果有则取磁盘数据。 如果前两者都没有的话,才去网络请求数据,这样做的目的主要是为了减小客户端应用对服务器的数据请求压力。

好吧,github上相关代码片断如下:

代码语言:javascript
复制
public Observable<Task> getTask(@NonNull final String taskId) {
        checkNotNull(taskId);

        final Task cachedTask = getTaskWithId(taskId);

        // Respond immediately with cache if available
        //(1) 如果有内存缓存
        if (cachedTask != null) {
            return Observable.just(cachedTask);
        }

        // Load from server/persisted if needed.

        // Do in memory cache update to keep the app UI up to date
        if (mCachedTasks == null) {
            mCachedTasks = new LinkedHashMap<>();
        }

        // Is the task in the local data source? If not, query the network.
        //(2) 获取磁盘上的缓存
        Observable<Task> localTask = getTaskWithIdFromLocalRepository(taskId);
        // (3) 获取远程的数据任务
        Observable<Task> remoteTask = mTasksRemoteDataSource
                .getTask(taskId)
                .doOnNext(new Action1<Task>() {
                    @Override
                    public void call(Task task) {
                        mTasksLocalDataSource.saveTask(task);
                        mCachedTasks.put(task.getId(), task);
                    }
                });
    // (4) 合并任务,并排队执行
        return Observable.concat(localTask, remoteTask).first()
                .map(new Func1<Task, Task>() {
                    @Override
                    public Task call(Task task) {
                        if (task == null) {
                            throw new NoSuchElementException("No task found with taskId " + taskId);
                        }
                        return task;
                    }
                });
    }

可以看见,这里面用到的操作符有Concat,first,doOnNext,map. 特别是

代码语言:javascript
复制
return Observable.concat(localTask, remoteTask).first()
.map(new Func1<Task, Task>() {
    @Override
    public Task call(Task task) {
        if (task == null) {
            throw new NoSuchElementException("No task found with taskId " + taskId);
        }
        return task;
    }
});

因为concat是将localTask和remoteTask发射项合并,然后顺序执行发射。并且localTask的顺序在前。

而first()的存在让事情变得有趣,如果localTask有效取得了数据,则remoteTask则不会执行,否则remoteTask执行。加上之前的cacheTask判断,三级缓存便形成了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016年09月12日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • first()
  • merge()
  • concat()
  • zip()
  • do系列
    • doOnEach()
      • doOnNext()
        • doOnCompleted()
          • doOnError()
            • doOnSubscribe()
            • 实例分析
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档