最近,我将kafka流从2.0.1升级到2.5.0。因此,我看到了很多警告,如下所示:
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor Skipping record for expired window. key=[325233] topic=[MY_TOPIC] partition=[20] offset=[661798621] timestamp=[1600041596350] window=[1600041570000,1600041600000) expiration=[1600059629913] streamTime=[1600145999913]
KStreamWindowAggregate类中似乎有新的逻辑来检查窗口是否关闭。如果它已关闭,则跳过消息。与2.0.1相比,这些消息仍在处理中。
问题
有办法像以前那样做吗?在这次升级中,我看到我的数据中有很多空白,而且我不知道如何解决这个问题,因为以前没有看到这些漏洞。
我所使用的聚合函数已经处理了窗口化,因此也处理了过期的窗口。这个新逻辑与这个即将过期的窗口有什么关系?
更新
在进一步探索的同时,我确实认为,这与迈斯的晚年时期有关。在我的自定义时间戳提取器(它具有使用来自有效载荷的时间戳而不是普通时间戳的逻辑)中,我可以看到过期窗口警告的传入时间戳确实比有效负载的事件时间大24小时。
我想这是由24小时以上的消费滞后造成的。
时间戳提取器提取方法有一个分区时间,根据文档:
partitionTime,当前记录的分区·(如果未知的话可能是-1 )的最高提取有效时间戳。
那么,这是关于这个主题的记录的创建时间吗?有没有办法影响到我的唱片不再被跳过?
发布于 2021-01-19 00:18:30
与2.0.1相比,这些消息仍在处理中。
这有点令人吃惊(即使我需要再次检查代码),至少对于默认配置是这样的。默认情况下,存储保留时间设置为24h,因此在2.0.1中,当相应的状态已经被清除时,也不应该处理超过24h的旧消息。如果确实将存储保留时间(通过Materialized#withRetention
)更改为更大的值,则还需要通过TimeWindows#grace()
方法相应地增加窗口宽限期。
--我已经使用的聚合函数--处理窗口,并因此处理过期的窗口。这个新逻辑与这个即将过期的窗口有什么关系?
不知道你这么说是什么意思还是你是怎么做到的?旧的和新的逻辑与长窗口的存储方式类似(保留时间配置)。新的部分是宽限期,如果您愿意的话,可以将其增加到与保留时间相同的值)。
关于“分区时间”:它是根据TimestampExtractor
返回的任何内容计算的。对于您的情况,它是您从消息有效负载中提取的最大值。
https://stackoverflow.com/questions/63900263
复制相似问题