上篇文章主要梳理了NameServer的启动器和配置信息,并复习了JVM中的关闭钩子这个知识点。这篇文章看下NameServer的其他模块。建议带着如下三个问题阅读:
public class NamesrvController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//name server的配置
private final NamesrvConfig namesrvConfig;
//netty server的配置定义
private final NettyServerConfig nettyServerConfig;
//创建一个具备调度功能的线程池,该线程池里只有一个线程,用于:(1)周期性检查broker信息;(2)周期性打印路由信息
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
//name server配置的操作接口
private final KVConfigManager kvConfigManager;
//name server路由信息的操作接口
private final RouteInfoManager routeInfoManager;
//服务器
private RemotingServer remotingServer;
//broker信息清理器,监听通道事件
private BrokerHousekeepingService brokerHousekeepingService;
//服务端处理请求的线程池
private ExecutorService remotingExecutor;
private Configuration configuration;
//other code....
}
ScheduledExecutorService.png
在NameServerController中会注册请求处理器,那么name server的请求处理器实现了哪些接口呢,请看代码:
public class DefaultRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
protected final NamesrvController namesrvController;
public DefaultRequestProcessor(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
//其他具体的实现方法
}
从这个代码中可以看出两个方面的内容:
该模块实现了ChannelEventListener接口,每个broker都会跟name server建立一个连接通道,当这个通道发生异常事件时,需要及时在name server这边清理掉对应的broker信息。异常事件的类型有:(1)通道关闭时;(2)通道抛出异常时;(3)通道空闲时。
public class BrokerHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
这个模块是name server的核心模块,真正管理broker、消息队列等相关信息的地方。代码如下:
public class RouteInfoManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
//对外暴露的方法
}
主要属性的含义如下:
关于ReentrantReadWriteLock:
这个模块用于管理name server自己的配置信息,配置信息以json信息存放在文件中,以二维数组形式存在于内存中,请看代码:
/**
* 管理NameServer的配置属性
*/
public class KVConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
//可重入读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//配置表
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
new HashMap<String, HashMap<String, String>>();
public KVConfigManager(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
//这个类对外暴露的方法,省了
}
这个类对外暴露的方法有: