首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RxJava 容易忽视的细节: subscribeOn() 方法没有按照预期地运行

通常情况下,RxJava 发射的数据会在同一个线程,但是稍作一些变化,发射的数据来自不同线程会怎样呢?...,这段代码3个线程运行。...当我们的 subject 发射第一个值,第一个观察者已经被订阅。由于订阅代码我们调用 onNext() 已经完成,因此订阅调度程序没有任何作用。...在这种情况下,当我们调用 onNext() 它类似于 PublishSubject 的工作方式。 第二和第三个观察者都在初始 onNext() 之后订阅。...这样,将尊重订阅调度程序,并在它提供的线程通知观察者。 所有后续的发射的值都发生在订阅之后,因此,值再次与 onNext() 同一线程发出,类似于 PublishSubject 的工作方式。

1.7K10

RxBus 的初步探索

,我们发现synchronized线程锁,证明当前是线程安全的,当多个线程再要执行onNext,这里线程安全,排队线程会加入queue,然后依次执行。...PublishSubject 与普通的Subject不同订阅并不立即触发订阅事件,而是允许我们在任意时刻手动调用onNext(),onError(),onCompleted来触发事件。...可以看到PublishSubject与普通的Subject最大的不同就是其可以先订阅事件,然后某一刻手动调用方法来触发事件。...(result); 我们可以根据我们的业务需求先对Subject进行订阅,然后再默一刻触发我们的onNext。...原理总结 这里的publishSubject就是我们发出通知的时候才会去onNext,而我们的onNext线程安全的,当并发访问的时候,可以依次执行onNext,这里我们要用到ofType这个操作符

1.1K50
您找到你想要的搜索结果了吗?
是的
没有找到

RxJava 的 Subject

