Kafka消费者API

简介

Kafka消费者客户端可以透明地处理集群的故障,在集群发生变化时也能透明地调整topic的分区。客户端维持了TCP连接以便获取数据。用完客户端不关闭将会导致泄露这些连接。消费者不是线程安全的。

版本兼容性

在低版本的Kafka上使用高版本的API会导致UnsupportedVersionException。

offset和consumer postion

kafka为每个分区的每一条记录维护了一个数字的offset。offset既充当了分区记录的唯一标识,又意味着消费者的位置。比如position=5的消费者,已经消费了0-4的记录,将要消费offset为5的记录。

实际上,与消费者有关的position的概念有两个,一个是position(将要给消费者的下一条记录的offset,消费者已经见过的最大offset+1,并且在调用poll时自动增长)。

另一个是committed position(被安全的存储的最后的offset,进程崩溃重启之后恢复到的位置,消费者可以选择自动提交已经手动提交)。

消费者组和topic订阅

kafka使用消费者组的概念,相同group.id的程序协同消费。Kafka均衡消费者是通过分区实现,一个分区只会对应一个消费者。成员关系是动态的,某个消费者崩溃时会重新分配给组内的其他消费者,新的消费者加入时,某个分区将转移给新的消费者。这称为消费者组重平衡。重平衡也发生在增加分区和通过正则匹配了新的订阅。消费者组通过周期性的元数据自动检测并分配。kafka支持多个消费者组,不需要复制多份数据。重平衡发生时,可以通过ConsumerRebalanceListener通知消费者,以便让消费者能够完成程序级别的逻辑如清理和提交offset。消费者也可以通过assign方法自行划分分区(简单消费),这种情况下分区重平衡和消费者组协同不可用。

检测消费者失败

消费者有两个参数可以影响poll行为。

对于消息处理时间不可预测的时候,上面的两个设置都不够。另一种推荐的方式是,将消息处理移到另一个线程,消费者poll继续调用,但必须保证committed offsets不能比实际的position大。通常,必须禁用自动提交,并仅在线程完成处理后手动提交offset。另外需要pause分区,以便线程在完成之前的数据处理之前不会再poll到数据。

自动提交offset

手动提交

手动提交有commitSync和commitAsync,明确提交offset时,必须是处理过的最后一条消息+1

手动分区分配

即简单消费者,通过assign方法实现,并且最好不要与其他消费者组共享group id,以免混乱。

在Kafka外存储offset

Kafka支持此功能的主要目的是允许程序在统一系统中存储offset和结果(比如利用mysql事务),以此来提供完全一次的语义。当分区也是手动分配的时候比较简单(类似与寻找索引),当分区是自动分配时,需要在subscribe时传入ConsumerRebalanceListener实例。在分区从消费者拿走时,ConsumerRebalanceListener.onPartitionsRevoked(Collection)将被调用,当分区分配给消费者时,ConsumerRebalanceListener.onPartitionsAssigned(Collection)将被调用。另外ConsumerRebalanceListener的用途是刷新应用程序维护的将要去其他地方的分区的缓存。

控制消费者位置

Kafka允许消费者手动控制其位置,随意前后移动。有集中场景:对于时间敏感的,可以跳过远远落后的,先跳到最近的记录。另一种是重新消费重建记录。Kafka使用seek指定新位置,seekToBeginning到开始,seekToEnd到最后。

消费流控

场景:消费多个topic时,针对topic制定优先级。流处理中,暂停某个流消费以便让滞后的流跟上等。通过pause和滞后poll中调用resume支持动态流控。

读取事务消息(0.11.0)

事务是为了原子性的写入多个topic,通过isolation.level=readcommitted,只能读取已提交的数据。readcommitted模式中没有客户端缓冲,取而代之的是,分区的结束偏移量将是属于开放事务的分区中的第一个消息的偏移量。该偏移称为“最后稳定偏移”(LSO)。

read_committed的消费者仅仅读取LSO,并筛选出已中止任何事务性消息。LSO同时影响of seekToEnd和endOffsets的行为,fetch lag指标也是相对于LSO的。

事务消息分区将包含commit或abort标记,来指明事务的结果。标记不会返回给应用程序,但会有offset。应用程序将会看到消费offset的间隙(gaps)。这些丢失的消息将成为事务标记,将会在两个隔离级别中为消费者过滤。除此之外,read_committed消费者可能看到由于中断的事务造成的间隙,因为这些消息不会给消费者,但它具有有效的偏移量。

多线程编程

Kafka的消费者不是线程安全的,所有网络I/O都发生在进行调用的应用程序线程中。用户有责任保证正确同步多线程访问,否则抛出ConcurrentModificationException异常。唯一的例外是wakeup,可以从外部线程来终端活动操作,用于关闭消费者。通常做法是在捕获异常时设置flag跳出循环。尽管线程中断也可以关闭,但不推荐,因为可能导致非正常关闭。通常多线程的模型有 [每个线程一个消费者]或者[一个或多个消费者+业务处理线程池]的方式。

前者优点是容易实现,但有更多的TCP连接(代价不大),更多的请求发送到服务器但数据少导致I/O下降,并且总线程数受分区数限制。

后者的优点是摆脱分区的限制,可独立扩展消费者和处理程序的数量。缺点是需要保证处理程序之间的顺序(不需要顺序保证时可忽略),手动提交offset更难,因为需要所有线程协调以确保分区处理完成。这种做法有很多变种,例如每个处理线程有自己的队列,消费者线程根据分区hash到这些队列中,以确保顺序并简化提交。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180808G1TGMK00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

同媒体快讯

扫码关注云+社区

领取腾讯云代金券