前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka消费速度巨慢 | 你是不是懵了?

Kafka消费速度巨慢 | 你是不是懵了?

作者头像
大数据真好玩
发布2019-10-09 16:59:06
9.7K1
发布2019-10-09 16:59:06
举报
文章被收录于专栏:暴走大数据暴走大数据

来源:zhuanlan.zhihu.com/p/33238750

作者:柏仁

By 暴走大数据

场景描述:随着业务的发展,项目组有大量的任务需要处理。Kafka消费出现了问题。

背景

随着业务的发展,项目组有大量的任务需要处理。

这些任务需要主要分为两种类型:

  • 通过接口调用, 后台执行任务
  • 通过调度系统定时执行

接口调用就需要执行任务不能阻塞, 不然系统的处理能力就会下降。任务调度系统需要在在一个最小的检测粒度时间内,执行完所有任务。这两种情况都面临这样一个问题, 任务不能阻塞,不然会非常影响性能。所以需要引入消息中间件,将任务派发方和任务执行方分离出来。

在这种情况下, 我们选择了kafka作为了我们的消息中间件, 选择kafka主要基于以下几点:

  • 支持分布式, 避免单点问题
  • 技术方案成熟, 公司内部有上线项目
  • 性能优异, 能够持久化消息

问题

我们团队在kafka使用上面都没有经验, 其他同事说kafka consumer在消费超时后会掉线,导致重复消费,当时没有这个使用场景,不能理解这个概念。

第一次发现问题是在联调的时候,任务执行方发现consumer会打印出错误日志,重复消费,并且陷入循环。

当时很快定位到问题, consumer长时间没有发送心跳包, 导致触发rebalance操作, consumer被踢下线了。

对于这个问题,需要详细讲述一下kafka consumer相关的机制。

kafka为了保证partition分配的高效率, 使用了如下机制:

  1. 所有的consumer都要和coordinator连接
  2. coordinator选出一个consumer作为leader来分配partition
  3. leader分配完以后通知coordinator, 由coordinator来通知给其他consumer
  4. 如果一个consumer不能工作了, coordinator会触发rebalance机制,重新分配partition

coordinator判定一个consumer不能工作, 依靠的就是heartbeat机制。consumer的配置里面有一项是session_timeout,如果heartbeat不能在session_timeout时间内发出一次请求,coordinator就会触发一次rebalance操作,重新分配partition。

从上面这样看没什么问题,很多系统都是这么设计的,一个工作线程,一个心跳包线程。但是kafka consumer为了设计上的简单(或者是出于其他目的),他们只有一个线程,也就是说工作逻辑和心跳包逻辑是同步的。对于心跳包这种定时任务,他们使用了一种叫做delayed_task的方案。

delayed_task是Best-Effort的,为什么这么说呢,我们来看看delayed_task是在什么时候工作的:

  1. 取出一批数据
  2. 执行delayed_task
  3. 循环yield 这批数据
  4. 重复执行上述过程

前面我们也说过, consumer只有一个线程, 也就意味着,如果主逻辑消耗了大量时间,delayed_task中的任务就会延期执行。在这种情况下, delayed_task只能保证任务不会提前进行,不能保证任务准时执行。拿一个具体的场景来说, 如果主逻辑花费了60s, 那么delayed_task中的任务最早也只能在60s之后执行,像heartbeat任务就直接超时了。

在提出解决方案之前, 我们需要考虑以下几个问题:

生产者速度大于消费者速度怎么处理

如果生产者速度大于消费者速度,消息就会积累。常规的解决方案是增加partition,增加消费者数量,但是在某一些场景下却不能这么实现。思考一下,如果生产者的速度不是恒定的,而是波动的,并且波峰和波谷差距比较大,大部分时间处于波谷,这样在波谷时其实资源是闲置的,并且会降低消费速度。另外对于消费的实时性比较高的场景,如果短时间内消息被积压,纵然最后能够消费掉,但是已经过了有效期,这样的消费其实是无效的。

所以我们必须有能力知道两个数据,即当前队列剩余的消息的数量和当前消息产生的时间。

在消费速度不一致的情况下如何提交offset

kafka-consumer的offset的提交机制是定时向delayed_task里面加入一个AutoCommitTask。但是在消费者消费速度不均衡的情况下不能这么做,如果消费者消费速度比较快,定时提交offset的机制会使得一旦consumer宕机,会丢失一大批消费信息。 同时我们也不能单纯的以消费数量作为是否提交的阈值,在消费者比较消费速率比较慢的情况下,一旦consumer宕机,我们会耗费大量时间在无用的消费上面。 所以我们需要同时衡量数量和时间两个变量,作为我们是否提交的阈值

offset提交失败该怎么处理

consumer的offset提交是按照TopicPartition作为提交单元的。在consumer消费过程中,可能会发生reblance事件,如果当前consumer分配到的partition数量大于1个,可能这个partition会被分配给其他的consumer。在这个过程中,consumer已经消费了该条数据,那么在提交offset的时候,就会遇到CommitOffsetError,因为这个partition已经不属于自己了。 这种情况下该如何处理这些数据。

方案

带着上面的一些问题,我们开始着手提出解决方案。

从上面的分析可以看出来, consumer掉线的最主要问题就是delayed_task和主函数处于同一个工作线程中,那么最直观的解决方法就是将这两个分离出来。

由于python GIL的限制,加上kafka consumer 是线程不安全的, 所以我们使用多进程来解决这个问题。

在consumer中,除了迭代器_message_generator之外,还提供了一个poll函数。这个函数和迭代器功能差不多,也能够获取消息,同时也会执行delayed_task。不同之处是, 这个函数会一次性返回一批数据,这样我们就有能力统计剩下的消息的数量。同时我们要求在producer发送消息的时候,一定要带上create_time这个字段,标注消息产生的时间。客户端现在同时能获取数量和时间两个参数,对于实时性要求比较高的场景,他就可以选择性的丢弃一批不满足要求的数据。

当消费者消费速度比较低的时候,我们需要停止获取数据,但是同时不能停下delayed_task。幸运的是,consumer提供了一个pause的函数,可以让我们停止对应的partition。一旦使用pause函数,poll函数将不会返回任何数据,但他依然会执行delayed_task。

由于我们使用poll函数一次性返回多个数据,加上在消费速度不均衡的情况下offset管理的问题。所以我们必须要手动管理offset, 保存我们上次提交offset的时间和未提交offset的数量,一旦其中某一个达到阈值,就真正的提交offset。

当我们提交offset失败的时候,我们需要清除对应的partition的所有数据,防止consumer做无用消费。

综合上面,我们就有能力构造出一个强健的consumer客户端,方便其他同学来使用。

核心代码如下:

代码语言:javascript
复制
while True:
    topic_records = self.consumer.poll().values()
    if not topic_records:
        self.get_offset()
        time.sleep(self.config['idle_timeout'])
    self.consumer.pause(*self.consumer.assigment())
    paused = True
    for records in topic_records:
        remain = len(records)
        for record in records:
        while True:
            data = {"record":record, "remain": remain}
            try:
                self.task_queue.put(data, self.config['block_timeout'])
                remain -= 1
              break
            except Full:
                self.consumer.poll()
                self.get_offset()
    if self.task_queue.qsize < self.config['resumen_count'] and paused:
        partitions = self.consumer.paused()
        if partitions:
            self.consumer.resume(*partitions)
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 在消费速度不一致的情况下如何提交offset
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档