MQClientInstance#start-ScheduledTask(),核心代码如下:
private void startScheduledTask() {
......
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//从nameserver更新最新的topic路由信息
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
......
}
/**
* 从nameserver获取topic路由信息
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
......
//向nameserver发送请求包,requestCode为RequestCode.GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
......
}
producer和NameServer之间通过netty进行网络传输,producer向NameServer发起的请求中添加注册码
RequestCode.GET_ROUTEINFO_BY_TOPIC。
NameServer收到producer发送的请求后,会根据请求中的requestCode进行处理。处理requestCode同样是在默认的网络处理器DefaultRequestProcessor中进行处理,最终通过RouteInfoManager#pickupTopicRouteData来实现。
TopicRouteData结构
在正式解析源码前,我们先看下NameServer返回给producer的数据结构。通过代码可以看到,返回的是一个TopicRouteData对象,具体结构如下:
其中QueueData,BrokerData,filterServerTable在4.1章节介绍路由元信息时有介绍。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。