前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >事件总线方案实践

事件总线方案实践

原创
作者头像
杨充
修改2020-03-13 14:09:28
1.8K0
修改2020-03-13 14:09:28
举报
文章被收录于专栏:lib库lib库

liveData实现事件总线

目录介绍
  • 01.EventBus使用原理
  • 02.RxBus使用原理
  • 03.为何使用liveData
  • 04.LiveDataBus的组成
  • 05.LiveDataBus原理图
  • 06.简单的实现案例代码
  • 07.遇到的问题和分析思路
  • 08.使用反射解决遇到问题
  • 09.使用postValue的bug
  • 10.如何发送延迟事件消息
  • 11.如何发送轮训延迟事件
  • 12.避免类型转换异常问题
  • 13.如何实现生命周期感知

00.事件开源库

01.EventBus使用原理

  • 框架的核心思想,就是消息的发布和订阅,使用订阅者模式实现,其原理图大概如下所示,摘自网络。
    • image
      image
  • 发布和订阅之间的依赖关系,其原理图大概如下所示,摘自网络。
    • image
      image
  • 订阅/发布模式和观察者模式之间有着微弱的区别,个人觉得订阅/发布模式是观察者模式的一种增强版。两者区别如下所示,摘自网络。
    • image摘自网络
      image摘自网络
  • 具体使用可以看demo代码,demo开源地址

02.RxBus使用原理

  • RxBus不是一个库,而是一个文件,实现只有短短30行代码。RxBus本身不需要过多分析,它的强大完全来自于它基于的RxJava技术。
  • 在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色,我们使用Subject的子类PublishSubject来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者),在需要接收事件的地方,订阅该Subject对象,之后如果Subject对象接收到事件,则会发射给该订阅者,此时Subject对象充当被订阅者的角色。
  • 完成了订阅,在需要发送事件的地方将事件发送给之前被订阅的Subject对象,则此时Subject对象作为订阅者接收事件,然后会立刻将事件转发给订阅该Subject对象的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。
  • 最后就是取消订阅的操作了,RxJava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量的取消订阅。
  • 具体使用可以看demo代码,demo开源地址

03.为何使用liveData

  • 为何使用liveData
    • LiveData具有的这种可观察性和生命周期感知的能力,使其非常适合作为Android通信总线的基础构件。在一对多的场景中,发布消息事件后,订阅事件的页面只有在可见的时候才会处理事件逻辑。
    • 使用者不用显示调用反注册方法。LiveData具有生命周期感知能力,所以LiveDataBus只需要调用注册回调方法,而不需要显示的调用反注册方法。这样带来的好处不仅可以编写更少的代码,而且可以完全杜绝其他通信总线类框架(如EventBus、RxBus)忘记调用反注册所带来的内存泄漏的风险。
  • 该liveDataBus优势
    • 1.该LiveDataBus的实现比较简单,支持发送普通事件,也支持发送粘性事件;
    • 2.该LiveDataBus支持发送延迟事件消息,也可以用作轮训延迟事件(比如商城类项目某活动页面5秒钟刷一次接口数据),支持stop轮训操作
    • 3.该LiveDataBus可以减小APK包的大小,由于LiveDataBus只依赖Android官方Android Architecture Components组件的LiveData;
    • 4.该LiveDataBus具有生命周期感知,这个是一个很大的优势。不需要反注册,避免了内存泄漏等问题;
  • 关于liveData深度解析,可以看我这篇博客:01.LiveData详细分析

04.LiveDataBus的组成

  • 消息: 消息可以是任何的 Object,可以定义不同类型的消息,如 Boolean、String。也可以定义自定义类型的消息。
  • 消息通道: LiveData 扮演了消息通道的角色,不同的消息通道用不同的名字区分,名字是 String 类型的,可以通过名字获取到一个 LiveData 消息通道。
  • 消息总线: 消息总线通过单例实现,不同的消息通道存放在一个 HashMap 中。
  • 订阅: 订阅者通过 getChannel() 获取消息通道,然后调用 observe() 订阅这个通道的消息。
  • 发布: 发布者通过 getChannel() 获取消息通道,然后调用 setValue() 或者 postValue() 发布消息。

