我的应用程序中有Kafka流处理:
myStream
.mapValues(customTransformer::transform)
.groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
.windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
.aggregate(CustomCollectorObject::new,
(key, value, aggregate) -> aggregate.collect(value),
Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
.withValueSerde(new CustomCollectorSerde()))
.toStream()
.foreach((k, v) -> /* do something very important */);
预期行为:传入消息按密钥分组,并在一定时间间隔内在CustomCollectorObject
中进行聚合。CustomCollectorObject
只是一个包含List
的类。在foreach
中每隔10秒钟,我就会对我的聚合数据做一些非常重要的事情。重要的是,我希望每10秒调用一次foreach
!
实际行为:我可以看到,在我的foreach
中的处理称为稀薄,大约每30-35秒处理一次,这并不重要。最重要的是,我一次收到3-4条信息.
问题是:如何才能达到预期的行为?我需要在运行时处理我的数据,没有任何延迟。
我尝试过设置cache.max.bytes.buffering: 0
,但在这种情况下,窗口根本无法工作。
发布于 2018-09-25 22:08:16
Kafka流有一个不同的执行模型,并提供了不同的语义,也就是说,您的期望与Kafka流所做的不匹配。已经有多个类似的问题:
还请注意,社区目前正在开发一个名为suppress()
的新操作符,它将能够提供所需的语义:https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
现在,您需要添加一个带有状态存储的transform()
,并使用标点符号获取所需的语义(c.f )。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a-stream-processor)
https://stackoverflow.com/questions/52483589
复制相似问题