Kafka是批量发送消息的?

        Kafka作为高可用的分布式消息系统,由多个broker组成的Kafka集群在物理层将消息分布在不同broker的不同partition上,支持多个producer和多个consumer,producer可以将topic的消息分布到不同broker的不同partition上,consumer也可以消费集群中多个partition。

    多个consumer通过coordinator(Consumer和Broker的协调者)可以加入到一个逻辑的Consumer Group中,一个partition只允许被一个Consumer Group中的一个consumer所消费,在一个Consumer Group中一个consumer可以消费多个partition,不同的consumer消费的partition一定不会重复,不同Consumer Group都会消费所有的partition,也就是说一个Consumer Group中的consumer对partition是互斥的,而不同Consumer Group之间是共享的。

消费组与分区平衡

       消费端的平衡主要包含JoinGroup和SyncGroup两部分:

  • 当启动一个consumer时,向group.id发起join group请求
  • Coordinator根据请求选出Consumer Group内的leader,并通知group内的各consumer 
  • Consumer leader重新为每个consumer重新分配partition 
  • 分配完毕后,coordinator通过SyncGroup请求把最新分配情况同步给每个consumer

  Consumer会周期性的发送heartbeat到coordinator(协调者),如果协调者在session.timeout.ms以内没有收到consumer的heartbeat就会认定该consumer已退出,它所订阅的partition会分配到同一Consumer Group内的其它consumer上。该机制叫rebalance,只是针对于group的协调。

   JoinGroup请求的主要作用是将Group Consumer订阅的信息发送给leader进行分配partiton。

    SyncGroup请求的主要作用是让协调者把leader制定的分配方案发送给group内各consumer,当所有成员都成功接收到分配方案后,消费者组进入到Stable状态开始正常的消费工作。

以上机制会存在一个问题:

        由于消费者消费消息的最小单元是partition,每个partition的消费进度offset是保存在consumer中的,在进行rebalance后partition会由group内的其它consumer接管,而接管的consumer如何知道该分区的offset?

      在kafka中可以以Consumer Group级别保存各consumer所消费分区的offset,这些信息可以保存在ZK或topic中,一旦group内发生了rebalance之后,其它接管的consumer就可以读取该分区的offset。

     Broker在启动时,都会创建和开启相应的coordinator组件,每次rebalance时group下所有的consumer都需要参与,如果group内的consumer过多会导致这个流程的效率非常低下,幸好rebalance不容易触发,只在以下几种条件触发:

  • group内成员数量发生变化
  • topic数量发生变化
  • topic的分区数发生变化

主要配置

  • group.id 

表示该consumer想要加入到哪个group中。默认值是“”。

  • heartbeat.interval.ms 

心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活、加入或者退出group的方式。这个值必须设置的小于session.timeout.ms,因为当consumer由于某种原因不能发heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出导致rebalance。通常要低于session.timeout.ms的1/3。默认值:3000(3s)。

  • session.timeout.ms 

Consumer与coordinator的session 过期时间。这个值必须设置在group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。默认值:10000(10s)

  • enable.auto.commit 

Consumer 在commit offset时有两种模式:自动提交、手动提交。默认值:true。

  • auto.commit.interval.ms

自动提交间隔。默认值:5000(5s)

  • auto.offset.reset 

设置broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值时的处理方式。有以下四种: 

            earliest:自动重置到最早的offset。

            latest:看上去重置到最晚的offset(默认)。

            none:如果更早的offset也没有的话,就抛出异常给consumer 。

  • connections.max.idle.ms 

consumer到broker的连接空闲超时时间 默认值是:540000(9min)

拉取消息

    在Kafka partition中,每个消息都有唯一标识offset,每个consumer group中consumer依次从partition读取数据,Consumer循环不断的向broker发起fetch请求,每次从partition只拉取fetch.max.bytes大小的数据,每次执行max.poll.records条数据,然后consumer调用commitSync/commitAsync方法commit,将批量消息中最大的offset保留在coordinator中。如果超过max.poll.interval.ms没有调用poll就会认为这个consumer失败,导致触发rebalance的机制。

      由于消息的offset是由consumer管理,更新offset的方式称为commit。Consumer提供了两种模式:

  • 自动方式(默认)

当enable.auto.commit设置为true时,consumer在发起fetch后每隔auto.commit.interval.ms 默认(5S)commit一次offset,如果在fetch后处理消息之后还未commit offset,发生了rebalance机制,接管的Consumer无法知道最新的offset而导致重复消费问题。

  • 手动提交

当enable.auto.commit设置为false时,需要consumer调用同步commitSync来主动提交offset,为了避免重复消费应当在消息处理完之后才commit offset,即使处理过程中发生rebalance也只有处理过的消息会重复消费。由于大量手动同步提交等待最终结果会导致阻塞,所以有异步commitAsync方式commit offset,但是commit提交失败后不会重试,因为如果有多个异步提交,会导致offset被覆盖。

多线程消费

       每个consumer可开启多线程消费加大消费吞吐量,线程数量最好与分区数保持一致。topic的一个分区只能被同一个consumer group下的一个consumer线程来消费,但一consumer线程可以消费多个分区的数据,所以可加大consumer消费的线程数量来并行消费。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/2536c11bae7c9e893a029c416
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券