private final Map<String, MutableLiveData<Object>> bus;
private LiveDataBus1() {
bus = new HashMap<>();
}
private static class SingletonHolder {
private static final LiveDataBus1 DATA_BUS = new LiveDataBus1();
}
public static LiveDataBus1 get() {
return SingletonHolder.DATA_BUS;
}
public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
if (!bus.containsKey(target)) {
bus.put(target, new MutableLiveData<>());
}
return (MutableLiveData<T>) bus.get(target);
}
public MutableLiveData<Object> getChannel(String target) {
return getChannel(target, Object.class);
}
}
- 那么如何发送消息和接收消息呢,注意两者的key需要保持一致,否则无法接收?具体代码如下所示://发送消息
LiveDataBus1.get().getChannel("yc\_bus").setValue(text);
//接收消息
LiveDataBus1.get().getChannel("yc\_bus", String.class)
.observe(this, new Observer<String>() {
@Override
public void onChanged(@Nullable String newText) {
// 更新数据
tvText.setText(newText);
}
});
### 07.遇到的问题和分析思路
- 遇到的问题:
- 1.LiveData 一时使用一时爽,爽完了之后我们发现这个简易的 LiveDataBus 存在一个问题,就是订阅者会收到订阅之前发布的消息,类似于粘性消息。对于一个消息总线来说,这是不可接受的。
- 2.多次调用了 postValue() 方法,只有最后次调用的值会得到更新。也就是此方法是有可能会丢失事件!
#### 7.1 先看第一个问题
- 然后看一下LiveData的订阅方法observe源码
- 看下面代码可知道,LiveData 内部会将传入参数包装成 wrapper ,然后存在一个 Map 中,最后通过 LifeCycle 组件添加观察者。// 注释只能在主线程中调用该方法
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
// 当前绑定的组件(activity or fragment)状态为DESTROYED的时候, 则会忽视当前的订阅请求
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
// ignore
return;
}
// 转为带生命周期感知的观察者包装类
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
// 对应观察者只能与一个owner绑定,否则抛出异常
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
// lifecycle注册
owner.getLifecycle().addObserver(wrapper);
}
- 紧接着,来看一下LiveData的更新数据方法
- LiveData 更新数据方式有两个,一个是 setValue() 另一个是 postValue(),这两个方法的区别是,postValue() 在内部会抛到主线程去执行更新数据,因此适合在子线程中使用;而 setValue() 则是直接更新数据。@MainThread
protected void setValue(T value) {
assertMainThread("setValue");
// 这里的 mVersion,它本问题关键,每次更新数据都会自增,默认值是 -1。
mVersion++;
mData = value;
dispatchingValue(null);
}
- 跟进下 dispatchingValue() 方法,注意,这里需要重点看considerNotify代码:private void dispatchingValue(@Nullable ObserverWrapper initiator) {
// mDispatchingValue的判断主要是为了解决并发调用dispatchingValue的情况
// 当对应数据的观察者在执行的过程中, 如有新的数据变更, 则不会再次通知到观察者。所以观察者内的执行不应进行耗时工作
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
// 等下重点看这里的代码
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
// 等下重点看这里的代码
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
- 然后看一下considerNotify() 方法做了什么,代码如下所示,这里有道词典翻译下注释:private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
return;
}
// 检查最新的状态b4调度。也许它改变了状态,但我们还没有得到事件。
// 我们还是先检查观察者。活动,以保持它作为活动的入口。
// 因此,即使观察者移动到一个活动状态,如果我们没有收到那个事件,我们最好不要通知一个更可预测的通知顺序。
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//noinspection unchecked
observer.mObserver.onChanged((T) mData);
}
- 为何订阅者会马上收到订阅之前发布的最新消息?
- 如果 ObserverWrapper 的 mLastVersion 小于 LiveData 的 mVersion,那么就会执行的 onChange() 方法去通知观察者数据已更新。而 ObserverWrapper.mLastVersion 的默认值是 -1, LiveData 只要更新过数据,mVersion 就肯定会大于 -1,所以订阅者会马上收到订阅之前发布的最新消息!!
#### 7.2 然后看一下第二个问题
- 首先看一下postValue源代码,如下所示:
- 看代码注释中说,如果在多线程中同一个时刻,多次调用了 postValue() 方法,只有最后次调用的值会得到更新。也就是此方法是有可能会丢失事件!
- postValue 只是把传进来的数据先存到 mPendingData,ArchTaskExecutor.getInstance()获取的是一个单利对象。然后往主线程抛一个 Runnable,在这个 Runnable 里面再调用 setValue 来把存起来的值真正设置上去,并回调观察者们。而如果在这个 Runnable 执行前多次 postValue,其实只是改变暂存的值 mPendingData,并不会再次抛另一个 Runnable。protected void postValue(T value) {
boolean postTask;
synchronized (mDataLock) {
postTask = mPendingData == NOT\_SET;
mPendingData = value;
}
if (!postTask) {
return;
}
ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}
private final Runnable mPostValueRunnable = new Runnable() {
@Override
public void run() {
Object newValue;
synchronized (mDataLock) {
newValue = mPendingData;
mPendingData = NOT_SET;
}
//noinspection unchecked
setValue((T) newValue);
}
};
private void hook(@NonNull Observer<T> observer) {
try {
Class<LiveData> classLiveData = LiveData.class;
Field fieldObservers = classLiveData.getDeclaredField("mObservers");
fieldObservers.setAccessible(true);
Object objectObservers = fieldObservers.get(this);
Class<?> classObservers = objectObservers.getClass();
Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
methodGet.setAccessible(true);
Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
Object objectWrapper = null;
if (objectWrapperEntry instanceof Map.Entry) {
objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
}
if (objectWrapper != null) {
Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
Field fieldLastVersion = null;
if (classObserverWrapper != null) {
fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
fieldLastVersion.setAccessible(true);
Field fieldVersion = classLiveData.getDeclaredField("mVersion");
fieldVersion.setAccessible(true);
Object objectVersion = fieldVersion.get(this);
fieldLastVersion.set(objectWrapper, objectVersion);
}
}
} catch (Exception e){
e.printStackTrace();
}
}
- 同时还需要注意,在实现MutableLiveData<T>自定义类BusMutableLiveData中,需要重写这几个方法。代码如下所示:/\*\*
\* 在给定的观察者的生命周期内将给定的观察者添加到观察者列表所有者。
\* 事件是在主线程上分派的。如果LiveData已经有数据集合,它将被传递给观察者。
\* @param owner owner
\* @param observer observer
\*/
public void observeSticky(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
super.observe(owner, observer);
}
/**
private BusWeakHandler mainHandler = new BusWeakHandler(Looper.getMainLooper());
private class PostValueTask implements Runnable {
private T newValue;
public PostValueTask(@NonNull T newValue) {
this.newValue = newValue;
}
@Override
public void run() {
setValue(newValue);
}
}
- 然后再次使用for循环,发送100条消息事件,查看日志。发现就会刚好有100条数据。代码这里就不展示了,跟上面测试代码类似。
### 10.如何发送延迟事件消息
- 可以知道,通过postValue可以在子线程发送消息,那么发送延迟消息也十分简单,代码如下所示:/\*\*
\* 子线程发送事件
\* @param value value
\*/
@Override
public void postValue(T value) {
//注意,去掉super方法,
//super.postValue(value);
mainHandler.post(new PostValueTask(value));
}
/**
private class IntervalValueTask implements Runnable {
private T newValue;
private long interval;
public IntervalValueTask(T newValue, long interval) {
this.newValue = newValue;
this.interval = interval;
}
@Override
public void run() {
setValue(newValue);
mainHandler.postDelayed(this,interval);
}
}
- 轮训总不可以一直持续下去吧,这个时候可以添加一个手动关闭轮训的方法。代码如下所示:/\*\*
\* 停止轮训间隔发送事件
\*/
@Deprecated
@Override
public void stopPostInterval(@NonNull String taskName) {
IntervalValueTask intervalTask = intervalTasks.get(taskName);
if(intervalTask!= null){
//移除callback
mainHandler.removeCallbacks(intervalTask);
intervalTasks.remove(taskName);
}
}
### 12.避免类型转换异常问题
- 代码如下所示public class SafeCastObserver<T> implements Observer<T> {
@NonNull
private final Observer<T> observer;
public SafeCastObserver(@NonNull Observer<T> observer) {
this.observer = observer;
}
@Override
public void onChanged(@Nullable T t) {
//捕获异常,避免出现异常之后,收不到后续的消息事件
try {
//注意为了避免转换出现的异常,try-catch捕获
observer.onChanged(t);
} catch (ClassCastException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。