前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于 Rust 的高性能 RocketMQ Proxy 在希沃多云的实践

基于 Rust 的高性能 RocketMQ Proxy 在希沃多云的实践

作者头像
挖坑的张师傅
发布2024-01-12 19:01:08
1940
发布2024-01-12 19:01:08
举报
文章被收录于专栏:张师傅的博客张师傅的博客

背景

在当今的技术环境中,多云架构几乎成为了企业的标配。这种架构为企业提供了更多的选择和议价能力,有助于避免对单一供应商的依赖。同时,多云架构还能提高系统的高可用性,降低因单点故障带来的风险。然而,随之而来的是复杂性的增加。例如,在多云部署的情景中,以 RocketMQ 为例,可能会出现 producer 和 consumer 分布在不同云集群的情况。在这种场景下,位于 B 云的 consumer 可能无法接收到 A 云中 producer 生成的消息。

因此,在多云环境中部署的 RocketMQ 需要一种特定的通信机制,以实现消息在不同云环境间的选择性投递。

为什么要用 rust

希沃主要采用 Java 技术栈,而 RocketMQ 本身也是基于 Java 开发的。然而,为何我们选择使用 Rust 来编写 RocketMQ 的代理呢?首先,Java 在内存基础消耗方面较大,且其垃圾回收(GC)机制是一个显著的弱点。我们希望找到一种没有 GC、内存使用可控且对开发者友好的语言,以开发我们所有后续的高性能中间件。C++ 是一个很好的替代选择,我们内部有核心系统就是用 C++ 开发的,它具有超高性能和极低的内存占用。

然而,C++ 也存在其自身的问题,例如,如果不小心,很容易遇到内存相关的问题(如内存泄漏、野指针等),而且其包管理也不尽人意。因此,我们转向了 Rust。在编写了几个示例项目并验证其可行性后,我们陆续开发了基于 Rust 的多个工具,包括 RDB 分析工具、Zookeeper 代理、模拟 Redis Slave 以实现可靠的跨云同步,以及 JVM 内存 dump 分析工具。此次 RocketMQ 代理的开发,标志着 Rust 在我们的在线上环境中的正式应用。

整体设计

早期思路

为了在多云环境中有效地管理 RocketMQ 的消息传递,我们采用了一个直接而有效的方法。我们在 A 云中的 producer 端部署了一个称为 hyper-consumer 的组件。这个 hyper-consumer 负责订阅那些需要在 B 云中被消费 Topic 和TAGS。

接下来,hyper-consumer 执行的消费逻辑相当于它本身变身为 B 云中的一个 producer。它将在 A 云中消费的消息重新投递到 B 云。通过这种方式,hyper-consumer 充当了两个云环境之间的“中间商”,像一座桥梁一样实现了消息的跨云转运。

实际上,这个初步的想法仔细一琢磨是存在很多问题的:

  • 无法保证在 A 云中按顺序生产的消息在传递到 B 云时仍然保持相同的顺序
  • 存在丢失消息的风险,特别是在 hyper-consumer 消费过程中遇到失败的情况
  • 在 B 云中可能会产生大量的“无用消息”,尤其是当 B 云上对应的 topic 没有消费者时
  • ......

这些问题的出现使得我们需要进一步深入思考和改进这一方案,以确保跨云消息传递的可靠性和效率。

设计过程

业界思路借鉴

在与业界大厂的交流中,得知他们在多云的架构设计中,为了更好地进行流量管控,他们几乎对每个中间件都部署了一个代理层。以 RocketMQ 为例,一些公有云服务提供商在其多云架构中,通过代理层实现了对 RocketMQ 4.X 和 RocketMQ 5.X 版本的兼容,主要是涉及到 4.X 版本的 Remoting 协议和 5.X 版本的 grpc/protobuf 协议)

基于这一思路,我们也可以采用类似的代理机制来解决跨云消息传递的问题。通过部署一个 proxy,不仅可以解决消息传递的问题,还可以带来其他额外的好处,比如:

  • 实现任意延时的延时消息
  • 基于服务端的流控功能、包括对特定 topic 和 tag 的熔断和降级处理
  • 在 RocketMQ 版本升级时保持兼容性
  • 为未来的 ServiceMesh 化提供支持

于是有了下面这样的架构图