05.LiveDataBus原理图

  • 为了方便理解,LiveDataBus原理图如下所示
  • 订阅和注册的流程图
    • image
      image
  • 订阅注册原理图
    • image
      image
  • 为何用LiveDataBus替代EventBus和RxBus
    • LiveDataBus的实现极其简单
    • LiveDataBus可以减小APK包的大小,由于LiveDataBus只依赖Android官方Android Architecture Components组件的LiveData。
    • LiveDataBus具有生命周期感知。

06.简单的实现案例代码

  • 我这里先用最简单的代码实现liveDataBus,然后用一下,看一下会出现什么问题,代码如下所示:public final class LiveDataBus1 {
    private final Map<String, MutableLiveData<Object>> bus;
    private LiveDataBus1() {
        bus = new HashMap<>();
    }
    private static class SingletonHolder {
        private static final LiveDataBus1 DATA_BUS = new LiveDataBus1();
    }
    public static LiveDataBus1 get() {
        return SingletonHolder.DATA_BUS;
    }
    public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
        if (!bus.containsKey(target)) {
            bus.put(target, new MutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(target);
    }
    public MutableLiveData<Object> getChannel(String target) {
        return getChannel(target, Object.class);
    }
}
- 那么如何发送消息和接收消息呢,注意两者的key需要保持一致,否则无法接收?具体代码如下所示://发送消息
LiveDataBus1.get().getChannel("yc\_bus").setValue(text);
//接收消息
LiveDataBus1.get().getChannel("yc\_bus", String.class)
        .observe(this, new Observer<String>() {
            @Override
            public void onChanged(@Nullable String newText) {
                // 更新数据
                tvText.setText(newText);
            }
        });

### 07.遇到的问题和分析思路

- 遇到的问题:
    - 1.LiveData 一时使用一时爽,爽完了之后我们发现这个简易的 LiveDataBus 存在一个问题,就是订阅者会收到订阅之前发布的消息,类似于粘性消息。对于一个消息总线来说,这是不可接受的。
    - 2.多次调用了 postValue() 方法,只有最后次调用的值会得到更新。也就是此方法是有可能会丢失事件!

#### 7.1 先看第一个问题

- 然后看一下LiveData的订阅方法observe源码
    - 看下面代码可知道,LiveData 内部会将传入参数包装成 wrapper ,然后存在一个 Map 中,最后通过 LifeCycle 组件添加观察者。// 注释只能在主线程中调用该方法
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
    // 当前绑定的组件(activity or fragment)状态为DESTROYED的时候, 则会忽视当前的订阅请求
    if (owner.getLifecycle().getCurrentState() == DESTROYED) {
        // ignore
        return;
    }
    // 转为带生命周期感知的观察者包装类
    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    // 对应观察者只能与一个owner绑定,否则抛出异常
    if (existing != null && !existing.isAttachedTo(owner)) {
        throw new IllegalArgumentException("Cannot add the same observer"
                + " with different lifecycles");
    }
    if (existing != null) {
        return;
    }
    // lifecycle注册
    owner.getLifecycle().addObserver(wrapper);
}
- 紧接着,来看一下LiveData的更新数据方法
    - LiveData 更新数据方式有两个,一个是 setValue() 另一个是 postValue(),这两个方法的区别是,postValue() 在内部会抛到主线程去执行更新数据,因此适合在子线程中使用;而 setValue() 则是直接更新数据。@MainThread
