前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka的consumer设计方案

kafka的consumer设计方案

原创
作者头像
mariolu
修改2020-06-15 12:13:01
1.7K0
修改2020-06-15 12:13:01
举报

一、设计consumer的要点

1.1 消费者与消费组的关系。

以下特点实现了了kafka的消费者设计思想:基于队列和基于发布/订阅者模式的 生产-消费模型。

  • 消费组有若干消费者组成。
  • 一个topic会递交给消费组的一个消费者
  • 多个消费组运行同时消费同一个topic

生产一个消息:

图1、生产者生产消息
图1、生产者生产消息

启动两个consumer,这两个consumer属于同一个group

图2、同一个消费组group.0只能有一个consumer收到消息
图2、同一个消费组group.0只能有一个consumer收到消息

启动两个consumer,这两个consumer属于不同group

图3、不同消费组收到同样的消息
图3、不同消费组收到同样的消息

这时我们明白了消费组id的背后实际意义,一般我们会设置组id为一个跟业务相关的名字。

1.2 重平衡(rebalance)机制

重平衡也可以理解成一个后端的分配机制,使得负载均衡或者部分节点失活扔能达到系统可用。例如说一个消费组包含10个消费者。订阅了50个topic,那么每个消费者会平均得到5个topic。

类似于分布式的管理协议,该组会协商出一个协调者(coordinator)。该协调者负责管理组的状态。

触发rebalance的条件有:

  • consumer加入组、离开组。
  • 订阅topic总量发生变更,为了达到组内平均均衡分担topic,这里有个例子是,比如说订阅了含正则表达式的topic,那么生产者只要满足正则规则的topic,都可以生产消息,所以这里会经常触发reblance
  • 订阅topic的partition发生变化(这是什么?)

一个例子是:

当new第二个consumer

图4、新启一个窗口,追加新的consumer
图4、新启一个窗口,追加新的consumer

这时候老的consumer会出现

图5、旧的consumer会触发一次reblance
图5、旧的consumer会触发一次reblance

1.3 消息位移

消费者保存当前消费消息的位置。也就是下一次消费的位置。这个位移消息保存在broker服务端。服务端保存这些offset需要内存,同时需要和客户端同步位移信息。一般同步做法是引入位移信息。还引入了检查点机制(这是什么?)。

1.3.1 位移的提交

在消费者设计的原则上有如下准则:最多一次(at most once)、最少一次(at least once)、有且只有一次(exactly once)。

所以这涉及到consumer在消费之前提交位移还是,处理完消息再提交位移,因为消费者在取到消息和处理完消息之间可能发生崩溃。那么消费者重启到底是从哪个位移消费。kafka默认是at least once方案,也就是说处理完消息之后再提交位移。如果能够支持事务,那么这个设计可以提升到exactly once。

图6、某个topic的偏移量管理
图6、某个topic的偏移量管理

内部topic名字为__consumer_offsets用来保存消费者提供的offset。消费者的位移提交会在__consumer_offsets-<某个分片>写上一条消息。消息key是group id + topic + 分区,value是偏移量,如果一个group的一个conumer对同一个topic分区提交了多次,那么kafka会使用compact策略保存最新的一次提交位移

图7、内部topic __consumer_offset
图7、内部topic __consumer_offset

位移提交又分为自动提交和手动提交。

1.4 poll轮询

在用户订阅了topic之后,poll以事件循环开等待读取消息。可以触发的消息包括coordinator协调消息,消费组内部的reblace消息,和生产者写入topic的消息。这样达到一个线程IO管理所有事件。

在poll进行消息等待,可以设置以下策略来退出等待,处理消息。

  • 获取到一定量的数据
  • 等待时间到达,有多少消息处理多少

1.5 多线程方案

一个实例只允许运行在一个线程方案,这是由于减少引入同步、锁机制带来的性能折损。建议使用单线程方案。在消费者poll消息进入一个循环体,我们用isRunning变量控制循环运行。如果程序执行进入到其他线程,那么主动设置isRunning=false来结束consumer。主动调用consumer.close会及时告知开启新一轮的reblance。

二、JAVA方案的consumer配置方案

2.1 心跳相关

  • sessiong.timeout.ms:检测奔溃消费组某个成员如果超时了,会被剔除组。
  • heartbeat.interval.ms:必须小于sessiong.timeout.ms,其他成员可以及时感知到奔溃的成功,进入新一轮的reblance
  • connections.max.idle.ms:连接的保活时间,设置太短会导致定期关闭导致的新请求必须重新建立连接带来的开销。设置太长又会过度消耗机器连接资源。这里默认值是9分钟。

2.2 poll返回相关:

  • max.poll.interval.ms: 处理消息的业务逻辑所需要的最长时间。基于这个设置该项。一般是结合sessiong.timeout.ms

,sessiong.timeout.ms设置较低值,max.poll.interval.ms设置成实际处理耗时。既可以快速检测奔溃,又可以处理逻辑不会引起没必要的reblance

  • max.poll.records:每次返回的最大消息数,如果是1,每条都返回。这个值涉及到消息的处理速度。

2.3 提交策略

  • auto.offset.reset:可选项有earliest(最早的位移开始消费)、latest(最新的位移)和none(无位移、或者位移越界抛出异常)。
  • enable.auto.commit:自动提交或者手动提交,这个好理解

2.4 其他

fetch.max.bytes:消费的最大字节数,如果过大超过配置值,则无法消费。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、设计consumer的要点
    • 1.1 消费者与消费组的关系。
      • 1.2 重平衡(rebalance)机制
        • 1.3 消息位移
          • 1.3.1 位移的提交
        • 1.4 poll轮询
          • 1.5 多线程方案
          • 二、JAVA方案的consumer配置方案
            • 2.1 心跳相关
              • 2.2 poll返回相关:
                • 2.3 提交策略
                  • 2.4 其他
                  相关产品与服务
                  负载均衡
                  负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档