在这种架构中,我们会部署两个关键的 proxy:一个用于 NameServer,另一个用于 Broker。整个流程大致分为以下四个步骤:

  1. 首先,通过 NameServer 的代理,我们对 GetRouteInfoByTopic 的请求进行特殊处理
  2. 接下来,我们修改 GetRouteInfoByTopic 请求的返回值,将其中的 Broker 地址替换为我们自己的 Broker 代理地址,引导客户端与我们的 proxy 进行生产和消费的通信
  3. 随后,客户端将连接到我们的 Broker proxy 以进行消息的生产和消费。在这个代理层,我们可以实现消息路由、流量管控和协议转换等操作,从而确保消息的高效和安全传输
  4. 最后,对于那些需要跨云投递的消息,我们会通过专门的跨云通信线路进行投递
功能模块设计

名词解释:在代理模型中,downstream 一般是指发起请求的客户端,upstream 一般是指被代理的服务端(类比 Nginx)

简单来说,proxy 无非就是作为中间层做两件事情:

  • 接收来自 downstream 的请求,并将这些请求转发给 upstream
  • 接收来自 upstream 的响应,并将这些响应返回给 downstream。

在这个过程中,proxy 可以对来自 downstream 的请求做一定的修改、处理,也可以对来自 upstream 的响应做同样的处理。

一、协议模块(protocol)

协议模块作为最底层的模块,主要是实现 rocketmq 的 Remoting 协议(主要参考了 JAVA 的 4.X 版本),主要提供:

  • model 数据模型
  • decoder 解码器
  • encoder 编码器

不过首先遇到的挑战就是,得手搓 RocketMQ 的协议解析,因为我们还在使用 4.x 版本 RocketMQ,官方的 RocketMQ Client 只支持 5.x 的协议,首先得手搓一个协议解析器。好在 RocketMQ 的协议够简单,又有 Java 版本的代码可以参考,于是很快就撸了一个协议的 codec。详细的协议介绍可参考「给 wireshark 写一个 RocketMQ 协议解析的 lua 插件」

RocketMQ 命令都以 RemotingCommand 结构体的传递,对应的 rust 结构体如下:

这里使用 tokio 来作为底层的网络通信框架,这里实现一个 tokio 的 PktDecoder

二、代理模块(proxy)

代理模块就是整个 proxy 最核心的模块,包含了所有的代理逻辑,根据 RocketMQ 4.X 的 Remoting 协议,结合 JAVA 的原生实现,实际上就是请求分发器(Handler)基于请求携带的 code 字段来区分本次请求的类型,根据类型找到对应的处理器(Processor)来处理本次请求。

talk is cheap, show you the code:(好的代码不需要注释🐶):

代码语言:javascript
复制
#[async_trait::async_trait]
pub trait Handler: Send + Sync {
    async fn on_req(&self, proxy_name: &str, req: &mut RemotingCommand);
    async fn on_resp(&self, req_code: i32, resp: &mut RemotingCommand);
}

#[async_trait::async_trait]
pub trait Processor: Send + Sync {
    async fn process_request(&self, _: &str, _: &mut RemotingCommand) {
        // default nothing to do
    }
    async fn process_response(&self, _: &mut RemotingCommand) {
        // default nothing to do
    }
}

前面起到过代理的类型分为下面这两种:

  • namesrv-proxy,代理 namesrv 的请求
  • broker-proxy,代理 broker 的请求

值得庆幸的是,namesrv 和 broker 的协议是一样的,其中目前比较关键的消息类型有下面这些:

对于SendMessage(10),这个应该是3.X版本的发送消息的code,目前业务大部分都是基于4.X版本,所以这个code算是已经弃用了,暂时可以不考虑

请求类型(code)

代理类型

说明

代理逻辑

GetRouteInfoByTopic(105)

namesrv-proxy

producer从namesrv端获取Topic路由信息的请求。

将upstream(namesrv)返回的路由信息中的broker地址修改为 broker-proxy 的地址,这个是让producer通过broker-proxy发送消息的关键。

SendMessageV2(310)

broker-proxy

producer 向broker发送消息的请求。

计算是否需要将本消息也投递到另外一个云上(多‍云投递消息)。

SendBatchMessage(320)

broker-proxy

producer 向broker发送 批量 消息的请求。

同上。

PullMessage(11)

broker-proxy

consumer从broker拉取信息的请求(实际上push、pull模式底层都是基于这个来实现的)。

根据consumer在多云的部署情况以及该Topic的消息是否需要重复消费的配置,决定返回什么消息让consumer消费。

这里还有小问题,broker proxy 收到一个到来的消息,该怎么判断这个消息是否需要投递到其他云上呢?其实非常简单,主要依据是目标云上有没有 在线的consumer。如果没有在线的 consumer,则不进行投递。

