RxBus作为Android组件间通信工具,简单方便十分受欢迎。
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
public class RxBus {
private HashMap<String, CompositeDisposable> mSubscriptionMap;
private static volatile RxBus mRxBus;
private final Subject<Object> mSubject;
//单列模式
public static RxBus getIntanceBus(){
if (mRxBus==null){
synchronized (RxBus.class){
if(mRxBus==null){
mRxBus = new RxBus();
}
}
}
return mRxBus;
}
public RxBus(){
//转换成一个线程安全的Subject对象
mSubject = PublishSubject.create().toSerialized();
}
public void post(Object o){
mSubject.onNext(o);
}
/**
* 返回指定类型的带背压的Flowable实例
*
* @param <T>
* @param type
* @return
*/
public <T>Flowable<T> getObservable(Class<T> type){
return mSubject.toFlowable(BackpressureStrategy.BUFFER)
.ofType(type);
}
/**
* 一个默认的订阅方法
*
* @param <T>
* @param type
* @param next
* @param error
* @return
*/
public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next,error);
}
/**
* 是否已有观察者订阅
*
* @return
*/
public boolean hasObservers() {
return mSubject.hasObservers();
}
/**
* 保存订阅后的disposable
* @param o
* @param disposable
*/
public void addSubscription(Object o, Disposable disposable) {
if (mSubscriptionMap == null) {
mSubscriptionMap = new HashMap<>();
}
String key = o.getClass().getName();
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).add(disposable);
} else {
//一次性容器,可以持有多个并提供 添加和移除。
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(disposable);
mSubscriptionMap.put(key, disposables);
}
}
/**
* 取消订阅
* @param o
*/
public void unSubscribe(Object o) {
if (mSubscriptionMap == null) {
return;
}
String key = o.getClass().getName();
if (!mSubscriptionMap.containsKey(key)){
return;
}
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).dispose();
}
mSubscriptionMap.remove(key);
}
}
在Activity 中初始化RxBus,注册RxBus,destory时销毁
private void initRxBus() {
rxBus = RxBus.getIntanceBus();
registerRxBus(Integer.class, new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("Tag",integer+"");
});
}
//注册
public <T> void registerRxBus(Class<T> eventType, Consumer<T> action) {
Disposable disposable = rxBus.doSubscribe(eventType, action, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e("NewsMainPresenter", throwable.toString());
}
});
rxBus.addSubscription(this,disposable);
}
@Override
protected void onDestroy() {
super.onDestroy();
rxBus.unSubscribe(this);
}
发送事件
RxBus.getInstance().post(1);