protected void setValue(T value) {
    assertMainThread("setValue");
    // 这里的 mVersion,它本问题关键,每次更新数据都会自增,默认值是 -1。
    mVersion++;
    mData = value;
    dispatchingValue(null);
}
    - 跟进下 dispatchingValue() 方法,注意,这里需要重点看considerNotify代码:private void dispatchingValue(@Nullable ObserverWrapper initiator) {
    // mDispatchingValue的判断主要是为了解决并发调用dispatchingValue的情况
    // 当对应数据的观察者在执行的过程中, 如有新的数据变更, 则不会再次通知到观察者。所以观察者内的执行不应进行耗时工作
    if (mDispatchingValue) {
        mDispatchInvalidated = true;
        return;
    }
    mDispatchingValue = true;
    do {
        mDispatchInvalidated = false;
        if (initiator != null) {
            // 等下重点看这里的代码
            considerNotify(initiator);
            initiator = null;
        } else {
            for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator =
                    mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                // 等下重点看这里的代码
                considerNotify(iterator.next().getValue());
                if (mDispatchInvalidated) {
                    break;
                }
            }
        }
    } while (mDispatchInvalidated);
    mDispatchingValue = false;
}
    - 然后看一下considerNotify() 方法做了什么,代码如下所示,这里有道词典翻译下注释:private void considerNotify(ObserverWrapper observer) {
    if (!observer.mActive) {
        return;
    }
    // 检查最新的状态b4调度。也许它改变了状态,但我们还没有得到事件。
    // 我们还是先检查观察者。活动,以保持它作为活动的入口。
    // 因此,即使观察者移动到一个活动状态,如果我们没有收到那个事件,我们最好不要通知一个更可预测的通知顺序。
    if (!observer.shouldBeActive()) {
        observer.activeStateChanged(false);
        return;
    }
    if (observer.mLastVersion >= mVersion) {
        return;
    }
    observer.mLastVersion = mVersion;
    //noinspection unchecked
    observer.mObserver.onChanged((T) mData);
}
- 为何订阅者会马上收到订阅之前发布的最新消息?
    - 如果 ObserverWrapper 的 mLastVersion 小于 LiveData 的 mVersion,那么就会执行的 onChange() 方法去通知观察者数据已更新。而 ObserverWrapper.mLastVersion 的默认值是 -1, LiveData 只要更新过数据,mVersion 就肯定会大于 -1,所以订阅者会马上收到订阅之前发布的最新消息!!

#### 7.2 然后看一下第二个问题

- 首先看一下postValue源代码,如下所示:
    - 看代码注释中说,如果在多线程中同一个时刻,多次调用了 postValue() 方法,只有最后次调用的值会得到更新。也就是此方法是有可能会丢失事件!
    - postValue 只是把传进来的数据先存到 mPendingData,ArchTaskExecutor.getInstance()获取的是一个单利对象。然后往主线程抛一个 Runnable,在这个 Runnable 里面再调用 setValue 来把存起来的值真正设置上去,并回调观察者们。而如果在这个 Runnable 执行前多次 postValue,其实只是改变暂存的值 mPendingData,并不会再次抛另一个 Runnable。protected void postValue(T value) {
    boolean postTask;
    synchronized (mDataLock) {
        postTask = mPendingData == NOT\_SET;
        mPendingData = value;
    }
    if (!postTask) {
        return;
    }
    ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}

private final Runnable mPostValueRunnable = new Runnable() {

@Override
public void run() {
    Object newValue;
    synchronized (mDataLock) {
        newValue = mPendingData;
        mPendingData = NOT_SET;
    }
    //noinspection unchecked
    setValue((T) newValue);
}

};

08.使用反射解决遇到问题

  • 根据之前的分析,只需要在注册一个新的订阅者的时候把Wrapper的version设置成跟LiveData的version一致即可。
  • 能不能从Map容器mObservers中取到LifecycleBoundObserver,然后再更改version呢?答案是肯定的,通过查看SafeIterableMap的源码我们发现有一个protected的get方法。因此,在调用observe的时候,我们可以通过反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version设置成和LiveData一致即可。@Override public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); hook(observer); }
private void hook(@NonNull Observer<T> observer) {
    try {
        Class<LiveData> classLiveData = LiveData.class;
        Field fieldObservers = classLiveData.getDeclaredField("mObservers");
        fieldObservers.setAccessible(true);
        Object objectObservers = fieldObservers.get(this);
        Class<?> classObservers = objectObservers.getClass();
        Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
        methodGet.setAccessible(true);
        Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
        Object objectWrapper = null;
        if (objectWrapperEntry instanceof Map.Entry) {
            objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
        }
        if (objectWrapper != null) {
            Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
            Field fieldLastVersion = null;
            if (classObserverWrapper != null) {
                fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
                fieldLastVersion.setAccessible(true);
                Field fieldVersion = classLiveData.getDeclaredField("mVersion");
                fieldVersion.setAccessible(true);
                Object objectVersion = fieldVersion.get(this);
                fieldLastVersion.set(objectWrapper, objectVersion);
            }
        }
    } catch (Exception e){
        e.printStackTrace();
    }
}
- 同时还需要注意,在实现MutableLiveData<T>自定义类BusMutableLiveData中,需要重写这几个方法。代码如下所示:/\*\*
 \* 在给定的观察者的生命周期内将给定的观察者添加到观察者列表所有者。
 \* 事件是在主线程上分派的。如果LiveData已经有数据集合,它将被传递给观察者。
 \* @param owner                                 owner
 \* @param observer                              observer
 \*/
