专栏首页Android进阶之路RxBus 的初步探索
原创

RxBus 的初步探索

前言

1月份项目上线了,之后就在优化项目结构,减少依赖。之前项目一直用的EventBus来作为项目事件流的框架,这两天偶然看到RxBus这个东西,基于RxJava和RxAndroid,考虑到自身的业务需求,由于本身用EventBus的功能比较单一,而发现RxBus足以实现我现有的业务,所以决定踩踩坑。

##具体实现

public class RxBus {
    private static volatile RxBus mInstance;
    private final Subject mBus;
    public RxBus() {
        mBus = new SerializedSubject<>(PublishSubject.create());
    }
    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }
    public void post(Object object) {
        mBus.onNext(object);
    }
    public <T> Observable<T> toObserverable(Class<T> eventType) {
        return mBus.ofType(eventType);
//        return mBus.filter(eventType::isInstance)
//                .cast(eventType);
    }
}

目前只是消息的注册,发送。

SerializedSubject

SerializedSubject 特征是线程安全

public SerializedSubject(final Subject<T, R> actual) {
    super(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> child) {
            actual.unsafeSubscribe(child);
        }
    });
    this.actual = actual;
    this.observer = new SerializedObserver<T>(actual);
}

这里有个小细节,actual 是当前的数据链,这里通过SerializedObserver将数据链做一个转换,类似于map。

下面我们看SerializedObserver

public void onNext(T t) {
    if (terminated) {
        return;
    }
    synchronized (this) {
        if (terminated) {
            return;
        }
        if (emitting) {
            FastList list = queue;
            if (list == null) {
                list = new FastList();
                queue = list;
            }
            list.add(NotificationLite.next(t));
            return;
        }
        emitting = true;
    }
    try {
        actual.onNext(t);
    } catch (Throwable e) {
        terminated = true;
        Exceptions.throwOrReport(e, actual, t);
        return;
    }
    for (;;) {
        FastList list;
        synchronized (this) {
            list = queue;
            if (list == null) {
                emitting = false;
                return;
            }
            queue = null;
        }
        for (Object o : list.array) {
            if (o == null) {
                break;
            }
            try {
                if (NotificationLite.accept(actual, o)) {
                    terminated = true;
                    return;
                }
            } catch (Throwable e) {
                terminated = true;
                Exceptions.throwIfFatal(e);
                actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                return;
            }
        }
    }
}

这里丑抽出onNext,我们发现synchronized线程锁,证明当前是线程安全的,当多个线程再要执行onNext,这里线程安全,排队线程会加入queue,然后依次执行。onError,onComplete同理。

PublishSubject

与普通的Subject不同,在订阅时并不立即触发订阅事件,而是允许我们在任意时刻手动调用onNext(),onError(),onCompleted来触发事件。

可以看到PublishSubject与普通的Subject最大的不同就是其可以先订阅事件,然后在某一时刻手动调用方法来触发事件。

demo:

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            // TODO
        }
});
publishSubject.onNext(result);

我们可以根据我们的业务需求先对Subject进行订阅,然后再默一时刻触发我们的onNext。

原理总结

这里的publishSubject就是在我们发出通知的时候才会去onNext,而我们的onNext是线程安全的,当并发访问的时候,可以依次执行onNext,这里我们要用到ofType这个操作符,用来过滤TargetEvent.class的Observable来实现“发送端”与“接收端”的约束。

使用方法

简单的使用方法

消息发送

RxBus.getInstance().post(event);

消息注册,取消注册

这里就不以Activity,Fragment做对照了,基本用法都一样,风向一个View AttachToWindow,DetachFromWindow 的方式

@Override
protected void onAttachedToWindow() {
    super.onAttachedToWindow();
    mSubscription = RxBus.getInstance().toObserverable(IndexLeftBtnGetFocusEvent.class)
            .compose(RxSchedulers.threadSwitchSchedulers())
            .subscribe(event -> {
                // TODO 业务逻辑
            });
//        EventBus.getDefault().register(this);
}
@Override
protected void onDetachedFromWindow() {
    super.onDetachedFromWindow();
    if (mSubscription.isUnsubscribed()) {
        mSubscription.unsubscribe();
    }
//        EventBus.getDefault().unregister(this);
}

后记

