我有一个单节点kafka代理和简单的流应用程序。我创建了两个主题(topic1和topic2)。
Produced on topic1 - processed message - write to topic2
注意:对于生成的每条消息,只有一条消息被写入目标主题。
我只发了一条信息。在写到topic2之后,我阻止了卡夫卡经纪人。过了一段时间,我重新启动了代理,并在topic1上产生了另一条消息。现在,streams应用程序处理了这条消息3次。现在,在没有停止代理的情况下,我向topic1生成了消息,并等待streams应用程序在再次生成之前将其写入topic2。
Streams应用程序表现得
假设我有一个名为“批处理”的主题,其中包含一个分区,并将数百万条记录发布到其中进行处理。我有一个3人的消费者小组来处理那些数百万的记录。我遇到这样一种情况:我不再需要处理某些满足特定条件的消息子集,比如age < 50。
如何以编程的方式从主题中删除这些消息。就像我单击UI中的“取消”按钮一样,它应该从主题的age < 50中删除那些记录的子集,这样消费者就不会处理它。
我知道我可以通过运行带有偏移量的命令行来删除消息:-
也是Java,但也是通过抵消:
Delete records whose offset is smaller than the given offset o