public void observeSticky(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
    super.observe(owner, observer);
}

/**

  • 将给定的观察者添加到观察者列表中。这个调用类似于{@link LiveData#observe(LifecycleOwner, Observer)}
  • 和一个LifecycleOwner, which总是积极的。这意味着给定的观察者将接收所有事件,并且永远不会 被自动删除。
  • 您应该手动调用{@link #removeObserver(Observer)}来停止 观察这LiveData。
  • @param observer observer */ public void observeStickyForever(@NonNull Observer<T> observer) { super.observeForever(observer); }

09.使用postValue的bug

9.1 模拟通过发送多个postValue消息出现丢失问题
  • 首先看看MutableLiveData源代码,如下所示,这里重点展示测试数据案例public void postValue(T value) { super.postValue(value); }
  • 然后使用for循环,使用postValue发送100条消息事件,代码如下所示:public void postValueCountTest() { sendCount = 100; receiveCount = 0; ExecutorService threadPool = Executors.newFixedThreadPool(2); for (int i = 0; i < sendCount; i++) { threadPool.execute(new Runnable() { @Override public void run() { LiveDataBus2.get().getChannel(Constant.LIVE_BUS3).postValue("test_1_data"+sendCount); } }); } new Handler().postDelayed(new Runnable() { @Override public void run() { BusLogUtils.d("sendCount: " + sendCount + " | receiveCount: " + receiveCount); Toast.makeText(ThirdActivity4.this, "sendCount: " + sendCount + " | receiveCount: " + receiveCount, Toast.LENGTH_LONG).show(); } }, 1000); } //接收消息 LiveDataBus2.get() .getChannel(Constant.LIVE_BUS3, String.class) .observe(this, new Observer<String>() { @Override public void onChanged(@Nullable String s) { receiveCount++; BusLogUtils.d("接收消息--ThirdActivity4------yc_bus---1-"+s+"----"+receiveCount); } });
  • 然后看一下打印日志,是不是发现了什么问题?发现根本没有100条数据……2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----1 2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----2 2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----3 2020-03-03 10:25:51.403 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----4
