Kafka作为高可用的分布式消息系统,由多个broker组成的Kafka集群在物理层将消息分布在不同broker的不同partition上,支持多个producer和多个consumer,producer可以将topic的消息分布到不同broker的不同partition上,consumer也可以消费集群中多个partition。
多个consumer通过coordinator(Consumer和Broker的协调者)可以加入到一个逻辑的Consumer Group中,一个partition只允许被一个Consumer Group中的一个consumer所消费,在一个Consumer Group中一个consumer可以消费多个partition,不同的consumer消费的partition一定不会重复,不同Consumer Group都会消费所有的partition,也就是说一个Consumer Group中的consumer对partition是互斥的,而不同Consumer Group之间是共享的。
消费组与分区平衡
消费端的平衡主要包含JoinGroup和SyncGroup两部分:
Consumer会周期性的发送heartbeat到coordinator(协调者),如果协调者在session.timeout.ms以内没有收到consumer的heartbeat就会认定该consumer已退出,它所订阅的partition会分配到同一Consumer Group内的其它consumer上。该机制叫rebalance,只是针对于group的协调。
JoinGroup请求的主要作用是将Group Consumer订阅的信息发送给leader进行分配partiton。
SyncGroup请求的主要作用是让协调者把leader制定的分配方案发送给group内各consumer,当所有成员都成功接收到分配方案后,消费者组进入到Stable状态开始正常的消费工作。
以上机制会存在一个问题:
由于消费者消费消息的最小单元是partition,每个partition的消费进度offset是保存在consumer中的,在进行rebalance后partition会由group内的其它consumer接管,而接管的consumer如何知道该分区的offset?
在kafka中可以以Consumer Group级别保存各consumer所消费分区的offset,这些信息可以保存在ZK或topic中,一旦group内发生了rebalance之后,其它接管的consumer就可以读取该分区的offset。
Broker在启动时,都会创建和开启相应的coordinator组件,每次rebalance时group下所有的consumer都需要参与,如果group内的consumer过多会导致这个流程的效率非常低下,幸好rebalance不容易触发,只在以下几种条件触发:
主要配置
表示该consumer想要加入到哪个group中。默认值是“”。
心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活、加入或者退出group的方式。这个值必须设置的小于session.timeout.ms,因为当consumer由于某种原因不能发heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出导致rebalance。通常要低于session.timeout.ms的1/3。默认值:3000(3s)。
Consumer与coordinator的session 过期时间。这个值必须设置在group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。默认值:10000(10s)
Consumer 在commit offset时有两种模式:自动提交、手动提交。默认值:true。
自动提交间隔。默认值:5000(5s)
设置broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值时的处理方式。有以下四种:
earliest:自动重置到最早的offset。
latest:看上去重置到最晚的offset(默认)。
none:如果更早的offset也没有的话,就抛出异常给consumer 。
consumer到broker的连接空闲超时时间 默认值是:540000(9min)
拉取消息
在Kafka partition中,每个消息都有唯一标识offset,每个consumer group中consumer依次从partition读取数据,Consumer循环不断的向broker发起fetch请求,每次从partition只拉取fetch.max.bytes大小的数据,每次执行max.poll.records条数据,然后consumer调用commitSync/commitAsync方法commit,将批量消息中最大的offset保留在coordinator中。如果超过max.poll.interval.ms没有调用poll就会认为这个consumer失败,导致触发rebalance的机制。
由于消息的offset是由consumer管理,更新offset的方式称为commit。Consumer提供了两种模式:
当enable.auto.commit设置为true时,consumer在发起fetch后每隔auto.commit.interval.ms 默认(5S)commit一次offset,如果在fetch后处理消息之后还未commit offset,发生了rebalance机制,接管的Consumer无法知道最新的offset而导致重复消费问题。
当enable.auto.commit设置为false时,需要consumer调用同步commitSync来主动提交offset,为了避免重复消费应当在消息处理完之后才commit offset,即使处理过程中发生rebalance也只有处理过的消息会重复消费。由于大量手动同步提交等待最终结果会导致阻塞,所以有异步commitAsync方式commit offset,但是commit提交失败后不会重试,因为如果有多个异步提交,会导致offset被覆盖。
多线程消费
每个consumer可开启多线程消费加大消费吞吐量,线程数量最好与分区数保持一致。topic的一个分区只能被同一个consumer group下的一个consumer线程来消费,但一consumer线程可以消费多个分区的数据,所以可加大consumer消费的线程数量来并行消费。
领取专属 10元无门槛券
私享最新 技术干货