我有一个用例,在这个用例中,我有3个Kafka消费者在写一个主题,每个消费者中的消息需要按顺序处理。以防万一,如果其中一个消费者存在滞后,则需要更早处理的消息将被丢弃(写入条件)。那么,有没有办法可以维持这些消息的顺序呢?
发布于 2020-06-04 14:22:25
消息始终在Kafka分区中排序。通常,属于某个关键字的所有消息都位于某个分区中(通过分区逻辑)。
我有一个用例,在这个用例中,我有3个Kafka用户在写一个主题
我想,你的意思是你有3个消费者从一个主题中阅读
这里有两种情况:
案例#1中的
您可以有3个消费者,每个消费者都有不同的group.id,这样每个消费者都可以使用所有的消息集。在这里,速度较慢的消费者不会减慢其他消费者的速度。因为每个使用者通常在其自己的线程或进程中运行。
案例2中的
您可以有3个具有相同group.id的使用者,这样每个使用者将获得其自己的分区份额。一个消费者消费的消息不会被另一个消费者消费。在这里,速度较慢的消费者也不会减慢其他消费者的速度。因为每个使用者将只使用它自己的一组分区。
在这种情况下,如果其中一个消费者存在滞后,则需要更早处理的消息将被丢弃(写入条件)
在Kafka中没有隐含的丢弃,你必须在轮询消息后自己丢弃它。
为了检查滞后,我认为,你可以从consumer.endOffsets()和consumer.position()开始,不同之处应该会给你带来滞后。根据延迟的不同,您可以选择丢弃邮件。
consumer.assignment().forEach( topicPartition -> {
long currentPos = consumer.position(topicPartition);
long endOfPartition = consumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
});https://stackoverflow.com/questions/62187416
复制相似问题