在project reactor中processor有诸多实现,他们的分类大致如下: direct(DirectProcessor以及UnicastProcessor) synchronous(EmitterProcessor...@Test public void testUnicastProcessor() throws InterruptedException { UnicastProcessor unicastProcessor = UnicastProcessor.create(Queues....get(8).get()); Flux flux = unicastProcessor .map(e -> e)...(); TimeUnit.SECONDS.sleep(10); // unicastProcessor.blockLast(); //blockLast也是一个subscriber
lineBuilderFunction; this.lineSink = lineSink; config().namingConvention(namingConvention); UnicastProcessor... processor = UnicastProcessor.create(Queues....stop(); super.close(); } //...... } 1.可以看到底层是使用reactor的UdpClient,processor使用的是UnicastProcessor
UnicastProcessor 只能有一个观察者。
另外还有二个接近的方法window/windowTimeout,只是window/windowTimeout调用后的结果是Flux>,处理过程中产生的流为UnicastProcessor
io.reactivex.processors.PublishProcessor, io.reactivex.processors.ReplayProcessor io.reactivex.processors.UnicastProcessor
领取专属 10元无门槛券
手把手带您无忧上云