PublishSubject Observer只接收PublishSubject订阅之后发送的数据。...("publicSubject3"); subject.onNext("publicSubject4"); 执行结果: publicSubject:complete 因为subject订阅之前...,都发射全部数据 PublishSubject 发送订阅之后全部数据 可能错过的事件 Subject 作为一个Observable,可以不停地调用onNext()来发送事件,直到遇到onComplete...因为事件总线是基于发布/订阅模式实现的,如果某一事件多个Activity/Fragment中被订阅的话,App的任意地方一旦发布该事件,则多个订阅的地方都能够同时收到这一事件(在这里,订阅事件的Activity...每当用户处于弱网络,打开一个App可能出现一片空白或者一直loading,那用户一定会很烦躁。此时,如果能够预先加载一些数据,例如上一次打开App保存的数据,这样不至于会损伤App的用户体验。

1.4K20

RxSwift介绍(三)——更加灵活的Subject

RxSwift 框架中,提供了四种类型的 subject,首先要了解的一点就是提供的四种 subject 创建方式最主要的区别:当一个新的订阅订阅到subject对象,能否收到 subject...PublishSubject 最普通的 subject ,不需要初始值就可以创建,而且从订阅者开始订阅的时间点起,可以收到 subject 发出的新 event,而不会收到订阅前已发出的 event...因此,使用时必须在创建设置 bufferSize,表示将会返回给订阅者对应个数最近缓存的旧 event (注:若一个订阅者去订阅已经结束的 ReplaySubject ,除了会收到缓存的 .next...与 BehaviorSubject 不同的是,Variable还会把当前发出的值保存为自己的状态,同时销毁自动发送 .completed event,不需要也不能手动给 Variable 发送终结事件...换个方式理解,Variable 有一个 value 属性,当改变 value 属性的值就相当于调用一般 Subjects 的 onNext() 方法,而这个最新的 onNext() 的值就被保存在 value

1.6K30

Android 中 RxJava 的使用

前言 Android原生的多线程和异步处理简直糟透了,反复的嵌套让代码看起来十分不明了,多线程也没有iOS的dispatch好用,但是用了Rxjava后就会有所改善,虽然代码量看起来会多一点,但是逻辑就清晰多了...) 中转站(Subject) 线程(Scheduler) 操作符 ---- 形象的来说 发布者 就相当于 报社 订阅者 就相当于 用户 中转站 就相当于 报亭 它既是订阅者 又是发布者 线程 是指定在哪个线程处理...操作符 则是把发布者的数据进行处理,再给订阅者 ---- 发布者和订阅者之间传递的事件总共有三种 onNext(): 发送事件的数据 onCompleted(): 事件队列完结。...事件处理过程中出异常,onError() 会被触发,同时队列自动终止,不允许再有事件发出。...observable.subscribe(observer); 注意上面方法的顺序 看上去是发布者订阅订阅者,之所以这样是因为链式代码的优雅 线程(Scheduler) 常用的方式是分线程中处理数据

2.1K30

RxSwift-Subject即攻也守

,只是subject 把订阅流程和响应流程都内部实现,所以也就没有必要引入sink 各种Subject PublishSubject 可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送订阅之后才接收到的元素...// PublishSubject // 1:初始化序列 let publishSub = PublishSubject() //初始化一个PublishSubject 装着Int类型的序列...通过一个默认初始值来创建,当订阅订阅BehaviorSubject,会收到订阅后Subject上一个发出的Event,如果还没有收到任何数据,会发出一个默认值。...和publish 稍微不同就是behavior这个家伙有个存储功能:存储一次的信号 // BehaviorSubject // 1:创建序列 let behaviorSub = BehaviorSubject.init...("订阅到了:",$0)} .disposed(by: disposbag) // 再次发送 behaviorSub.onNext(4) behaviorSub.onNext(5) // 再次订阅

46010

Android RxJava+Retrofit完美封装(缓存,请求,生命周期管理)

因为我们每一个请求中都会处理code以及一些重用一些操作符,比如用observeOn和subscribeOn来切换线程。...然而 onStart()由于 subscribe()发生就被调用了,因此不能指定线程,而是只能执行在 subscribe()被调用时的线程。所以onStart并不能保证永远在主线程运行。...千万不要小看了RxJava,与 onStart()相对应的有一个方法 doOnSubscribe(),它和 onStart()同样是subscribe()调用后而且事件发送前执行,但区别在于它可以指定线程...public class HttpUtil{ /** * 构造方法私有 */ private HttpUtil() { } /** * 访问HttpUtil创建单例 */ private.../** * 获取单例 */ public static HttpUtil getInstance() { return SingletonHolder.INSTANCE; } //添加线程管理并订阅

3.1K11

干货| 是时候对RxLifecycle来篇详解了

随着Android第三库的普及,RxJava和RxAndroid 越来越被人熟知,简洁的语法,配合Java8 Lambda表达式,使代码的结构更加清晰,通过线程调度器更容易控制和切换线程,种种优点,使用它的人也越来越多...,因为是onStart的时候调用,所以onStop的时候自动取消订阅 .compose(this....在前两步一般都是不会出现问题的,但是第三步,当数据返回给client端,如果页面已经不在了,那么就无法去绘制UI,很有可能会导致意向不到的问题。...one"); subject.onNext("two"); subject.onCompleted(); 这里做的事情很简单,先创建一个PublishSubject -> 绑定一个myObserver...BaseActivity监听生命周期 那么我们先来实现生命周期监听功能,基本思路是:BaseActivity里创建一 个PublishSubject对象,每个生命周期发生,把该生命周期事件传递给PublishSubject

1.5K20

详解用RxJava实现事件总线(Event Bus)

只会把订阅发生的时间点之后来自原始Observable的数据发射给观察者 public RxBus() { bus = new SerializedSubject< (PublishSubject.create...,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。...2、PublishSubject只会把订阅发生的时间点之后来自原始Observable的数据发射给观察者。...RxBus工作流程图 1、首先创建一个可同时充当Observer和Observable的Subject; 2、需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),...在这之后,一旦Subject接收到事件,立即发射给该订阅者; 3、我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅

1.3K10

项目需求讨论 — 手把手带你写RxPermission

产品经理针对答复做出相应处理(订阅Observable) 我们可以看到,我们已经可以把第一次情况反馈给了产品经理,但这里可以有二种情况发生(主要看你跟产品经理关系铁铁): ?...手机APP向系统提权限 没错,基本一模一样,唯一不同的地方就是图中蓝色那一块: 产品经理提需求 手机APP提权限 脑子想了一会,调用onNext和onComplete回复。...跳出弹框,让用户点击,然后再调用onNext和onComplete回复。...中会针对这个列表里面的每个权限,去调用系统方法进行询问, 然后根据不同的结果去onNext和onComplete通知。...还记不记得我们跟产品经理的关系铁与铁,还有不同的处理,就是都成功还算任务完成,或者分批告诉你每个需求的评估结果。

61720

【iOS】RxSwift官方Example3--地理位置监听

图一是当App可以使用定位信息,显示当前的经纬度。 图二是当App被禁止使用定位信息,显示的提示信息 代码解释 比起两个Example,这个Example复杂的多了。...当想绑定的视图信息越多,我们就需要对UILabel进行扩展。...PublishSubject的概念 当你订阅PublishSubject的时候,你只能接收到订阅他之后发生的事件 因此为了能够成为代理的代理,我们需要监听代理的事件,并且能够让外部进行监听,所以我们创建了以下两个...lazy var didFailWithErrorSubject = PublishSubject() 将代理事件通过subject传递出去,记得调用_forwardToDelagate...因此,每次订阅authorized信息,都会发送独立的序列,确保每次都会响应。

1.1K20

Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus

响应式编程(Reactive Programming)技术这几年特别火,RxJava是它在Java的实作。RxJava天生就是发布/订阅模式,而且很容易处理线程切换。...RxBus原理 RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色,我们使用Subject的子类PublishSubject...来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者),需要接收事件的地方,订阅该Subject对象,之后如果Subject对象接收到事件,则会发射给该订阅者...避免内存泄漏,观察者被绑定到组件的生命周期,当被绑定的组件销毁(destroy),观察者会立刻自动清理自身的数据。...消息总线 消息总线通过单例实现,不同的消息通道存放在一个HashMap中。 订阅 订阅者通过getChannel获取消息通道,然后调用observe订阅这个通道的消息。

2.3K30

RxJava && Agera 从源码简要分析基本调用流程(2)

对于我们而言,最常见的莫过于非主线程获取并处理数据之后线程更新UI这样的场景了: [image.jpg] 这是我们十分常见的调用方法,一气呵成就把不同线程之间的处理都搞定了,因为是链式所以结构也很清晰...现在再结合之前的过程我们从头梳理一下: [image.jpg] subscribeOn(),我们会新生成一个Observable,它的成员onSubscribe会在目标Subscriber订阅使用传入的...之后,当我们调用subject.onNext(),消息才被发送,Observer的onNext()被触发调用,输出了"Hello World"。...这里我们注意到,当订阅事件发生,我们的subject是没有产生数据流的,直到它发射了"Hello World",数据流才开始运转,试想我们如果将订阅过程和subject.onNext()调换一下位置,...此时,我们可以结束前按需要选择对数据流进行最后的配置,例如:调用onDeactivation()配置从“订阅”到“取消订阅”的过程是否需要继续执行数据流等等。

10.4K10

学着造轮子-RxLifeCycle

使用RxJava的一个很大的优势就是线程的灵活切换,特别是Android开发,工作线程请求,主线程监听,这已经是最普通的常规操作,但是Activity和Fragment都是有生命周期的,如何让我们的请求能在页面销毁及时方便的撤销...它的特性是,最终发射的数据是它被订阅之前发射的最后一条数据+被订阅后发射的所有数据,它能够保存一条被订阅前发射的最新一条数据,可以防止我们的异步请求漏掉activity或者fragment的生命周期。...compose 如果让我们的所有Observable都自己新建一个BehaviorSubject,然后去调用takeUtil,然后activity或者Fragment的生命周期回调中调用 BehaviorSubject.onNext...的onNext方法 ?...event enum.png 然后RxLifeCycleActivity中的生命周期回调用发送不同的事件 ?

70630
领券