由于跨云网络的不稳定性和较高的时延,实测从阿里云到腾讯云,非同城(距离很近)也至少需要 6ms,这种网络环境下,直接同步进行消息投递不是一个好的选择。同时也不能在投递缓慢或者网络中断时,把所有消息全部缓存在内存中,因此,为了确保数据的安全和可靠传输,我们需要引入一个数据持久化层。

三、存储模块(store)

存储模块主要用于 RocketMQ 消息的持久化,可以在某些特定情况下(例如由于网络临时不可用导致消息暂时转发失败时)将消息持久化保存下来,这里选择了 RocksDB 作为持久化方案。RocksDB 因其高效和稳定的性能,已被许多大型开源数据库项目采用,比如 TiKV 等。

我们将尽可能使用一个固定大小 channel 缓存待投递的数据,一旦内存中的 channel 达到其容量上限,则将消息存储到 RocksDB 中,然后不断的 drain 其中的数据进行处理。

这样可以保证在跨云网络中断时,MQ 消息不会丢失。

四、服务模块(server)

服务模块是最顶层的功能模块,主要包括:

  • 配置模块,包含整个 proxy 的配置,解析等
  • 绑定、启动 proxy 并监听端口
  • 高性能的代理线程模型

线程模型如下图:

五、监控模块(metric)

我们通过 prometheus 暴露采集点,监控 RocksDB 的消息堆积、处理时延等,这里不展开。

六、部署拓扑结构

根据实际情况,RocketMQ Proxy 是以 RocketMQ 集群为单位进行部署的,覆盖整个 RocketMQ 集群的 name server、broker,以单个 name server,2 个 broker 的 RocketMQ 集群为例:

  • namesrv-proxy:在 k8s 集群中使用多副本 deployment 部署,保证高可用
  • broker-proxy:因为每个实例需要一个独立的 RocksDB 存储,在 k8s 集群中使用多副本 statefulset 部署

遇到的问题

RocketMQ 协议居然用整数当 json 的 键

在 JSON 规范中,键(key)必须是字符串:

RocketMQ 返回的 brokerDatas 中的 brokerAddrs 中的 broker 列表都是以整数作为 key。

可恶的 FastJson 居然纵容了这一行为,(如果你坚守一下你的底线,可能 RocketMQ 协议就不是这样了)

代码语言:javascript
复制
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
     // 这里的 key 是 Long
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; 

}

但是在 rust serde-json 看来,这个 json 是非法的,反序列时会失败,无奈只能去修改 serde-json 的源码。

通过上面的方式,临时 hack 解决了这一问题,但也不用想着合并到 serde 主分支了,作者估计大概率不会接受这种奇怪的 pr。

RocksDB 的 rust 封装 API 缺失

rust-rocksdb 库的 WriteBatchIterator trait 没有提供 put_cfdelete_cf 方法,导致当使用多 column family 时,无法遍历到数据。

RocksDB 提供了一个丰富的 C++ 接口,然而在 Rust 绑定的版本中,这些回调方法并没有被完全暴露,活脱脱一个阉割版。

于是继续修改 rust-rocksdb 的代码,解决了这个问题。

内存占用过高问题

在第一版的压测过程中,发现即使给 broker-proxy 分配了 1G ~ 2G 的内存,有时还是会导致容器 OOM,即使不 OOM,内存还是涨得非常高,并且内存不会随着压测流量结束而降低。

使用 jemalloc

这个措施纯属是“算命”,尝试将程序的内存分配器改为早期 Rust 默认使用的 jemalloc(可以提供更好的多核性能以及更好的避免内存碎片)。

Cargo.toml:

代码语言:javascript
复制
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.4"

main.rs:

代码语言:javascript
复制
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
内存 dump 分析

尝试将内存 dump 下来分析,在使用 jemalloc 的基础上,添加一个简单的 dump 接口(编译选项中需要设置 profile.release.debug 为 true):

代码语言:javascript
复制
const PROF_ACTIVE: &'static [u8] = b"prof.active\0";
const PROF_DUMP: &'static [u8] = b"prof.dump\0";
const PROFILE_OUTPUT: &'static [u8] = b"profile.out\0";
 
fn set_prof_active(active: bool) {
    let name = PROF_ACTIVE.name();
    name.write(active).expect("Should succeed to set prof");
}
 
