首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RxJava Flowable.create(),如何尊重subscribeOn()线程

RxJava是一个基于观察者模式的异步编程库,用于处理数据流和事件序列。Flowable.create()是RxJava中的一个方法,用于创建一个自定义的Flowable对象。

在RxJava中,subscribeOn()方法用于指定Observable(或Flowable)的数据流在哪个线程上执行。尊重subscribeOn()线程意味着在创建Flowable时,应该确保数据流的生产者在指定的线程上执行。

下面是一个示例代码,展示了如何使用RxJava的Flowable.create()方法并尊重subscribeOn()线程:

代码语言:txt
复制
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaExample {
    public static void main(String[] args) {
        Flowable.create(emitter -> {
            // 在这里定义数据流的生产逻辑
            // 可以是网络请求、数据库查询等耗时操作
            // 数据流的产生通过emitter发射数据
            emitter.onNext("Hello, World!");
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io()) // 指定数据流的生产者在IO线程上执行
        .observeOn(Schedulers.computation()) // 指定数据流的消费者在计算线程上执行
        .subscribe(System.out::println); // 订阅数据流并打印结果
    }
}

在上述示例中,我们使用Flowable.create()方法创建了一个自定义的Flowable对象。在create()方法的回调函数中,我们定义了数据流的生产逻辑,并通过emitter发射了一个字符串数据。我们还使用了BackpressureStrategy.BUFFER来处理背压策略,确保数据流的稳定性。

通过subscribeOn(Schedulers.io())方法,我们指定了数据流的生产者在IO线程上执行。这意味着数据流的生产逻辑将在IO线程中执行,例如网络请求或数据库查询等耗时操作。

通过observeOn(Schedulers.computation())方法,我们指定了数据流的消费者在计算线程上执行。这意味着数据流的消费逻辑将在计算线程中执行,例如对数据进行计算或处理等操作。

最后,我们通过subscribe()方法订阅了数据流,并通过System.out::println打印了结果。

需要注意的是,上述示例中的线程调度器(Schedulers)是RxJava提供的默认调度器,你也可以根据实际需求选择其他的调度器。

关于RxJava的更多信息和使用方法,你可以参考腾讯云的相关文档和教程:

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

RxJava源码浅析(三): subscribeOn线程切换和多次切换

一、subscribeOn 这篇不仅看下subscribeOn线程切换本身,我们还要研究下多次subscribeOn为啥只有第一次有效。...上面执行了两次subscribeOn,第一次会把订阅放在新线程中,第二次会把订阅放在主线程中,最终订阅是在主线程中执行。...这里我们先得出一个结论,多次subscribeOn,以第一个subscribeOn为准。 我们现在知道RxJava是逆向向上调用的,那我们就一步一步的调代码看看。...,有了前篇讲解,我们以已经了解了Rxjava基础订阅流程,知道了ObservableCreate如何执行任务,只不过我们现在是在指定线程中执行。...三、总结 对于OnSubscribe方法而言,不管subscribeOn怎么切换线程,他都不受影响,他是最先开始执行的且只执行一次,只针对最下游有效,对于订阅而言,线程切换只是改变当前observer的所属线程

1.9K50

Android Rxjava :最简单&全面背压讲解 (Flowable)

1.前言 阅读本文需要对Rxjava了解,如果还没有了解或者使用过Rxjava的兄die们,推荐观看 Android Rxjava:图解不一样的诠释 进行学习。...Rxjava背压:被观察者发送事件的速度大于观察者接收事件的速度时,观察者内会创建一个无限制大少的缓冲池存储未接收的事件,因此当存储的事件越来越多时就会导致OOM的出现。...(注:当subscribeOn与observeOn不为同一个线程时,被观察者与观察者内存在不同时长耗时任务,就会使发送与接收速度存在差异。)...(Schedulers.newThread())//使被观察者存在独立的线程执行 .observeOn(Schedulers.newThread())//使观察者存在独立的线程执行...通过上述例子可以大概了解背压是如何产生,因此Rxjava2.0版本提供了 Flowable 解决背压问题。 本文章就是使用与分析 Flowable 是如何解决背压问题。

1.5K20

Android RxJava:一文带你全面了解 背压策略

如果还不了解RxJava,请看文章:Android:这是一篇 清晰 & 易懂的Rxjava 入门教程 本文主要讲解的是RxJava中的 背压控制策略,希望你们会喜欢。...背压策略的使用 在本节中,我将结合 背压策略的原理 & Flowable的使用,为大家介绍在RxJava 2.0 中该如何使用Flowable来实现背压策略功能,即背压策略的使用 Flowable与Observable...而在异步订阅关系中,反向控制的原理是:通过RxJava内部固定调用被观察者线程中的request(n) 从而 反向控制被观察者的发送事件速度 那么该什么时候调用被观察者线程中的request(n) &...面向对象:针对缓存区 作用:当缓存区大小存满、被观察者仍然继续发送下1个事件时,该如何处理的策略方式 缓存区大小存满、溢出 = 发送事件速度 > 接收事件速度 的结果 = 发送 & 接收事件不匹配的结果...- 对于自身手动创建FLowable的情况,可通过传入背压模式参数选择背压策略 (即上面描述的) 可是对于自动创建FLowable,却无法手动传入传入背压模式参数,那么出现流速不匹配的情况下,该如何选择

1.9K20

Carson带你学Android:图文详解RxJava背压策略

前言 Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。 本文主要讲解的是RxJava中的 背压控制策略,希望你们会喜欢。...特点 Flowable的特点 具体如下 下面再贴出一张RxJava2.0 与RxJava1.0的观察者模型的对比图 实际上,RxJava2.0 也有保留(被观察者)Observerble - Observer...背压策略的使用 在本节中,我将结合 背压策略的原理 & Flowable的使用,为大家介绍在RxJava 2.0 中该如何使用Flowable来实现背压策略功能,即背压策略的使用 Flowable与Observable...BackpressureStrategy 5.3.1 背压模式介绍 在Flowable的使用中,会被要求传入背压模式参数 面向对象:针对缓存区 作用:当缓存区大小存满、被观察者仍然继续发送下1个事件时,该如何处理的策略方式...对于自身手动创建FLowable的情况,可通过传入背压模式参数选择背压策略 (即上面描述的) 可是对于自动创建FLowable,却无法手动传入传入背压模式参数,那么出现流速不匹配的情况下,该如何选择

1.2K10

RxJava Flowable Processor

同一个线程生产一个就消费了,不会产生问题,在异步线程中,如果生产者的速度大于消费者的速度,就会产生 Backpressure 问题。...在异步调用时,RxJava 中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为 128,即只能缓存 128 个事件。...backpress.PNG BUFFER 就是把 RxJava 中默认的只能存 128 个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。...Subject 不支持背压,是 RxJava 1.x 继承过来的,Processor 继承 FlowableProcessor,支持背压。...SerializedProcessor 其它 Processor 不要在多线程上发射数据,如果确实要在多线程上使用,用这个 Processor 封装,可以保证在一个时刻只在一个线程上执行。

2.1K20

RxJava2--Flowable与BackPress

转载自:Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解 背压介绍 当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度...所以,如果能够确定: 上下游运行在同一个线程中, 上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度, 上下游工作在不同的线程中,但是数据流中只有一条数据 则不会产生背压问题,就没有必要使用...request(10) Flowable.create({ emitter -> emitter.onNext(1) emitter.onComplete...() }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io())...LATEST ----> LatestAsyncEmitter: 与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中

83230

锦囊篇|一文摸懂RxJava

RxJava同样的是存在这样的问题的,处理速度一定,但是被观察者的数据量过大,我们该如何去进行处理呢?这就引出了背压的概念。...为了解决这样的问题,RxJava给了我们一个很好的解决方案,也就是subscribeOn() & observeOn(),以及一些已经定义好的场景内容。...subscribeOn() 的线程切换原理 抛出一个问题,为什么网上都说subscribeOn()只会生效一次?...但是这就是问题所在了,我们该如何进行数据的通信呢?我的被观察者有数据了,但是我们的观察者该如何知道? 先来看一下如何进行使用,我们应该在IO线程中进行订阅,在UI线程中进行观察。...因为我们要进行UI线程的数据更新,自然是不会使用上述的方法进行的,那RxJava如何完成这样的操作的呢。

77120

RxJava 容易忽视的细节: subscribeOn() 方法没有按照预期地运行

RxJava 会通过 Scheduler、subscribeOn() 来管理线程,但只有在不手动更改线程的情况下,它才会这样做。...通常情况下,RxJava 发射的数据会在同一个线程上,但是稍作一些变化,发射的数据来自不同的线程会怎样呢?...此时 RxJava 没有改变线程,是因为 subscribeOn() 方法已经完成了工作,订阅已经在其他线程上进行了。这时,没有理由 RxJava 会再次更改线程。所以,会看到上述的运行结果。 二....这样,将尊重订阅调度程序,并在它提供的线程上通知观察者。 所有后续的发射的值都发生在订阅之后,因此,值再次与 onNext() 在同一线程上发出,类似于 PublishSubject 的工作方式。...总结 RxJava 用好不易,很多东西需要深究其源码。 本文介绍了几种方式,RxJava 即使调用了 subscribeOn() 方法,线程切换也不会起作用。

1.7K10

线程切换哪家强?RxJava与Flow的操作符对比

本文针对两者在多线程场景中的使用区别进行一个简单对比。 1、RxJava 我们先来回顾一下RxJava中的线程切换 ?...如上,RxJava使用subscriberOn与observeOn进行线程切换 1.1 subscribeOn subscribeOn用来决定在哪个线程进行订阅,对于Cold流来说即决定了数据的发射线程...flowOn类似于RxJavasubscribeOn,Flow中没有对应observeOn的操作符,因为collect是一个suspend函数,必须在CoroutineScope中执行,所以响应线程是由...(inner or outer flowOn) 最后通过一个例子看一下如何将代码从RxJava迁移到Flow 3.1 RxJava RxJava代码如下: ?...4、FIN Flow在线程切换方面可以完全取代RxJava的能力,而且将subscribeOn和observeOn两个操作符合二为一成flowOn,学习成本更低。

84110

关于RxJava2.0你不知道的事(一)

再举个例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。...这里限于篇幅的问题,我们就不再一一介绍了,请移步:https://gold.xitu.io/post/58535b5161ff4b0063aa6b10 如何让Observable支持Backpressure...由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现 FlowableOnSubscribe 接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下: Flowable.create...相关API如下: 实际操作下,写个方法,创建一个Transformer调度器: //子线程运行,主线程回调 public Observable.Transformer io_main(final...tObservable) { Observable observable = (Observable) tObservable .subscribeOn

1.4K20

RxJava2 实战(1) - 后台执行耗时操作,实时通知 UI 更新

偶然的机会看到了开源项目 RxJava-Android-Samples,这里一共介绍了十六种RxJava2的使用场景,它从实际的应用场景出发介绍RxJava2的使用,特别适合对于RxJava2已经有初步了解的开发者进一步地去学习如何将其应用到实际开发当中...那么,让我们看一些在RxJava如何完成这一需求。...()).subscribe(disposableObserver); subscribeOn(Schedulers.io()):指定observable的subscribe方法运行在后台线程。...这两个函数刚开始的时候很有可能弄混,我是这么记的,subscribeOn以s开头,可以理解为“上游”开头的谐音,也就是上游执行的线程。...3.2 线程的类型 subscribeOn/observeOn都要求传入一个Schedulers的子类,它就代表了运行线程类型,下面我们来看一下都有哪些选择: Schedulers.computation

