Android原生的多线程和异步
处理简直糟透了,反复的嵌套让代码看起来十分不明了,多线程上也没有iOS
的dispatch
好用,但是用了Rxjava
后就会有所改善,虽然代码量看起来会多一点,但是逻辑
就清晰
多了
本文代码对应的是Rxjava2
总的来说Rxjava
可以分为5块内容 分别为
形象的来说
发布者
就相当于 报社
订阅者
就相当于 用户
中转站
就相当于 报亭
它既是订阅者
又是发布者
线程
是指定在哪个线程上处理 操作符
则是把发布者的数据进行处理,再给订阅者在发布者和订阅者之间传递的事件总共有三种
onNext()
: 发送事件的数据onCompleted()
: 事件队列完结。RxJava
不仅把每个事件单独处理,还会把它们看做一个队列。RxJava
规定,当不会再有新的 onNext()
发出时,需要触发 onCompleted()
方法作为标志。onError()
: 事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出。onCompleted()
和 onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()
和 onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。下面就说一下各块内容
对比
Observable
不支持背压(backpressure)
Flowable
是Rxjava2新增加的支持背压(backpressure)
背压(backpressure)
:只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。
如果上游发送数据速度远大于下游接收数据的速度
用Observable
就会内存溢出
Flowable
则会抛弃掉处理不了的数据来防止溢出
但是不能就都用Flowable
因为Observable
的性能较高
Observable.just(1).toSingle()
Observable.just(1).toCompletable()
发布者发布事件 可以手动创建也可以调用内置方法
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Flowable
.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
}
}, BackpressureStrategy.DROP)
.subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
Single
.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
}
})
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
}
@Override
public void onError(Throwable e) {
}
});
Completable
.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
}
})
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
Observer/FlowableOnSubscribe/SingleOnSubscribe/CompletableOnSubscribe/Consumer/Subscriber
发布者 | 订阅者 |
---|---|
Observable | Observer/Consumer |
Flowable | FlowableOnSubscribe/Subscriber/Consumer |
Single | SingleObserver/Consumer/BiConsumer |
Completable | CompletableObserver/Action |
创建
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
订阅
observable.subscribe(observer);
注意上面方法的顺序 看上去是发布者
订阅了订阅者
,之所以这样是因为链式代码的优雅
常用的方式是分线程
中处理数据,主线程
中使用数据生成页面
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("发送的数据");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
名称 | 解析 |
---|---|
amb()ambArrayambWith | 给定多个Observable,只让第一个发射数据的Observable发射全部数据 |
defaultIfEmpty() | 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 |
switchIfEmpty() | 如果原始Observable没有发射数据,它发射一个备用Observable的发射物 |
skipUntil() | 跳过原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据 |
skipWhile() | 判断成功的都跳过 一旦为假 发送剩余的所有数据 |
takeUntil() | 发送为真包括以前的数据 不再处理后续数据 |
takeWhile() | 发送为真的数据 一旦为假就不再处理后续数据 |
参见面发布者部分
Observable observable = Observable.just("好好学习", "天天向上");
// 将会依次调用:
// onNext("好好学习");
// onNext("天天向上");
// onCompleted();
Observable.range(1,10);
String[] quotations = {"好好学习", "天天向上"};
Observable observable = Observable.fromArray(quotations);
//延迟10s每10s发送一次
Observable.interval(10,10, TimeUnit.SECONDS);
//延迟10s发送一次
Observable.timer(10,TimeUnit.SECONDS);
throttleFirst
操作符:仅发送指定时间段内的第一个信号
throttleLast
操作符:仅发送指定时间段内的第一个信号
RxView.clicks(mBtn)
.throttleFirst(1, TimeUnit.SECONDS);
指定时间段内没有新的信号时 则发出最后一个信号
比如监听文本变化进行搜索
RxTextView.textChanges(etKey)
.debounce(400, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread());
类型变换
String[] strs = {"11","22","33"};
Observable
.fromArray(strs)
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s);
}
})
.subscribe(observer);
concatMap(): 这是一个很有用但非常难理解的变换。
首先假设这么一种需求:上面的{"11","22","33"}
我们像最终获取到1,1,2,2,3,3
String[] strs = {"11","22","33"};
Observable
.fromArray(strs)
.concatMap(new Function<String, ObservableSource<Integer>>() {
private Subject<Integer> subject = PublishSubject.create();
@Override
public ObservableSource<Integer> apply(String s) throws Exception {
for (char c:s.toCharArray()){
subject.onNext(Integer.valueOf(""+c));
}
subject.onComplete();
return subject;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
用 map()
显然是不行的,因为 map()
是一对一的转化,而我现在的要求是一对多的转化,就需要用 flatMap()
了
Kotlin
Observable
.create<Int> {
for (i in 0 until 4) {
it.onNext(i)
}
it.onComplete()
}
.concatMap {
L.i("concatMap:${it}")
var value = it
return@concatMap Observable.create<String> {
Thread.sleep((Math.random() * 1000).toLong())
it.onNext("${value + 100}")
it.onComplete()
}
}.subscribe({
L.i("最后:${it}")
}, {
}, {
L.i("事件完成")
})
注意
concatMap
中一定要发送onComplete
事件
flatMap和concatMap最大的区别是concatMap发射的数据集是有序的,flatMap发射的数据集是无序的
过滤
假如我们要大于5的数
Integer[] nums = {3, 4, 5, 6, 7};
Observable
.fromArray(nums)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer < 5) {
return false;
} else {
return true;
}
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
当未发送onNext
直接发送onComplete
时 onNext
收到的默认值
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
})
.defaultIfEmpty("默认数据")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: " + s);
}
});
如果发射源没有发射数据就完成了,就发射switchIfEmpty里面新的Observable发射源
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onComplete();
}
})
.switchIfEmpty(Observable.just("a", "b", "c"))
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: " + s);
}
});
Observable
.zip(
Observable.just("100", "200"),
Observable.just("1","2","3"),
new BiFunction<String, String, Integer>() {
@Override
public Integer apply(String s, String s2) throws Exception {
return Integer.valueOf(s) + Integer.valueOf(s2);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
L.i(""+integer);
}
});
上面的代码会收到 101、202
也就是说多个Observable
都发送时 才处理数据
给定多个Observable,只让第一个发射数据的Observable发射全部数据。
take
//取前两个信号
.take(2);
//取后两个信号
.takeLast(2);
//取前1s的信号
.take(1,TimeUnit.SECONDS);
//取后1s的信号
.takeLast(1,TimeUnit.SECONDS);
takeWhile
//发送为真的数据 一旦为假就不再处理后续数据
//会收到1、2
Observable
.just(1, 2, 3,2)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer<3;
}
});
takeUntil
//发送为真包括以前的数据 不再处理后续数据
//会收到1、2
Observable
.just(1, 2, 3,2)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >1;
}
})
//获取原始Observable发射的数据,直到第二个Observable发射了一个数据,不再发送原始Observable的剩余数据
//会收到1,2,3,2
Observable
.just(1, 2, 3,2)
.takeUntil(Observable.just("3").delay(1,TimeUnit.SECONDS))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
L.i("" + integer);
}
});
skip
//取前两个信号
.skip(2);
//取后两个信号
.skipLast(2);
//取前1s的信号
.skip(1,TimeUnit.SECONDS);
//取后1s的信号
.skipLast(1,TimeUnit.SECONDS);
skipWhile:判断成功的都跳过 一旦为假 发送剩余的所有数据
Observable
.just(1, 2, 3,2)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer<2;
}
})
会收到2、3、2 判断成功的都跳过 一旦为假 发送剩余的所有数据
skipUntil:跳过原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
Observable
.just(1, 2, 3,2)
.skipUntil(Observable.just("3").delay(1,TimeUnit.SECONDS))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
L.i("" + integer);
}
});
收不到数据 因为第二个Observable延迟1s结束后 原始Observable已经没有剩余数据了
Rxjava和Rxjava2对比
io.reactivex.subjects.AsyncSubject
,
io.reactivex.subjects.BehaviorSubject
,
io.reactivex.subjects.PublishSubject
,
io.reactivex.subjects.ReplaySubject
,
io.reactivex.subjects.UnicastSubject
在RxJava2中依然存在,但现在他们不支持backpressure
。
新出现的
io.reactivex.processors.AsyncProcessor
,
io.reactivex.processors.BehaviorProcessor
,
io.reactivex.processors.PublishProcessor
,
io.reactivex.processors.ReplayProcessor
io.reactivex.processors.UnicastProcessor
支持backpressure
Subject 在平时开发时 用的不是很多
它分为四种
用法如下
observable.subscribe(subject);
subject.subscribe(observer);
区别
假如发布者
也就是报社 只发布周一到周五
的报纸 一天一份
如果我们在周三
早上来报厅订报
PublishSubject
我们可以收到 周三 周四 周五
的报纸
BehaviorSubject
我们可以收到 周二 至 周五
的报纸
ReplaySubject
我们可以收到 周一 至 周五
的报纸
AsyncSubject
我们可以收到 周五
的报纸 但是发布的事件中如果有错误
那我们只会接受到错误
而不是错误的前一个事件
添加依赖
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.1.10'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'
//网络请求库
implementation 'com.lzy.net:okgo:3.0.4'
implementation 'com.lzy.net:okrx2:2.0.2'
//JSON转换
implementation 'com.alibaba:fastjson:1.2.46'