前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >编写一个配置化的Kafka Proxy,让你分钟级别接入Kafka

编写一个配置化的Kafka Proxy,让你分钟级别接入Kafka

作者头像
薯条的编程修养
发布2022-08-10 19:39:35
1.3K0
发布2022-08-10 19:39:35
举报

消息队列作为服务解耦、异步收发消息的组件,如今已经广泛应用于各大互联网公司,具备一定规模的公司会有专门的团队负责维护消息队列。当我们需要使用消息队列时,得写专门的代码进行连接,比如若用Golang编码,当需要使用Kafka消息队列时可能会使用 sarama库。这意味着需要在业务代码中写一些与业务无关的kafka配置与连接代码。

比如某个需求需要连接kafka的某个topic消费msg计算一些信息,此时的开发流程是: 分析需求->使用sarama连接topic->编写业务代码处理msg。笔者所在的部门是一个中台部门,经常需要接入各种topic去计算实时信息。如果来一个需求就写一坨连接代码去处理消息,成本会很高,且这么搞并不优雅,有些时候需求排期比较紧,你可能来不及写启动/停止消费的API,此时如果出现了线上问题,那kafka非常方便的流量回放都没法使用,老板问你为什么,你就只能干瞪眼了~

分析一下上面的开发流程,在接收消息进行处理这个流程中有简化的方案:我们可以设计一个proxy模块,它负责连接kafka topic拉取消息并把消息发送到业务模块的对应接口上。这些功能可以搞成配置化的流程,用户只需要在proxy的管理系统上填写对应工单,就可以收到对应topic的消息。有了这玩意,需求开发流程变为:分析需求->填个工单接收消息->编写业务代码处理msg。这样大大简化了消息队列接入流程,提高了开发效率。如果需求比较简单,比如只是连接单个topic,读取某个字段进行计数,那编写业务代码处理msg这块也可以做成配置化,整体开发起来非常轻松愉快。

那么proxy模块可以具有什么样的功能呢:

  1. proxy负责连接kafka的多个topic,并把topic的msg转发到业务填写的收集消息 API;
  2. proxy可以控制topic消息发送的启动/停止,业务模块在消息重启期间可以重置offset;
  3. proxy可以做msg的ETL功能:比如根据topic中的type值决定是否向业务API发送此条消息;再比如一个topic的schema中可能有上百个字段,好几层JSON信息,但是本次业务需求只需要其中三个字段即可满足需求,那么用户填写工单时可以声明所需字段,proxy的ETL负责打平消息,这样可以降低网络传输的开销成本。

但是这样的ETL功能会破坏topic原始schema,对定位只是proxy的模块功能显得有点重,系统可以提供这样的功能,并在管理平台工单页面周知用户,用户用不用那是另一码事了。

如果要写这么一个Proxy服务,它具有什么样的特点呢,先来看一下我设计的proxy实例工作的流程图:

  1. 为了实现配置化,服务需要具备动态读取配置和新建消费实例的能力,那么对应的kafka配置和HTTP服务配置可以放在像MySQL这样的关系型数据库中,新建实例即插入一条新数据可以new一个新的消费实例;
  2. proxy的服务器上保存着各个topic的消费实例,意味着这是个有状态的服务,一般的业务系统都是无状态的,接口里面的信息保存在关系型或者kv型存储中;写这种有状态的服务需要非常注意并发问题服务状态与db状态需保持一致;
  3. 现在具有一定规模的互联网公司一般会用微服务,各个服务服务按照事业部、部门等维度做成了一颗巨大的服务树,一个服务最终是服务树的一个叶子节点,服务节点有多个集群处理不同流量,一个集群有多台机器。服务集群是由专门的SRE负责运维的。kafka的topic一般会有多个分区(partition),消费时会有一定的限制条件:一台机器可以消费多个分区,但是一个分区最多只能让一台机器拉取消息,如果一个topic由10个分区,你有20台机器,那么意味着有10台机器拉取不到流量;如果你有5台机器,那么每台机器可能会收到两个分区的流量:

也就是consume过程中发生了rebalance。为了性能,一台机器最好可以消费多个topic的一个partition,这意味着服务在服务树的基础上,proxy还需要具有小集群管理的能力。有了这个能力,我们可以控制topic的分区在不同机器上的启停,比如某台机器上消费的Topic 有A、B两个,突然topic的流量激增,机器的CPU idle不足,那我们可以把这台机器上的B topic的分区流量转移给其他资源充足的机器。

  1. 作为一个偏中部的服务,proxy的稳定性非常重要。同时proxy作为consumer在拉取topic的消息时需要有ACK机制 预防业务系统收到消息却由于程序崩溃导致消息丢失的情况。proxy往业务系统发送消息不是无限次的,需要考虑当msg多次发往业务系统仍失败的情况需如何处理,最后proxy对于接入的topic应具备监控报警机制让用户可以观察实际的消费情况。

有了这些需求,可以进行系统设计了,这是我设计的proxy系统架构图:

下面来介绍一下架构图中的细节内容:

配置化的需求

用户接入Proxy系统的流程是:填写kafka topic的配置,让proxy可以获取kafka相关元数据 -> 若有ETL的需求,填写ETL的元数据 -> 填写业务收集接口的信息(这里为了省事,业务接口只支持程序员最喜欢的HTTP协议)

kafka topic元信息包括broker server信息, topic name, app info等;

对于ETL功能,每个topic的schema应为固定格式,我们只需从topic 的schema中提取我们需要的数据即可,这里可以用gjson库实现;

为了短平快,服务选择我们普通开发最喜欢的HTTP协议,那么本质上就是编写一个可配置化的HTTP client,HTTP接口信息包括服务VIP信息(Nginx可以根据VIP配置把流量轮询发送给下游服务、URL PATH、服务入参、出参、超时时间,重试次数等信息;这个需求比较简单,这里不再赘述。

小集群管理能力

为了充分压榨服务器的性能,每台服务器上需要消费多个topic的partition, 为了方便管理,服务器对于不同的topic应具有启动消费和停止消费的能力。我选择在db的kafka配置表中对于每种topic保存多个机器的hostname来描述消费关系,比如topic A有三个partition,那么机器字段为["rock_hna_01.py","rock_hna_02.py","rock_hna_03.py"], db里还有一个consume_statue字段用于控制consume行为的启动/停止。同时,服务提供实时刷新/定时刷新config的接口,保证当db配置刷新后,服务中的配置随之进行刷新。

有状态的服务

写这个服务需要注意并发情况和服务配置与db配置保持一致,除了上面提过的定时/实时刷新接口,代码内部会有挺多的全局共享变量,为了保证并发安全,我用了大量的golang并发原语,比如:sync.Mutex、sync.Map、sync.Once、context等。

这里说一下context的使用场景,由于consume代码是这样的结构:

代码语言:javascript
复制
for {
select {
 ...
}
}

而停止消费topic的接口需要控制协程关闭,否则会发生泄露,所以选择使用context(channel也可以)。

稳定性相关能力

在服务稳定性建设上,我选择在服务中增加限流功能,限流功能使用官方的令牌桶实现。限流能力的重要性不言而喻,有些具备Exactly Once的下游在出现问题后会选择回溯流量,倘若他一下回放了30天的流量,没有限流的话,瞬间的流量洪峰会压垮你娇贵的proxy服务,啧啧啧,想想就可怕,没有金刚钻,还是最好别揽这种可怕的服务;

consume可靠性

为了预防consume端丢失消息,这里使用ack机制,具体是:推送消息给下游系统,并监控errcode, 当response中的errcode返回0时认为消费成功,否则进行三次重试,如果三次重试仍然失败,把消息落离线日志,并进行监控和报警。

还有一些细节部分没有介绍到,如果读者对这个服务感兴趣,欢迎留言和我讨论。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 薯条的编程修养 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 配置化的需求
  • 小集群管理能力
  • 有状态的服务
  • 稳定性相关能力
  • consume可靠性
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档