首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka流窗口聚合批处理

Kafka流窗口聚合批处理
EN

Stack Overflow用户
提问于 2018-09-24 16:24:51
回答 1查看 748关注 0票数 0

我的应用程序中有Kafka流处理:

代码语言:javascript
运行
复制
myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    .foreach((k, v) -> /* do something very important */);

预期行为:传入消息按密钥分组,并在一定时间间隔内在CustomCollectorObject中进行聚合。CustomCollectorObject只是一个包含List的类。在foreach中每隔10秒钟,我就会对我的聚合数据做一些非常重要的事情。重要的是,我希望每10秒调用一次foreach

实际行为:我可以看到,在我的foreach中的处理称为稀薄,大约每30-35秒处理一次,这并不重要。最重要的是,我一次收到3-4条信息.

问题是:如何才能达到预期的行为?我需要在运行时处理我的数据,没有任何延迟。

我尝试过设置cache.max.bytes.buffering: 0,但在这种情况下,窗口根本无法工作。

EN

回答 1

Stack Overflow用户

发布于 2018-09-25 22:08:16

Kafka流有一个不同的执行模型,并提供了不同的语义,也就是说,您的期望与Kafka流所做的不匹配。已经有多个类似的问题:

还请注意,社区目前正在开发一个名为suppress()的新操作符,它将能够提供所需的语义:https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables

现在,您需要添加一个带有状态存储的transform(),并使用标点符号获取所需的语义(c.f )。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a-stream-processor)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52483589

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档