我正在外部化数据库中主题的kafka消费者元数据,包括消费者组和组中的消费者数量。
Consumer_info表具有
Topic name、Consumer group name、Consumer class name中的Consumer数量
在应用服务器启动时,我正在读取表,并根据表中设置的数字创建消费者(线程)。如果消费者组计数设置为3,我将创建3个消费者线程。这是基于给定主题的分区数量
现在,如果我需要横向扩展,我如何在多个应用服务器节点上分配属于同一组的消费者。而不需要多次阅读相同的消息。
消费者的初始化代码将在应用服务器启动时被调用,它将从数据库中读取消费者的元数据,并在应用服务器的同一实例上创建所有消费者线程,即使我添加更多的应用服务器实例,它们都将是冗余的,因为启动的第一个服务器已经产生了等于在其他实例上创建的partitions.any更多消费者线程的数量将是空闲的。
你能建议更好的方法来横向扩展消费者吗
发布于 2020-06-30 04:57:09
消费群体和组内消费人数
与外部数据库查询相比,临时运行kafka-consumer-groups --describe
将为您提供更多最新信息,特别是考虑到消费者可以重新平衡并随时退出组。
如何在多个app服务器节点上分配属于同一组的消费者。不需要多次阅读相同的消息
这就是Kafka Consumer groups开箱即用的操作方式,假设您没有在代码中手动分配分区。
在使用、确认并提交组内偏移量之后,不能多次读取消息
当您已经可以尝试公开kafka-consumer-groups
命令周围的API时,我认为没有必要使用外部数据库
或者,您可以使用Cloudera的Stream-Messaging-Manager,它也会显示大量此类信息
https://stackoverflow.com/questions/62646507
复制相似问题