设置:
我们有一个Debezium/Kafka设置,有一个Debezium Oracle生产者和一个Confluend JDBC使用者/接收器。
起始位置/背景/问题:
由于流量大,使得log.retention.minutes
降低到1h
,适用于99%的场合。但在一些罕见的情况下,卡夫卡的一个消费者放慢了脚步,再也跟不上了。在这种情况下,信息将在Kafka中删除(由于上述保留期),然后再被消费者捡起和处理。在默认配置中,使用者将跳过丢失的记录,选择最早可用的偏移量。这导致了目标端的不一致。
问题:
如何处理这些情况(如果提高log.retention.minutes不是一个选项)?
注意:如果使用者只是抛出一个异常/停止/etc,以防它找不到它给定的偏移量的消息,我们就可以了。
我们试着走远了.
我们尝试为使用者将auto.offset.reset
设置为none
,并期望使用者停止,以防它找不到偏移量。从理论上讲,这是可行的。在实践中,由于没有第一个/初始偏移量,当使用者被实例化时,它会不折不扣地抛出一个异常。
Final there ,那么我们还可以使用另一个配置参数吗?(比如“如果偏移量丢失/跳过,而不是在第一开始时抛出异常”?)或者,如果消费者跳过消息,我们可以监视JMX度量吗?
发布于 2022-02-01 16:14:01
为使用者将
auto.offset.reset
设置为none
,并期望使用者停止,以防它找不到偏移量
这就是它要做的,是的。
实际上,当使用者被实例化时,由于没有第一个/初始偏移量,
会立即抛出一个异常
您需要首先实际初始化组,然后将其查找到最早的偏移量。例如kafka-consumer-offsets --reset-offsets --to-earliest --group connect-<name>
类似于“如果偏移量丢失/跳过,而不是在第一次启动时抛出异常”?)
auto.offset.reset
在“先”和“下一步”开始之间没有什么区别。但是,您可以使用consumer.override.auto.offset.reset=earliest
创建连接器,然后等待它运行,然后使用PUT /config
调用将其设置为none
。当它停止运行时,再重复一遍。
我们可以监视
JMX度量,以防消费者跳过消息。
据我所知,这些指标大多是处理的报告字节。您还必须另外跟踪预期读取的字节数。
您需要其他监视解决方案来检测代理上正在删除的日志段,并与您的使用者当前正在读取的偏移量相比,跟踪这些偏移量范围。
https://stackoverflow.com/questions/70942159
复制相似问题