RxAndroid的操作符有很多,本以为写了上一节的一些基本的Operator就可以正常编写代码了,但是后来在github上看googlesample,发现了一些另外的Operator。那么本文就继续介绍这些operator并加上自己的一些理解。
只发射第一个数据项,或者是满足条件的第一个数据项。
Observable.just(1,2,3,4)
.first()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
它的输出结果是
1
很简单吧?文章最后,我会讲它的妙用之处。
合并多个Observeable的发射数据。
特别指出:它可能将数据项交错发射。如果要按顺序发现需要用到另外一个Operator:concat
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
它的输出结果可能为:
Next: 1 Next: 3 Next: 5 Next: 2 Next: 4 Next: 6 Sequence complete.
concat的字面意思是连接。它的用法如同merge。都是将多个Observable的发射数据合并后再发送,但有一点不同的是,concat()会按原来的顺序发送。
zip的字面意思是压缩,那么它压缩了什么呢?
它也是将多个Observable的发射项结合在一起,而且是真正的结合,严格按照顺序组合发射项,然后一一发送。如1和A组成1A,2和B组成2B。最终发射项的个数等于之前的最少发射项的Observable的发射项个数。
比较抽象,来个例子
Observable a = Observable.just(1,2,3,4,5);
Observable b = Observable.just("a","b","c");
Observable.zip(a,b,
new Func2<Integer,String,String>(){
@Override
public String call(Integer integer, String s) {
return integer+s;
}
}).subscribe(new Action1() {
@Override
public void call(Object o) {
System.out.println(o);
}
});
输出结果是:
1a 2b 3c
注册一个动作去监听一系列Observable的生命周期事件,因为对应生命周期,所以有不同的动作。
注册一个回调,Observable每发送一次数据项,它都将被调用,可以传递Notification参数或者Obsever参数给它的onNext()方法。
作用同doOnEach(),但是它不接受Notificationr的参数。
Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer item) {
if( item > 1 ) {
throw new RuntimeException( "Item exceeds maximum value" );
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
它的输出结果是:
Next: 1 Error: Item exceeds maximum value
当发射结束时回调。
当发射过程出错时的回调。
当Observable被订阅时的回调。
网上有人写过如何用RxJava去实现三级缓存。 其实 Github上有谷歌开源的Demo 地址是todo-mvp-rxjava 这个demo主要是基于mvp模式下用rxjava的编码方式去实现一个TODO功能的APP. 其中有一段代码很实用,就是所谓的三级缓存功能。 我们都知道所谓三级缓存其实就是
程序首先去检索内存中是否缓存数据,如果有则取内存数据。 如果内存无缓存,则去磁盘上查找缓存数据,如果有则取磁盘数据。 如果前两者都没有的话,才去网络请求数据,这样做的目的主要是为了减小客户端应用对服务器的数据请求压力。
好吧,github上相关代码片断如下:
public Observable<Task> getTask(@NonNull final String taskId) {
checkNotNull(taskId);
final Task cachedTask = getTaskWithId(taskId);
// Respond immediately with cache if available
//(1) 如果有内存缓存
if (cachedTask != null) {
return Observable.just(cachedTask);
}
// Load from server/persisted if needed.
// Do in memory cache update to keep the app UI up to date
if (mCachedTasks == null) {
mCachedTasks = new LinkedHashMap<>();
}
// Is the task in the local data source? If not, query the network.
//(2) 获取磁盘上的缓存
Observable<Task> localTask = getTaskWithIdFromLocalRepository(taskId);
// (3) 获取远程的数据任务
Observable<Task> remoteTask = mTasksRemoteDataSource
.getTask(taskId)
.doOnNext(new Action1<Task>() {
@Override
public void call(Task task) {
mTasksLocalDataSource.saveTask(task);
mCachedTasks.put(task.getId(), task);
}
});
// (4) 合并任务,并排队执行
return Observable.concat(localTask, remoteTask).first()
.map(new Func1<Task, Task>() {
@Override
public Task call(Task task) {
if (task == null) {
throw new NoSuchElementException("No task found with taskId " + taskId);
}
return task;
}
});
}
可以看见,这里面用到的操作符有Concat,first,doOnNext,map. 特别是
return Observable.concat(localTask, remoteTask).first()
.map(new Func1<Task, Task>() {
@Override
public Task call(Task task) {
if (task == null) {
throw new NoSuchElementException("No task found with taskId " + taskId);
}
return task;
}
});
因为concat是将localTask和remoteTask发射项合并,然后顺序执行发射。并且localTask的顺序在前。
而first()的存在让事情变得有趣,如果localTask有效取得了数据,则remoteTask则不会执行,否则remoteTask执行。加上之前的cacheTask判断,三级缓存便形成了。