首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

聊聊reactive streams的processors

本文主要研究一下reactive streams的processors

processors分类

processors既是Publisher也是Subscriber。在project reactor中processor有诸多实现,他们的分类大致如下:

direct()

synchronous()

asynchronous()

direct

DirectProcessor

它不支持backpressure特性,如果publisher发布了N个数据,如果其中一个subscriber请求数

输出如下

UnicastProcessor

支持backpressure特性,但是代价是至多只能有一个subscriber。默认是无界的,如果发布数据之后,subscriber还没来得及request,则它会把数据缓存下来。

如果设置了一个有界的queue,当buffer满而且subscriber没有发送足够多的request的时候,processor会拒绝推送数据。在这种场景下,processor内置了一个callback,每当一个element被rejected的时候会触发.

输出实例

synchronous

EmitterProcessor

能够支持多个subscriber,同时还对每个subscriber支持backpressure。它也可以订阅publisher,然后把数据同步重放。

它有一个bufferSize参数,用来在发布数据之后还没有订阅者期间的数据,onNext会一直阻塞直到数据被消费;当第一个订阅者订阅之后,它会接收到buffer里头的数据,而后续的订阅者就只能消费到自他们订阅那个时候起发布的数据。

当所有的subscriber都取消订阅之后,该processor会清空buffer,并停止接收新的订阅。

输出实例

ReplayProcessor

可以缓存通过sink产生的数据或者订阅publisher的数据,然后重放给后来的订阅者。有如下四种配置

cacheLast

只缓存最后一个数据

create(int)

缓存最后N个数据

createTimeout(Duration)

对每个数据打上时间戳标签,只缓存age在指定ttl内的数据

createSizeOrTimeout(int,Duration)

对每个数据打上时间戳标签,只缓存age在指定ttl内的N个数据

实例

输出如下

asynchronous

TopicProcessor

TopicProcessor是一个异步的processor,当shared设置为true的时候,支持对多个publisher的并发重放。如果订阅的publisher是一个并发的stream或者是需要并发调用Topicrocessor的onNext,onCompleete,onError方法,则必须强制开启share。关闭share则是遵循reactive streams规范的processor,不允许并发调用。

TopicProcessor也支持把消息广播(fan-out)到多个subscriber,它给每个subscriber绑定一个线程。能够支持的subscriber的最大个数由线程池executor限制。

TopicProcessor使用了RingBuffer数据结构来推送数据,每个subscriber线程都在RingBuffer记录其消费的位置

TopicProcessor也支持autoCancel选项,默认为true,也就是当所有subscriber都取消订阅的时候,publisher也会被自动cannel

注意两个地方:

share

share背后设置的是EventLoopProcessor的multiproducers属性

reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/EventLoopProcessor.java

如果share为true,则创建的是createMultiProducer.

具体的表象就是如果有多线程调用processor的onNext方法,而没有开启share的话,会有并发问题,即数据会丢失.比如上面的代码,如果注释掉share(true),则最后count的大小就不一定是100,而开启share为true就能保证最后count的大小是100

如果设置executor(Executors.newSingleThreadExecutor()),则flux1,flux2,flux3的订阅者则是顺序执行,而不是并发的.

WorkQueueProcessor

WorkQueueProcessor也是一个异步的processor,当shared设置为true的时候,支持对多个publisher的并发重放。

WorkQueueProcessor使用了RingBuffer数据结构来推送数据。

WorkQueueProcessor不是每来一个subscriber就给其创建一个线程,因此比TopicProcessor的伸缩性更好一点。能够支持的subscriber的最大个数由线程池executor限制。但是值得注意的是最好不要给WorkQueueProcessor添加过多的subscriber,这样会增加processor的锁竞争。最好使用ThreadPoolExecutor或者ForkJoinPool,processor可以检测他们的容量然后再订阅者过多的时候抛出异常。

WorkQueueProcessor不遵循reactive streams的规范,因此比TopicProcessor所消耗的资源更少。作为trade-off,所有subscriber的request会累加在一起,然后WorkQueueProcessor每次只给一个subscriber重放数据,相比于TopicProcessorde fan-out广播模式,它类似于round-robin模式,但是公平的round-robin模式是不被保证的。

输出实例

可以看到WorkQueueProcessor的subscriber就类似kafka的同属于一个group的consumer,各自消费的消息总和就是publisher发布的总消息,不像TopicProcessor那种广播式的消息传递.

doc

processor-overview

disruptor-3.3.2源码解析(3)-发布事件

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180117G0ZCCU00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券