大家好,我是君哥。
RocketMQ 选择了自己写 NameServer 做注册中心而没有选择 Zookeeper,这是为什么呢?
首先看一下 RocketMQ 的架构,如下图:
RocketMQ 的 Broker 注册到 NameServer 集群,而生产者和消费者则需要从 NameServer 拉取消息。
Broker 启动时,会向 NameServer 发送注册消息,相关的 UML 类图如下:
我们看一下 BrokerOuterAPI 的 registerBrokerAll 方法,代码如下:
//BrokerOuterAPI.java
public List<RegisterBrokerResult> registerBrokerAll(
//省略参数
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//省略 requestHeader 封装
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
} catch (Exception e) {
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
可以看到,当 Broker 启动时,会向所有的 NameServer 发送注册消息,NameServer 端的注册内容如下:
从上图中看出,需要在 NameServer 上保存的数据其实是很少的。
注意:
1.Broker 向 NameServer 注册时,会注册到所有的 NameServer 服务器, NameServer 并不是分布式存储,NameServer 集群是去中心化的。
2.NameServer 会有定时任务(每 10s 一次)检查 Broker 是否下线了,判断依据是 120s 内有没有收到心跳,如果没有收到,则关闭 channel,把 Broker 信息从本地缓存移除。代码见 RouteInfoManager 类 scanNotActiveBroker 方法。
3.Broker 启动时,同时会启动定时任务,每 30s 向 NameServer 发送注册消息,NameServer 收到注册消息后更新心跳时间(BrokerLiveInfo.lastUpdateTimestamp)。
下面是 Broker 对 NameServer 的两个请求码:
创建 Topic 时,Broker 会向 NameServer 发送注册消息。代码如下:
//BrokerController 类
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
//省略
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
最终调用了上一节的 registerBrokerAll 的方法。NameServer 收到注册消息后更新本地保存的数据,所保存的数据并没有增加新数据。
对于生产者和消费者,在发送和拉取消息时,首先会从本地缓存获取 Topic 路由信息,如果获取失败,则需要从 NameServer 进行获取。下面是获取 Topic 路由信息的 UML 类图:
看一下更新 Topic 路由信息的核心代码:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//根据默认 Topic 来取
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
//省略部分逻辑
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
//判断路由信息是否发送变化
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
if (!producerTable.isEmpty()) {
//更新生产者缓存
}
// Update sub info
if (!consumerTable.isEmpty()) {
//更新消费者缓存
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
} catch (RemotingException e) {
} finally {
this.lockNamesrv.unlock();
}
} else {}
} catch (InterruptedException e) {
}
return false;
}
注意:客户端会有定时任务,默认每隔 30s 向 NameServer 拉取 Topic 路由信息来刷新本地缓存。
RocketMQ 设计之初也是使用 Zookeeper 做注册中心的,这参考了 Kafka 的设计。Zookeeper 是一个非常成熟的注册中心,还有支持主节点选举、强一致等特性。使用 Zookeeper 的架构如下:
从上面的分析中可以看到,RocketMQ 需要保存的数据非常少,完全不必引入 Zookeeper 这种重量级的注册中心。
NameServer 集群各节点是对等的,相互之间并不会进行通信,这样确实会有短暂不一致。Broker 启动时会跟所有的 NameServer 建立长链接,发送注册信息。注册成功后,每 30s 会向 NameServer 发送心跳,NameServer 收到心跳后更新 Broker 的 lastUpdateTimestamp。
Zookeeper 使用 ZAB 协议来保证节点之间数据的强一致性,这要求在每一个写请求都需要在节点上写事务日志,同时需要将内存数据持久化到磁盘以保证一致性和持久性。对于 RocketMQ 这种元数据非常少的简单场景,有点小题大做了。
放弃强一致而选择可用性也是 RocketMQ 放弃 Zookeeper 的选择,这也让 NameServer 的设计更加简单。
NameServer 处理 Broker 注册的时候,考虑到多个 Broker 并发注册的问题,保存路由信息时采用了 ReadWriteLock 中的写锁,代码如下:
public RegisterBrokerResult registerBroker(
//省略参数
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
}
return result;
}
如果有新的 Broker 加入时,NameServer 并不会主动向客户端推送新的 Broker 信息,而是需要客户端的定时任务(30s 一次)去主动拉取,这样客户端保存的路由信息跟 NameServer 会有短暂的不一致。
同样,Broker 掉线后,NameServer 会用定时任务(10s 一次)检测 Broker 最后更新时间是否超过 120s,如果超过就把 Broker 路由信息删除。在客户端,同样需要定时任务(30s 一次)去主动拉取,客户端保存的路由信息跟 NameServer 也会有短暂的不一致。
从上面分析看到,NameServer 集群各节点是对等的,当集群有压力时,横向扩展非常容易。而 Zookeeper 在写扩展方面非常不灵活。
在 Broker 主从集群中,RocketMQ 实现了基于 raft 协议的 DLedger 算法,可以基于 DLedger 进行日志复制。如果 Master 节点发生故障,可以基于 DLedger 自动进行主从切换。这可以完全不依赖于 Zookeeper 的实现。
如果引入 Zookeeper,运维人员必须要具备运维 Zookeeper 的能力,这又增加了运维的复杂性。