Kafka入门教程(二):Consumer使用(topic,partition和offset

Consumer线程不安全,不能多线程共用

topic与partition

上文提到:一个Topic的一个Partition,只能被同一个ConsumerGroup的一个消费者消费,这里主要介绍Consumer启动时指定topic和partition的使用。

只指定topic

即每个consumer只指定需要消费的topic,高级消费,对应kafka-clients中的Consumer.subscribe()方法

假定,1-N个consumer,属于同一个group。根据consumer的个数,由kafka-clinets分配每个consumer消费的partition,分配策略见后文。注意:必须使用合理的分配策略,否则可能出现一些consumer没有分配partition的情况。

若N>partition num(所有topic的partition总和), 则一些consumer不会被分配partition

若N

当消费多个partition时,消费每个分区内的消息是有序的,但消费多个分区之间的消息是无序的(可以在消费记录中获得当前记录的partition)

指定topic和partition

即为每个consumer指定需要消费的topic和partition,也即所说的低级消费,对应kafka-clients中的Consumer.assign()方法。

这种情况下,不再由kafka-clinets分配,指定哪个partition消费哪个,所以,当同一个消费组指定重复的partition时,会消费到重复的数据(完全重复的数据,因为poll的offset是本地维护的),但是server端只有一个offset!

所以,这种模式下,需要开发者自己保证同一个消费组的消费着具有不重复的partition。

那么,为什么要使用低级消费者呢?

高级消费partition的分配是由kafka-clinets完成的,但是会查询server端的信息,所以集群环境下,当没有指定partition时,每加入/离开一个消费者,kafka-clients都会重新平衡(reblance)partition的分配,这个时候,如果有消费完成但是没有提交的offset,reblance时则会造成数据的重复消费或者数据丢失(具体是哪种情况,要看offset的提交策略)。低级消费则不会发生reblance!

注意:Spring-kafka多线程消费的配置下,指定topic和partition时,也是低级消费,其线程和partition的分配策略见后续spring-kafka的教程。

partition分配策略

range: 得到topic-partitions关系,得到topic-consumers关系,然后,按照topic进行分配,即topic的所有partition按顺序分配到其所有的consumer上,举例:topicA-3partition, topicB-1partition, 4 consumers, 过程是,A的3个partition分配到consumer1-3,B的1个partition分配到consumer1,consumer4空闲,所以使用的最大线程数=max(topic*partition)

roundrobin:topics和patition组合,上述例子,就是ta-0,ta-1,ta-2,tb-0,然后四个取hashcode得到顺序,然后挨个分配到consumer上(要求:每一个consumer消费的topics有相同的streams&&这个消费组中每个consumer消费的topics必须完全相同)

上面的文字可能有描述不准确或不清楚的地方,这里列出了官方对着两种策略的解释:

RoundRobin:

The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.)

(For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance and thread-id within that instance. Therefore, round-robin assignment is allowed only if:

a) Every topic has the same number of streams within a consumer instance

b) The set of subscribed topics is identical for every consumer instance within the group.

Range:

Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1

Reblance时发生了什么?

查看kafka-clients源码可以发现:

AbstractCoordinator 有详细说明调用subscribe方法发生了以下的事请

consumer注册到到服务端

coordinator(server端维护的一个服务)查找所有的该组consumer,选取leander

如果auto commit为true,所有的consumer提交本地offset到服务端;为false则不提交

leader通过coordinator获取服务端所有的partition和offset,并使用策略重新分配partition,结果返回给coordinator,coordinator下发分配结果到所有consumer(即jon和leave的reblance)

调用assign则不会使用AbstractCoordinator ,而是直接分配指定的partition。这两个是高级消费和低级消费的分界点

所以高级消费者集群时,新加入的consumer,如果是auto-commit则会提交offset,若为处理完可能会丢失数据;否则不提交,会重复消费数据。离开consumer,若未提交offset离开,则会重复消费数据;若自动提交了但是未消费,则会丢失数据

offset

如上文所述,kafka高吞吐量的保证是Partition是顺序写磁盘,同样消费也是顺序的,offset维护了一个group的消费者在当前partition消费的数据位置。

当consumer启动后,会维护一个本地的offset,运行中poll数据使用的是本地offset,不再查询server;server端也会维护一个offset,新版kafka offset是维护在一个topic中,使用默认的producer更新offset;旧版维护在zookeeper。

提交offset是指:使用本地的offset去更新server端的offset。

一个Consumer的运行过程是:查询服务端offset–>存在,从offset开始读取数据;不存在,根据则根据autoreset的策略执行(earliest最开始接收,latest只接收新数据);或者启动时指定offset–>poll数据,更新本地offset–>自动或手动提交offset,如果提交时未指定offset,则使用本地维护的offset更新服务端,指定了offset,则使用指定的offset更新服务端(!但是poll数据依旧使用的本地的offset,server端的offset仅在Consumer启动时会使用)

自动提交(不推荐)

自动提交策略下,是每隔指定时间,由kafka-clients自动提交本地维护的offset,默认本地offset=poll的数量+1。(本地offset可以通过seek方法修改)

但是会出现数据丢失的情况,比如poll了一批数据没有处理完,但是到时间了已经提交了offset,然后程序终止了,下次启动会从新的offset’启动,没有处理的数据丢失了

手动提交

不指定offset:同上,也是提交本地维护的offset,默认本地offset=poll的数量+1。

这种模式下,数据处理完毕(保存/丢弃)后再手动提交,解决了自动模式下的数据丢失问题,但是可能存在消费完的数据,offset没有提交成功,重复消费数据的问题(可以通过数据库事务解决)

指定offset:更新server端offset为指定值,但是本地offset不会更新,所以在consumer没有重启的情况下,是不会消费到重复数据。

【参考链接】

Kafka-偏移量提交(https://blog.csdn.net/u011669700/article/details/80053313)

更详细的reblance过程(http://matt33.com/2017/10/22/consumer-join-group/)

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181023G0RBS100?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券