但是,我们始终没有去深入的了解下,异步任务框架对于任务执行的进度是如何监控的,任务执行的结果该如何获取。...outcome 是任务执行结束的返回值,runner 是正在执行当前任务的线程,waiters 是一个简单的单链表,维护的是所有在任务执行结束之前尝试调用 get 方法获取执行结果的线程集合。...而我们可以直接调用 get 方法去获取任务执行的结果,不过 get 方法是阻塞式的,一旦任务还未执行结束,当前线程将丢失 CPU 进而被阻塞到 Future 的内部队列上。...到这里,相信你也一定看出来了,FutureTask 实现的 Future 的弊端在 get 方法,这个方法非异步,如果没有成功获取到任务的执行结果就将直接阻塞当前线程,以等待任务的执行完成。...take 方法直接获取已完成的任务返回结果,无需阻塞。
打印结果如下: 1 2 3 4 5 6 7 8 9 10 在Completable中,andThen有多个重载的方法,正好对应了五种被观察者的类型。...s=testA 跟第一次执行的结果是一致的。...我们对上面的代码再做一下修改,在subscribe()中也加入onComplete(),看看打印出来的结果会是这样的?因为SingleObserver中是没有onComplete()方法。...Kotlin编写的工具方法,这些工具方法由Kotlin来编写会显得比较简单和清晰,特别是lambda表达式更加直观。...request.jpeg 获取内容的response.jpeg 总结 RxJava 有五种不同类型的被观察者,合理地使用它们能够写出更简洁优雅的代码。
4、doOnNext 数据源每次调用onNext() 之前都会先回调该方法。 5、doOnError 数据源每次调用onError() 之前会回调该方法。...数据源每次调用dispose() 之后会回调该方法 其他的见官网吧,不难 实用操作符 对数据源过滤操作符 主要讲对数据源进行选择和过滤的常用操作符 1、skip(跳过) 可以作用于Flowable,Observable...8, 9, 10); source.skipLast(4) .subscribe(System.out::print); 打印结果:1 2 3 4 5 6 skipLast(n)操作表示从流的尾部跳过...4、elementAt(获取指定位置元素) 可作用于Flowable,Observable,从数据源获取指定位置的元素,从0开始。...方法,并停止合并。
4.4 Flowable的基础使用 Flowable的基础使用非常类似于 Observable 具体如下 /** * 步骤1:创建被观察者 = Flowable */ Flowable...背压策略的使用 在本节中,我将结合 背压策略的原理 & Flowable的使用,为大家介绍在RxJava 2.0 中该如何使用Flowable来实现背压策略功能,即背压策略的使用 Flowable与Observable.../ 作用:返回当前线程中request(a)中的a值 // 该request(a)则是措施1中讲解的方法,作用 = 设置 ....// 仅贴出关键代码 } 每个线程中的requested...、被观察者仍然继续发送下1个事件时,该如何处理的策略方式 缓存区大小存满、溢出 = 发送事件速度 > 接收事件速度 的结果 = 发送 & 接收事件不匹配的结果 5.3.2 背压模式类型 下面我将对每种模式逐一说明...其余方法的作用类似于上面的说背压模式参数,此处不作过多描述。
doOnNext 数据源每次调用onNext() 之前都会先回调该方法。 doOnError 数据源每次调用onError() 之前会回调该方法。...doOnComplete 数据源每次调用onComplete() 之前会回调该方法 doOnSubscribe 数据源每次调用onSubscribe() 之后会回调该方法 doOnDispose 数据源每次调用...5, 6, 7, 8, 9, 10); 9 10source.skipLast(4) 11 .subscribe(System.out::print); 12 13打印结果:1 2 3 4 5...elementAt(获取指定位置元素) 可作用于Flowable,Observable,从数据源获取指定位置的元素,从0开始。...方法,并停止合并。
Flowable 特点 Flowable的特点 具体如下 ?...背压策略的使用 在本节中,我将结合 背压策略的原理 & Flowable的使用,为大家介绍在RxJava 2.0 中该如何使用Flowable来实现背压策略功能,即背压策略的使用 Flowable与Observable...:返回当前线程中request(a)中的a值 // 该request(a)则是措施1中讲解的方法,作用 = 设置 ....// 仅贴出关键代码 } 每个线程中的requested(...面向对象:针对缓存区 作用:当缓存区大小存满、被观察者仍然继续发送下1个事件时,该如何处理的策略方式 缓存区大小存满、溢出 = 发送事件速度 > 接收事件速度 的结果 = 发送 & 接收事件不匹配的结果...其余方法的作用类似于上面的说背压模式参数,此处不作过多描述。 背压策略模式小结 ?
但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。...但是,RxJava 的反应类型是不可变的;每个方法调用都返回一个带有添加行为的新 Flowable。...根据上面的代码的结果输出中可以看到,当我们调用 subscription.request(n) 方法的时候,会等onSubscribe()中后面的代码执行完成后,才会立刻执行到onNext方法。...它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。...,flatMap 返回的是包含结果集的 Observable; 执行顺序不同:map 被订阅时每传递一个事件执行一次 onNext 方法,flatmap 多用于多对多,一对多,再被转化为多个时,一般利用
"); } }); } 输出结果如下 Observable调用subscribe方法时会触发这个onSubscribe方法 可以在这里触发消费者的方法 onNext方法被调用 onComplete...} 接下来调用FlowableOnSubscribe的subscrib方法,FlowableOnSubscribe中的subscribe即匿名内部类中的subscribe方法会, 先调用调用BaseEmitter...并返回 .subscribe => 以原生的Subscriber作为参数调用Flowable的subscribe方法,然后再对原生的Subscriber做一层包装作为参数,调用FlowableMap...的subscribeActual, 然后再调用FlowableCreate的subscribe方法(即lowable的subscribe方法),然后再以上一层包装的Subscriber作为参数调用FlowableCreate...以BaseEmitter为参数调用FlowableOnSubscribe的subscribe 方法,即调用BaseEmitter的相关方法(onNext......)
背压是在Flowable处理事件流中,某些异步阶段无法足够快地处理这些值,并且需要一种方法来告诉上游生产商减速。...---- Flowable的用法 我们来看看 Flowable的用法: Flowable.create(FlowableOnSubscribe source, BackpressureStrategy...当发送了Flowable.bufferSize()个事件,get() != 0不成立,调用onOverflow()方法抛出 MissingBackpressureException异常。...---- BufferAsyncEmitter 我们可以看到内部有一个SpscLinkedArrayQueue的缓存队列,每次调用onNext都会先保存到缓存队列,然后通过drain()方法一直去遍历当前的缓存队列...我们介绍了Flowable的使用和五种背压策略的具体实现。
下载安装 下载地址: https://github.com/fengzhizi715/RxCondition 使用方法: 1.ifThen用法 if条件语句传统的写法: Observable....subscribe((Consume) (s) -> {System.out.println("s="+s)}); 2.switchCase用法 switch case语句传统的写法:...原理 以ifThen为例,看看它某个方法是怎么运行的。...它的subscribeActual()方法是被订阅时真正执行的方法,用来衔接Observable和Observer(Subscriber)。...总结 这个库其实很简单,编写它的同时我加深了对RxJava中Observable/Flowable原理的认识。
(都有一个共同的方法subscrib()),但是参数不一样),正是各自接口的不同,决定了他们功能不同,各自独立(特别是Observable和Flowable),同时保证了他们各自的拓展或者配套的操作符不会相互影响...在测试的时候,快速发送了100000个整形数据,下游延迟接收,结果被观察者的数据全部发送出去了,内存确实明显增加了,遗憾的是没有OOM。...Flowable/Subscriber Flowable.range(0, 10) .subscribe(new Subscriber() { Subscription...根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会立刻执行onNext方法,因此,如果你在onNext...方法中使用到需要初始化的类时,应当尽量在subscription.request(n)这个方法调用之前做好初始化的工作; 当然,这也不是绝对的,我在测试的时候发现,通过create()自定义Flowable
上图可以很清楚看出二者的区别,其实Flowable 出来以上的区别之外,它其他所有使用与Observable完全一样。...Flowable 的create例子 public void flowable(){ Flowable.create(new FlowableOnSubscribe(...BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128)); } ..... } 通过上面的例子,我们可以看到create方法中的包含了一个...4.5 requested requested 与 request不是同一的函数,但它们都是属于FlowableEmitter类里的方法,那么requested()是有什么作用呢,看看以下例子: Flowable.create...从图中我们可以发现,requested打印的结果就是 剩余可接收的数量 ,它的作用就是可以检测剩余可接收的事件数量。 5.总结 到此,Flowable讲解完毕。
1.去重 Flowable.just(1, 1, 1, 2, 2, 3, 4, 5) .distinct() .subscribe...它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送 Flowable.merge(Flowable.just(1, 2), Flowable.just(3,...//调用request()方法,会立即触发onNext()方法---不调用的话会卡住,onNext无法调用 Log.e("onSubscribe...(subscriber); 10.amb操作符只发射首先发射数据或通知的那个Observable的所有数据 ArrayListFlowable> list = new ArrayList...(Flowable.just("a"), Flowable.just("b"), Flowable.just("c")).subscribe( new Consumer<
Subscription 还有一个 request(long n) 方法,用来向生产者申请可以消费的事件数量。这样便可以根据本身的消费能力进行消费事件。...request 这个方法若不调用,下游的 onNext 与 OnComplete 都不会调用。...处理策略 处理 Backpressure 的策略是处理 Subscriber 接收事件的方式,并不影响 Flowable 发送事件的方法。...---- 如果 Flowable 对象不是通过 create() 获取的或不是自己创建的,可以采用 onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest...不要使用 Flowable 或 Observable 里的方法,这样会将 Processor 转成一个 Flowable 或 Observable,用 Processor 内部重写的 create。
RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。 onError(): 事件队列异常。...下面就说一下各块内容 发布者 对比 Observable/Flowable: Observable不支持背压(backpressure) Flowable是Rxjava2新增加的支持背压(backpressure...如果上游发送数据速度远大于下游接收数据的速度 用Observable就会内存溢出 Flowable则会抛弃掉处理不了的数据来防止溢出 但是不能就都用Flowable 因为Observable...Flowable .create(new FlowableOnSubscribe() { @Override public void subscribe...(observer); 注意上面方法的顺序 看上去是发布者订阅了订阅者,之所以这样是因为链式代码的优雅 线程(Scheduler) 常用的方式是分线程中处理数据,主线程中使用数据生成页面 Observable
/** * 获取当前任务流程图片的输入流 * @param PROC_INST_ID_ 流程实例ID * @from fhadmin.cn */ public InputStream...genProcessDiagram(String PROC_INST_ID_, String type){ /* 获得活动的节点 */ String processDefinitionId...pi.getProcessDefinitionId(); } else { // 如果流程没有结束,则取当前活动节点 /* 根据流程实例ID获得当前处于活动状态的ActivityId...singleResult(); processDefinitionId = pi.getProcessDefinitionId(); } /* 获得活动的节点对象...activityId); } BpmnModel bpmnModel = repositoryService.getBpmnModel(processDefinitionId); //获取流程图
just 内部触发对 Observer 的 onNext 方法的调用,just 中传递的参数将直接在 onNext 方法中接收到,参数的类型要和 Observer 的泛型保持一致。...共有 10 个重载方法,其中 2 个以上参数方法的内部直接调用了 fromArray。...extends T> supplier) 当有观察者订阅时,从 Callable 的回调方法里获取要发射的数据。...fromPublisher 从响应式数据流获取要发射的数据。如果可以,尽可能使用 ObservableOnSubscribe。不支持背压。..., // 表示新的 state,下面这个两个方法的 state 是可变的,而上面两个方法的 state 在第一个参数 initialState 指定后就不变了 public static
subscribe(), 我们来看看最基本的用法吧: 这段代码中,分别创建了一个上游Flowable和下游Subscriber, 上下游工作在同一个线程中, 和之前的Observable的使用方式只有一点点的区别..., 先来看看运行结果吧: 结果也和我们预期的是一样的....()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢...(FlowableCreate.java:72) at io.reactivex.Flowable.subscribe(Flowable.java...(FlowableCreate.java:72) at io.reactivex.Flowable.subscribe(Flowable.java
举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。...当创建Observable/Flowable...时,compose操作符会立即执行,而不像其他的操作符需要在onNext()调用后才执行。...对于Flowable切换到主线程的操作,可以这样使用 .compose(RxJavaUtils.flowableToMain()) 2........ } 如果你想在RxJava的链式调用中也使用缓存,还可以考虑使用transformer的方式,下面我写了一个简单的方法 /** * Created by Tony Shen on...return t; } }); } }; } } 结合上述三种使用场景,封装了一个方法用于获取内容
这就像小学做的那道数学题:一个水池,有一个进水管和一个出水管。如果进水管水流更大,过一段时间水池就会满(溢出)。这就是没有Flow Control导致的结果。...的问题,用Observable就足以满足需求; 获取数据操作是同步的,但你的平台不支持Java流或者相关特性。...而新的接口带来的新的调用方式与旧的也不太一样, subscribe 后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith...因为Reactive-Streams的基础接口org.reactivestreams.Publisher 定义subscribe()为无返回值,Flowable.subscribe(Subscriber...其他的基础类型也遵循这种规律。 在2.x中其他的subscribe的重载方法返回Disposable。 原始的Subscription容器类型已经被重命名和修改。
领取专属 10元无门槛券
手把手带您无忧上云