导语 | 日前,腾讯云中间件团队联合StreamNative社区正式发布了RoP 0.2.0版本,该版本在架构上全新升级,用户在使用中可以完全避免消息丢失、消息重复消费、只能消费一部分Partition的数据等问题。
作者简介
冉小龙
一、RoP的定义
与KoP、MoP和AoP相似,RoP是一种可插拔的协议处理插件。
将RoP协议处理插件添加到现有Pulsar集群后,用户无需修改代码,便能将现有的RocketMQ应用程序和服务迁移到Pulsar,同时还能使用Pulsar的强大功能,例如:
二、发布RoP 0.2.0
日前,我们重磅发布了RoP 0.2.0,该版本在架构上全新升级,在功能和稳定性上得到了更大的提升。提供了ACL鉴权和验证的功能,可以更好的确保用户数据的安全性,同时允许用户对Partitioned Topic进行扩容,可以获得更好的并发写入能力,并且完善了RocketMQ原生的管控端接口,可以更好的对服务进行处理和监控。
三、最新功能优化
在0.2.0版本中,腾讯云中间件团队在0.1.0的架构上进行全新设计,MessageID、消息路由模型进行重构,确保不同场景下RoP消息的准确性。
主要有以下三点优化内容:
ACL机制是RocketMQ社区自带的一个能力,可以很好的对用户的数据进行鉴权和认证。RoP 0.2.0版本复用了RocketMQ自身的Hook实现,利用Pulsar自身的鉴权机制,实现了对用户数据进行鉴权和认证的功能。
RoP ACL的使用方式依旧延续了RocketMQ的使用方式,只需定义ACL_ACCESS_KEY和ACL_SECRET_KEY字段,然后利用RocketMQ的ACLRPCHook函数加载即可,这样可以确保用户尽可能少的改动客户端的业务代码逻辑。
具体代码示例如下:
private static final String ACL_ACCESS_KEY = "eyJrZXlJZCI6InJvY2tldG1xLW13bmI3bWFwMjhqZSIsImFsZyI6IkhTMjU2In0." + "eyJzdWIiOiJyb2NrZXRtcS1td25iN21hcDI4amVfdGVzdCJ9.BDOjqqY25a6apnZTMZCqg0I0pxVFcqz7fvZbaTqkf5U"; // token private static final String ACL_SECRET_KEY = "rop";
public static void producer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("rocketmq-mwnb7map28je|nit", "ProducerGroupName", getAclRPCHook());... }
static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY)); }
RocketMQ与Kafka类似,都是使用64位的Offset来唯一标识一条消息,但是在Pulsar中,使用64位的LedgerID、64位的EntryID来唯一标识一条消息。针对这个问题,在RoP 0.1.0中,我们使用如下的形式来构造MessageID对象:
使用如上的方式可能存在MessageID的消息精度丢失,在系统运行一段时间之后,无法继续创建出新的LedgerID,导致整个集群的服务对外不可用的情况。这个问题与早期的KoP版本所面临的是同样的困境,所以在RoP 0.2.0 中,我们采用了和KoP相同的处理方式,使用[PIP 70: Introduce lightweight broker entry metadata](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) 的处理思路,在Broker的协议头中,附加了一个64位的index/publish-time字段,这样无需在客户端侧进行协议的解析即可在每一条消息中附加一个64位的字段来使用。
PIP-70是使用插件的方式进行加载的,所以在服务启动时,我们需要做如下配置:
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
Note: Broker Entry Metadata是在Pulsar 2.8.0的版本中才支持的,所以需要确保Pulsar Broker的版本在2.8.0及以上。
需要说明的是,RocketMQ和Kafka在Offset的使用方式上又有所不同,RocketMQ中有两个Offset,一个是Queue Offset,用来表示消息在MessageQueue中的位置,MessageQueue本质上是一个数组,一条消息进来数组的下标就会+1。一个是CommitLog Offset,用来表示消息存储在CommitLog中的位置,消息存储是由ConsumeQueue和CommitLog配合完成,ConsumeQueue是逻辑队列,CommitLog是真正存储消息文件的,ConsumeQueue存储的是指向物理存储的地址。Topic下的每个MessageQueue都有对应的ConsumeQueue文件,内容也会被持久化到磁盘。
所以,在MessageID重构的实现中,区别于Kafka中只有一个全局的Offset来标识消息的唯一性,在RoP中需要针对这两种Offset的情况分别进行处理,具体如下:
在RoP 0.1.0的版本中,在消息路由的实现上,RocketMQ和Pulsar都是首先通过Topic Lookup的操作找到对应的Owner Broker节点,然后将该Broker的地址返回。但是在这个动作中,忽略了一个重要的问题,即RocketMQ与Kafka和Pulsar都是不同的,它的Queue不是全局唯一的。
RocketMQ路由协议主要包括两部分:
在RocketMQ路由协议中,没有全局标识Topic的分区的唯一ID(例如在Pulsar/Kafka中,分区ID在集群中是唯一的);而在RocketMQ中,分区路由信息是由Broker标识加上该Broker上的顺序从0→N的Index来标识Topic的分区。
因此RocketMQ协议中,客户端只需要获取到Topic对应Broker上分区总数,就能通过计算获得该Broker上分区的ID;所有的请求都是基于【Broker-Tag】+【Broker-Topic-Seq】构建唯一路由查询原语来请求服务。简单来说:RocketMQ的分区是有状态的,他绑定在特定的Broker之上;分区一旦分配在某个Broker上,终身与之相关且不能迁移。客户端解析分区路由信息是通过计算得到;比如:某个TopicA有5个分区,分别落在BrokerA和BrokerB上,BrokerA有3个,BrokerB有2个;那么协议记录为(BrokerA,3)(BrokerB,2),客户端通过计算就得到全部的分区数据:
由于上面的路由关系的原因,所以没有办法通过GET_ROUTEINTO_BY_TOPIC这个协议请求去和Pulsar的Lookup协议去做映射。本质原因是像Kafka/Pulsar这种,它的Partition信息是全局唯一的,在执行Topic路由策略之后,能准确的返回某一个Topic的Partition所对应的Owner Broker是谁。但是RocketMQ的Topic路由返回的是两个字段,一个是Broker Name,一个是Queue的数量。具体的QueueID,是Client根据Broker返回的数量固定的从0开始递增计算。所以在Topic的路由映射中,RocketMQ和Pulsar自身的路由协议没办法一一映射。为了解决这个问题,在RoP 0.2.0中,抽象了一层Proxy用来维护Topic与Broker之间的映射关系。为了达到这个目的,这里主要有以下几方面的事情需要考虑:
针对第一个问题,综合考量,我们选择将路由的映射关系存储到ZooKeeper集群中来,因为当前RoP的服务本身也需要依赖ZooKeeper集群,不会引入新的组件;其次ZooKeeper自身的一致性能力能很好的满足这个场景需求。
针对第二个问题,我们是在RoP接口创建分区主题的同时,依次查找各个分区所在的Broker节点,依照初始主题所在节点信息为基准,将映射关系写入到ZooKeeper集群中。这样做的好处在于:
针对第三个问题,我们通过增加Master-Slave模式,可以减少单节点故障对系统的影响。ZooKeeper元数据如下,只需要增加Broker相关信息,即可实现各个节点的互为主从关系,达到主节点不可用时从节点可以继续提供服务。由于当前Offset信息都存储在Compact Topic中,全部节点同时订阅,所以各个节点的元数据可以保证一致,可以实现主从切换。下面是测试环境中部署RoP集群中的路由映射关系:
所以,为了保证RoP集群能有较好的容错能力,在部署RoP集群中建议使用偶数台节点。可以通过如下参数配置决定当前Master节点有几个Slave节点作为其备份节点:
RoPBrokerReplicationNum=2
假设有6台Broker节点,RoPBrokerReplicationNum=2,那么就说明此时只有三台Master Broker节点对外提供服务。但是对于Pulsar来说,Broker节点之间是对等的,当创建Topic的时候,可能会分配到任意节点上,所以对于不在Owner Broker节点上的请求,在RoP Proxy层做了一层代理,会先对该Topic进行查找的操作,然后将请求转发到Owner Broker的节点上来返回。
四、未来规划
为了更好的践行开源协同和开源共建的理念,目前,上述功能均已贡献回社区。除此之外针对RocketMQ商业版本的任意延迟消息功能,腾讯云中间件团队也基于Pulsar原生的特性开发了相关的插件来进行支持。RoP的延迟消息功能除了支持多级别的延迟消息之外还具备支持任意延迟消息的能力。
之后,腾讯云中间件团队将会在确保RoP项目稳定的同时,持续开发RoP相关的功能,诸如消息轨迹、消息查询和回溯以及监控等能力,进一步完善RoP的功能以及周边生态。
(RoP项目地址:https://github.com/streamnative/rop)
特别鸣谢
感谢腾讯云中间件团队韩明泽和张勇华对本文提供的技术细节校验和支持。