我有一个基于拉取的流数据源(就像Kafka一样)。我想把reactor应用到这个事件处理应用程序上。
目前,我使用EmitterProcessor创建了一个无限的事件序列。并且它在初始时间被订阅一次,并且从未被取消。
下面的代码表明了我所做的事情。
public void initialize(){
    EmitterProcessor<Event> emitter = ...
    emitter.flatmap(this::step1)
           .flatmap(this::step2)
           .flatmap(this::finalStep)
         //.subscriberContext(...)
           .subscribe()
}对于初始Flux<Event>中的每个事件,我需要维护/更新上下文,以便可以获得每个步骤的所有输入和结果,并在最后一步中进行一些报告。
可以一步一步地传递不可变的Context类,但这会导致所有的step()都有一个额外的参数。而且并不是所有的step()都会使用Context。在这种情况下,您只需传递Context并返回Pair<Context,OtherResult>,这看起来很难看。Pair也很难看。
所以我更喜欢像ThreadLocal<Context>这样的东西。显然,在反应堆中,替代方案是subscriberContext()。但是,根据我的代码,initialize()将被调用一次。Flux<Event>将为subscribe() once。subscriberContext不是在我的Event级别,而是在订阅级别。因此,在我的代码中将只有一个上下文。它不起作用。
问题是,我应该将事件流视为一个Flux<Event>还是多个Mono<Event>,并对每个事件进行订阅?如果Mono<Event>是最佳实践,那么我可以直接使用subscriberContext()。但是是否有任何组装时间开销(在每个事件到来时进行组装)?
在reactor-kafka中,它使每一批Record都是一个Flux<Record>,它如何实现像记录级上下文这样的东西?
谢谢。
发布于 2020-01-10 18:37:37
根据您上次需要此上下文中的信息的时间,您可以选择使用单个flatMap为每个事件创建一个作用域,并为它们分配自己的上下文:
public void initialize(){
    EmitterProcessor<Event> emitter = ...
    emitter.flatMap(eventForScope ->
        Mono.just(eventForScope)
            .flatmap(this::step1)
            .flatmap(this::step2)
            .flatmap(this::finalStep)
            .subscriberContext(...) //context for ONE event
        )
        .subscribe()
}这是可以调优的,一些较晚的步骤可能不再需要每个事件的上下文,所以您可以将它们移到外部flatMap之外,等等……
这之所以有效,是因为flatMap的内部可以看到“主”Context,但是对内部上下文的更改对外部/主序列不可见。
https://stackoverflow.com/questions/59659091
复制相似问题