我们进入RouteInfoManager#pickupTopicRouteData方法来看下具体如何实现。
/**
 * Assembles the routing information for a topic from the in-memory route tables.
 *
 * <p>While holding the read lock, this method looks the topic up in {@code topicQueueTable},
 * collects the broker names referenced by its queue data, copies the matching entries of
 * {@code brokerAddrTable}, and records the {@code filterServerTable} entry for every broker
 * address of those brokers.
 *
 * <p>Exceptions raised during the lookup are logged rather than propagated. If the thread is
 * interrupted while waiting for the read lock, its interrupt status is restored before returning.
 *
 * @param topic the topic name to look up
 * @return the route data when both queue data and at least one broker entry were found for the
 *         topic; {@code null} otherwise
 */
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);
    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        // Acquire the read lock BEFORE entering the guarded try block. If acquisition is
        // interrupted, the finally clause must not run: unlocking a read lock the current thread
        // does not hold would raise IllegalMonitorStateException (for a ReentrantReadWriteLock)
        // and hide the real InterruptedException.
        this.lock.readLock().lockInterruptibly();
        try {
            // Look up the queue data registered for this topic.
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                // The list is handed over as-is (the same instance stored in topicQueueTable).
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                // Collect the distinct broker names that host queues of this topic.
                for (QueueData queueData : queueDataList) {
                    brokerNameSet.add(queueData.getBrokerName());
                }

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null == brokerData) {
                        continue;
                    }

                    // Copy the address map so the returned route data does not share it with
                    // the entry held in brokerAddrTable.
                    @SuppressWarnings("unchecked")
                    HashMap<Long, String> brokerAddrsCopy =
                        (HashMap<Long, String>) brokerData.getBrokerAddrs().clone();
                    BrokerData brokerDataClone =
                        new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), brokerAddrsCopy);
                    brokerDataList.add(brokerDataClone);
                    foundBrokerData = true;

                    // Record the filter-server lookup result for every address of this broker.
                    // The result is stored unconditionally, exactly as returned by the lookup.
                    for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                        List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                        filterServerMap.put(brokerAddr, filterServerList);
                    }
                }
            }
        } finally {
            // Reached only when the read lock was actually acquired above.
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        log.error("pickupTopicRouteData Exception", e);
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
上面的代码封装了TopicRouteData的queueDatas、brokerDatas和filterServerTable,还剩orderTopicConf字段没有封装。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。