9.2 修改后使用handler处理postValue消息
  • 既然post是在子线程中发送消息事件,那么可不可以使用handler将它放到主线程中处理事件了,是可以的,代码如下所示/** * 子线程发送事件 * @param value value */ @Override public void postValue(T value) { //注意,去掉super方法, //super.postValue(value); mainHandler.post(new PostValueTask(value)); }
private BusWeakHandler mainHandler = new BusWeakHandler(Looper.getMainLooper());
private class PostValueTask implements Runnable {
    private T newValue;
    public PostValueTask(@NonNull T newValue) {
        this.newValue = newValue;
    }
    @Override
    public void run() {
        setValue(newValue);
    }
}
- 然后再次使用for循环,发送100条消息事件,查看日志。发现就会刚好有100条数据。代码这里就不展示了,跟上面测试代码类似。

### 10.如何发送延迟事件消息

- 可以知道,通过postValue可以在子线程发送消息,那么发送延迟消息也十分简单,代码如下所示:/\*\*
 \* 子线程发送事件
 \* @param value                                 value
 \*/
@Override
public void postValue(T value) {
    //注意,去掉super方法,
    //super.postValue(value);
    mainHandler.post(new PostValueTask(value));
}

/**

  • 发送延迟事件
  • @param value value
  • @param delay 延迟时间 */ @Override public void postValueDelay(T value, long delay) { mainHandler.postDelayed(new PostValueTask(value) , delay); //mainHandler.postAtTime(new PostValueTask(value) , delay); }

11.如何发送轮训延迟事件

  • 轮训延迟事件,比如有的页面需要实现,每间隔5秒钟就刷新一次页面数据,常常用于活动页面。在购物商城这类需求很常见@Override public void postValueInterval(final T value, final long interval) { mainHandler.postDelayed(new Runnable() { @Override public void run() { setValue(value); mainHandler.postDelayed(this,interval); } },interval); }
  • 测试用例,轮训延迟3秒钟发送事件,代码如下所示。具体可以看demo钟的案例!LiveDataBus.get().with(Constant.LIVE_BUS5).postValueInterval("test_5_data",3000);
  • 这里遇到了一个问题,假如有多个页面有这种轮训发送事件的需求,显然这个是实现不了的。那么可不可以把每个轮训runnable记录一个名称区别开来代码更更改如下/** * 发送延迟事件,间隔轮训 * @param value value * @param interval 间隔 */ @Deprecated @Override public void postValueInterval(final T value, final long interval,@NonNull String taskName) { if(taskName.isEmpty()){ return; } IntervalValueTask intervalTask = new IntervalValueTask(value,interval); intervalTasks.put(taskName,intervalTask); mainHandler.postDelayed(intervalTask,interval); }
private class IntervalValueTask implements Runnable {
    private T newValue;
    private long interval;
    public IntervalValueTask(T newValue, long interval) {
        this.newValue = newValue;
        this.interval = interval;
    }
    @Override
    public void run() {
        setValue(newValue);
        mainHandler.postDelayed(this,interval);
    }
}
- 轮训总不可以一直持续下去吧,这个时候可以添加一个手动关闭轮训的方法。代码如下所示:/\*\*
 \* 停止轮训间隔发送事件
 \*/
@Deprecated
@Override
public void stopPostInterval(@NonNull String taskName) {
    IntervalValueTask  intervalTask  = intervalTasks.get(taskName);
    if(intervalTask!= null){
        //移除callback
        mainHandler.removeCallbacks(intervalTask);
        intervalTasks.remove(taskName);
    }
}

### 12.避免类型转换异常问题

- 代码如下所示public class SafeCastObserver<T> implements Observer<T> {
@NonNull
private final Observer<T> observer;
public SafeCastObserver(@NonNull Observer<T> observer) {
    this.observer = observer;
}
@Override
public void onChanged(@Nullable T t) {
    //捕获异常,避免出现异常之后,收不到后续的消息事件
    try {
        //注意为了避免转换出现的异常,try-catch捕获
        observer.onChanged(t);
    } catch (ClassCastException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

13.如何实现生命周期感知

  • 生命周期感知能力就是当在Android平台的LifecycleOwner(如Activity)中使用的时候,只需要订阅消息,而不需要取消订阅消息。LifecycleOwner的生命周期结束的时候,会自动取消订阅。这带来了两个好处:
    • 可以在任何位置订阅消息,而不是必须在onCreate方法中订阅
    • 避免了忘记取消订阅引起的内存泄漏
  • 具体已经对lifecycle源码作出了分析,具体可以看我上一篇的博客。Lifecycle详细分析

参考内容

事件总线开源库:https://github.com/yangchong211/YCLiveDataBus

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • liveData实现事件总线
    • 00.事件开源库
      • 01.EventBus使用原理
        • 02.RxBus使用原理
          • 03.为何使用liveData
            • 04.LiveDataBus的组成
              • 05.LiveDataBus原理图
                • 06.简单的实现案例代码
                  • 08.使用反射解决遇到问题
                    • 09.使用postValue的bug
                      • 11.如何发送轮训延迟事件
                        • 13.如何实现生命周期感知
                          • 参考内容
                            • 事件总线开源库:https://github.com/yangchong211/YCLiveDataBus
                            相关产品与服务
                            事件总线
                            腾讯云事件总线(EventBridge)是一款安全,稳定,高效的云上事件连接器,作为流数据和事件的自动收集、处理、分发管道,通过可视化的配置,实现事件源(例如:Kafka,审计,数据库等)和目标对象(例如:CLS,SCF等)的快速连接,当前 EventBridge 已接入 100+ 云上服务,助力分布式事件驱动架构的快速构建。
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档