#[get("/dump_profile")]
async fn dump_profile() -> impl Responder {
    let name = PROF_DUMP.name();
    name.write(PROFILE_OUTPUT).expect("Should succeed to dump profile");
    HttpResponse::Ok().body("dump profile success")
}

查看内存 dump 的结果,发现了端倪:

内存中存在大量的 trace 和 span 相关的数据,看上去都是由 console_subscriber 引入的,这个是早期开发为了让 tokio 线程可视化方便调试而加入的组件,是非必须的,去掉之后,果然内存消耗只有 100M ~ 200M。

关于 console-subscriber 对内存的使用问题,可以参考 github 的 issue

CPU 跑不上来的问题

早期的压测过程还发现了一个不太符合预期的问题:在给予比较大的压测流量的情况下,通过 top 查看到的 CPU 的变化的上下波动比较大,CPU 会经常出现空闲时间,变化也非常频繁,平均负载只有 100%~200%,预期的 CPU 表现应该是充分利用 CPU,避免在流量比较大的时候 CPU 出现空闲时间。

简单梳理了一下,猜测是消费线程比较少的问题,此时的消息消费线程模型大致为:

OriginConsumeThread.png

  • 客户端发送过来的消息会立即以同步的方式存储到 RocksDB 中
  • 单个 Drainer 线程不断轮询(根据"消费位点")RocksDB 的新消息,发送到待处理队列中
  • 发送到队列中会立刻更新“消费位点”到 RocksDB
  • 多个 Deliver 线程从待处理队列中接收新消息并发送出去

这里瓶颈可能出现在 Drainer 线程,目前只启用单个 Drainer 线程的原因是需要维护一个“消费位点”,如果多个线程去 fetch 消息的话,这个消费位点维护起来比较困难,那么,即使 Deliver 线程再多,没有待处理消息的话也无济于事,CPU 利用率不高。

RocketMQ 消息到达 Proxy 的时候同步入库是为了避免消息丢失,然而 Drainer 发送消息到队列后会马上更新消费位点,即使 Deliver 线程有失败重新保存到 RocksDB 的措施,也会有可能丢失内存中的消息。

又由于 Drainer 线程是批量从 RocksDB 中 fetch 消息,没有比较优雅的办法保证绝对不丢消息且不重复投递消息,除非一条同步投递并逐步更新消费位点,这个对性能影响太大了。

简单来说就是为了高性能,允许丢消息,被丢弃的消息数最大可能是 queue 的大小,如果确实需要保证不丢消息的话,还可以选择同步分发消息(经测试,性能下降较大,QPS 大概只有异步的 1/6)

那么,在允许宕机时内存中的消息丢失的情况下,提升 CPU 利用率可以从跳过 Drainer 线程入手:

  • Listener 线程收到消息后优先尝试直接投递到 queue 中
  • queue 满的情况下,才保存到 RocksDB

经过上面的简单改造之后,CPU 利用率提升了。

压测

压测环境和说明

基于实际生产环境的公有云:

  • 消息从 A 云 Producer 发出到 A 云上的 proxy
  • 在 B 云部署对应 Topic 的 Consumer,让 proxy 能把对应 Topic 的消息投递到 B 云
  • 控制变量,主要对比走 Proxy、不走 Proxy 的情况下的性能
  • 同时观察小消息、较大消息对性能的影响

proxy 的相关配置如下:

  • 副本数: 4
  • proxy 容器配置:
代码语言:javascript
复制
resources:
  limits:
    cpu: 4
    memory: 512Mi
  requests:
    cpu: 1
    memory: 100Mi

压测结果

在较小消息、60000 QPS 的情况下,资源消耗大致为:

  • CPU: 1C ~ 2C
  • 内存: 140M 左右

压测过程的 top 如下:

在较大消息、60000 QPS 的情况下,资源消耗大致为:

  • CPU: 2C ~ 3C
  • 内存: 150M 左右

压测过程的 top 如下:

不得不说,rust 内存消耗真的是太优秀了,不像某ava,启动完一两 G 内存没了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-01-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 张师傅的博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 为什么要用 rust
  • 整体设计
    • 早期思路
      • 设计过程
        • 业界思路借鉴
        • 功能模块设计
      • 遇到的问题
        • RocketMQ 协议居然用整数当 json 的 键
        • RocksDB 的 rust 封装 API 缺失
        • 内存占用过高问题
        • CPU 跑不上来的问题
    • 压测
      • 压测环境和说明
        • 压测结果
        相关产品与服务
        消息队列 TDMQ
        消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档