这里我只是先用一个小demo来学习一下这里的代码设计,后期会对我们的RxBus优化,比如添加bind,unbind生命周期的相关逻辑。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 关于Rxjava2下的RxBus实现

    用户4458175
  • 关于RxJS 自定义封装Rxbus的使用规范文档

    2.3、声明isInner为true的情况: 主要是在push页面之前,即进入子页面:

    stormKid
  • Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus

    对于Android系统来说,消息传递是最基本的组件,每一个App内的不同页面,不同组件都在进行消息传递。消息传递既可以用于Android四大组件之间的通信,也可...

    美团技术团队
  • 框架设计 | 当EventBus遇上自撸RxBus的时候?

    大幅提高自身技术实力最有效的途径之一就是学习世界级优秀开源项目的精髓,除了学习款架提供的API, 还有必要进行高端一些的姿势,才能显得你是老司机,今天本文讲述迷...

    开发者技术前线
  • 基于Retrofit2+RxJava2实现Android App自动更新

    本文实例为大家分享了Retrofit2 RxJava2实现Android App自动更新,具体内容如下

    砸漏
  • Android 中 RxBus 的使用

    经常我们会有这样的需求,B页面操作后,要求A页面处理相关数据,像这样一般我们都是,要么B页面保留A页面的引用,要么使用广播,但是写起来还是想对麻烦的,用Rxbu...

    剑行者
  • 【直播】我的基因组59:CNV初步探索

    好久不见,基因组直播又来了。这篇推送是对SNV进行一个初步探索。 单纯的一个样本来找CNV,总是不太准确的,但还是那句话,毕竟是自己的基因组,硬着头皮也要上。当...

    生信技能树
  • 索引的初探(一)

    以前听做DBA的朋友说索引能解决数据库百分之八十的问题,我也开始简单的写几篇关于索引的随笔,顺便来总结一下我理解的索引以及相关的知识,毕竟进步在于总结。 简介:...

    用户1217611
  • 索引初探(三)

    由于前一篇写的有点匆忙很多地方不是很简单,这一片再描述一些概念和细节。 首先,我们都知道在数据库中的存储分为两种结构,一是堆;二是B树。堆的数据是没有排序也就没...

    用户1217611
  • 索引初探(二)

    在SqlServer中分为两种索引,一是聚集索引;一是费聚集索引。下面我就分别对两种索引进行介绍并分析其区别和各自的特点。 1.聚集索引      之前看过一个...

    用户1217611
  • NoSQL初探之人人都爱Redis:(4)Redis主从复制架构初步探索

      通过前面几篇的介绍中,我们都是在单机上使用Redis进行相关的实践操作,从本篇起,我们将初步探索一下Redis的集群,而集群中最经典的架构便是主从复制架构。...

    Edison Zhou
  • SNS项目笔记<七>--深入探究RXjs

    在正常使用RX做监听的时,时不时有些页面需要重复点击进入,这样在进入该页面的时候,会产生多次触发subscribe方法,这个时候往往会出现多次赋值或者多次提交操...

    stormKid
  • WPF中Dispatcher的初步探讨

    今天要专门讲一下Dispatcher,原因是WPF中经常碰到多线程下软件界面控件的更新问题。相信很多初步接触WPF的界面开发的朋友,为了保持界面不卡,在一个...

    zls365
  • 技术调研----OpenResty高可用技术初步探索

    OpenResty 介绍 OpenResty(又称:ngx_openresty) 是一个基于 NGINX 的可伸缩的 Web 平台,由中国人章亦春发起,提供了...

    流川疯
  • RxJava 的 Subject

    在前面一篇文章Cold Observable 和 Hot Observable中,曾经介绍过 Subject 既是 Observable 又是 Observer...

    fengzhizi715
  • 初探Numpy中的花式索引

    Numpy中对数组索引的方式有很多(为了方便介绍文中的数组如不加特殊说明指的都是Numpy中的ndarry数组),比如:

    触摸壹缕阳光
  • MySQL-Btree索引和Hash索引初探

    http://www.searchdoc.cn/rdbms/mysql/dev.mysql.com/doc/refman/5.7/en/index.com.co...

    小小工匠
  • 祖源探索"三步曲"

    最近探索了一下祖源的分析,从R语言代码分析所有染色体的祖源成分,到Y染色体和线粒体,大概是三步,和大家分享一下:

    用户1075469
  • Redux 异步数据流初探

    用React写的项目中各组件的状态依赖关系非常复杂,为了便于管理组件的状态,使用 Redux。

    一个会写诗的程序员

扫码关注云+社区

领取腾讯云代金券