2.2K80

RxJava2 实战知识梳理(1) - 后台执行耗时操作,实时通知 UI 更新

偶然的机会看到了开源项目 RxJava-Android-Samples,这里一共介绍了十六种RxJava2的使用场景,它从实际的应用场景出发介绍RxJava2的使用,特别适合对于RxJava2已经有初步了解的开发者进一步地去学习如何将其应用到实际开发当中...那么,让我们看一些在RxJava如何完成这一需求。...这两个函数刚开始的时候很有可能弄混,我是这么记的,subscribeOn以s开头,可以理解为“上游”开头的谐音,也就是上游执行的线程。...3.2 线程的类型 subscribeOn/observeOn都要求传入一个Schedulers的子类,它就代表了运行线程类型,下面我们来看一下都有哪些选择: Schedulers.computation...四、小结 这个系列的第一篇文章,我们介绍了如何使用subscribeOn/observeOn来实现后台执行耗时任务,并通知主线程更新进度。

66320

Carson带你学Android:RxJava线程控制(含实例讲解)

本文主要讲解的是: 线程控制(也称为调度 / 切换),即讲解功能性操作符中的:subscribeOn() & observeOn() Carson带你学RxJava系列文章,包括 原理、操作符、应用场景...实现方式 采用 RxJava内置的线程调度器( Scheduler ),即通过 **功能性操作符subscribeOn() & observeOn()**实现 3.1 功能性操作符subscribeOn...若Observable.subscribeOn()多次指定被观察者 生产事件的线程,则只有第一次指定有效,其余的指定线程无效 // 步骤3:通过订阅(subscribe)连接观察者和被观察者...observable.subscribeOn(Schedulers.newThread()) // 第一次指定被观察者线程 = 新线程 .subscribeOn(AndroidSchedulers.mainThread...总结 本文主要对 Rxjava 中的线程调度、功能性操作符subscribeOn() & observeOn()进行讲解 Carson带你学RxJava系列文章: 入门 Carson带你学Android

76420
领券