前面我们已经通过quickstart看到了nameServer的启动:
/**
 * Bootstrap class for the name server.
 *
 * <p>Parses the command line, fills a {@code NamesrvConfig} and a
 * {@code NettyServerConfig}, builds a {@code NamesrvController} from them,
 * then initializes and starts that controller.
 */
public class NamesrvStartup {
    // Assigned in createNamesrvController once logback has been configured.
    private static InternalLogger log;
    // Properties loaded from the file passed with -c; stays null when -c is absent.
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    /**
     * Creates the controller, starts it and prints a success message.
     *
     * @param args command-line arguments
     * @return the started controller; on any failure the stack trace is printed
     *         and the JVM exits with status -1
     */
    public static NamesrvController main0(String[] args) {
        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

    /**
     * Builds a {@code NamesrvController} from the command line.
     *
     * <p>Steps: publish the RocketMQ version as a system property, parse the
     * command line, fill the two config objects from the {@code -c} file and from
     * the command-line options, configure logback, and create the controller.
     * With {@code -p} the resulting configuration is printed and the JVM exits
     * with status 0. A missing RocketMQ home makes the JVM exit with status -2.
     *
     * @param args command-line arguments
     * @return the newly created (not yet initialized) controller
     * @throws IOException if the config file cannot be opened or read
     * @throws JoranException if the logback configuration fails
     */
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        // Create both config objects; they are filled from the -c file and the command line below.
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // The name server listens on 9876 unless the config file overrides it.
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                // try/finally so the stream is released even when loading or mapping the properties throws.
                try {
                    properties = new Properties();
                    properties.load(in);
                    MixAll.properties2Object(properties, namesrvConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    namesrvConfig.setConfigStorePath(file);
                    System.out.printf("load config properties file OK, %s%n", file);
                } finally {
                    in.close();
                }
            }
        }

        // -p: print the effective configuration and quit.
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        // Command-line options are applied to namesrvConfig after the file values.
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // Configure logback from <rocketmqHome>/conf/logback_namesrv.xml before obtaining the logger.
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
        return controller;
    }

    /**
     * Initializes and starts the given controller.
     *
     * <p>If initialization fails the controller is shut down and the JVM exits
     * with status -3. A JVM shutdown hook that calls
     * {@code controller.shutdown()} is registered before the controller starts.
     *
     * @param controller the controller to start; must not be null
     * @return the same controller
     * @throws IllegalArgumentException if {@code controller} is null
     * @throws Exception if starting the controller fails
     */
    public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // Shut the controller down when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();
        return controller;
    }

    /**
     * Shuts the given controller down.
     *
     * @param controller the controller to stop
     */
    public static void shutdown(final NamesrvController controller) {
        controller.shutdown();
    }

    /**
     * Adds the name-server specific options to {@code options}:
     * {@code -c/--configFile} (takes a value) and {@code -p/--printConfigItem} (flag).
     *
     * @param options the option set to extend
     * @return the same {@code options} instance
     */
    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);

        return options;
    }

    /**
     * @return the properties loaded from the {@code -c} file, or null if none was loaded
     */
    public static Properties getProperties() {
        return properties;
    }
}
从启动类中,我们看到:首先创建NamesrvConfig、NettyServerConfig,并将监听端口由默认的8888改成9876;然后通过-c指定的配置文件和命令行参数填充NamesrvConfig、NettyServerConfig;接着创建NamesrvController,调用initialize()进行初始化,注册JVM关闭钩子函数,最后调用start()启动。
NamesrvController的属性信息、构造函数:
/**
 * Controller of the name server: holds the two configuration objects and the
 * managers and services that are built from them.
 */
public class NamesrvController {
    private static final InternalLogger log =
        InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    /** Name server configuration. */
    private final NamesrvConfig namesrvConfig;

    /** Netty server configuration. */
    private final NettyServerConfig nettyServerConfig;

    /** Single-threaded scheduler whose thread name is built from "NSScheduledThread". */
    private final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));

    /** KV configuration manager. */
    private final KVConfigManager kvConfigManager;

    /** Route information manager. */
    private final RouteInfoManager routeInfoManager;

    /** Remoting server; not assigned by the constructor. */
    private RemotingServer remotingServer;

    /** Broker housekeeping service, created in the constructor. */
    private BrokerHousekeepingService brokerHousekeepingService;

    /** Executor for remoting work; not assigned by the constructor. */
    private ExecutorService remotingExecutor;

    /** Aggregated configuration built from the two config objects. */
    private Configuration configuration;

    /** File watch service; not assigned by the constructor. */
    private FileWatchService fileWatchService;

    /**
     * Stores the two configs and creates the KV config manager, the route info
     * manager, the broker housekeeping service and the aggregated configuration.
     *
     * @param namesrvConfig     name server configuration
     * @param nettyServerConfig netty server configuration
     */
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;

        // Both configs are stored first because the objects below receive this controller.
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);

        this.configuration = new Configuration(log, namesrvConfig, nettyServerConfig);
        this.configuration.setStorePathFromConfig(namesrvConfig, "configStorePath");
    }
}
NamesrvConfig配置信息:
// Configuration values of the name server; each field below carries its default.
public class NamesrvConfig {
// RocketMQ home directory: the MixAll.ROCKETMQ_HOME_PROPERTY system property, falling back to the MixAll.ROCKETMQ_HOME_ENV environment variable
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// Path of the KV config file; defaults to <user.home>/namesrv/kvConfig.json
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// Path where the configuration is stored; defaults to <user.home>/namesrv/namesrv.properties
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
// Product environment name; defaults to "center"
private String productEnvName = "center";
// Cluster test flag; false by default
private boolean clusterTest = false;
// Ordered-message switch (per the field name); false by default
private boolean orderMessageEnable = false;
}
NettyServerConfig配置信息:
/**
* Configuration values of the Netty server; each field below carries its default.
*/
public class NettyServerConfig implements Cloneable {
// Port the server listens on
private int listenPort = 8888;
// Number of worker threads
private int serverWorkerThreads = 8;
// Number of callback executor threads
private int serverCallbackExecutorThreads = 0;
// Number of selector threads
private int serverSelectorThreads = 3;
// Semaphore value for one-way requests (per the field name)
private int serverOnewaySemaphoreValue = 256;
// Semaphore value for asynchronous requests (per the field name)
private int serverAsyncSemaphoreValue = 64;
// Maximum time a channel may stay idle, in seconds
private int serverChannelMaxIdleTimeSeconds = 120;
// Socket send and receive buffer sizes, taken from NettySystemConfig
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// Whether the pooled ByteBuf allocator is enabled (original note: enables off-heap memory; TODO confirm)
private boolean serverPooledByteBufAllocatorEnable = true;
}
RouteInfoManager路由信息:
**
* 路由信息
*/
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//broker通道过期时间
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//topic消息队列路由表
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broker基础表
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//cluster集群表
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//broker状态表
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//FilterServer表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//路由信息构造函数
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
start(final NamesrvController controller)中进行初始化,创建nettyServer对象,开启两个定时任务,添加关闭钩子,然后进行启动操作
/**
 * Initializes the controller.
 *
 * <p>Loads the KV configuration, creates the Netty remoting server and its
 * executor, registers the request processor, schedules two periodic tasks
 * (inactive-broker scan and KV config printing) and, when TLS is not
 * disabled, creates a file watcher that reloads the SSL context after the
 * certificate files change.
 *
 * @return always {@code true}
 */
public boolean initialize() {
    // Load the KV configuration.
    this.kvConfigManager.load();

    // Netty remoting server built from the server config and the broker housekeeping service.
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // Fixed-size pool for remoting work, sized by serverWorkerThreads.
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // Every 10 seconds (first run after 5 seconds): scan for brokers that are no longer active.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // Every 10 minutes (first run after 1 minute): print the KV configuration.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    // Certificate and key are reloaded only after both have changed.
                    boolean certChanged = false;
                    boolean keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            // Best effort: keep starting without dynamic reload, but record why the watcher failed.
            log.warn("FileWatchService created error, can't load the certificate dynamically", e);
        }
    }

    return true;
}
启动服务:
/**
 * Starts the remoting server and then, if one was created, the file watch service.
 *
 * @throws Exception if either service fails to start
 */
public void start() throws Exception {
    this.remotingServer.start();

    if (null == this.fileWatchService) {
        return;
    }
    this.fileWatchService.start();
}
启动Netty服务(下面展示的是NettyRemotingClient的start()方法):
/**
 * Starts the Netty remoting client.
 *
 * <p>Creates the {@code DefaultEventExecutorGroup} that runs the pipeline
 * handlers, configures the client bootstrap, schedules the periodic
 * response-table scan and, when a channel event listener is present, starts
 * the netty event executor.
 */
@Override
public void start() {
    // Worker group for the handlers; threads are named NettyClientWorkerThread_<n>.
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger workerSeq = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable task) {
                String threadName = "NettyClientWorkerThread_" + this.workerSeq.incrementAndGet();
                return new Thread(task, threadName);
            }
        });

    // Configure the client bootstrap: event loop group, channel type, socket options and pipeline.
    this.bootstrap.group(this.eventLoopGroupWorker)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline channelPipeline = ch.pipeline();

                // With TLS enabled the SSL handler goes first, provided an SSL context exists.
                if (nettyClientConfig.isUseTLS()) {
                    if (sslContext == null) {
                        log.warn("Connections are insecure as SSLContext is null!");
                    } else {
                        channelPipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    }
                }

                // Encoder, decoder, idle detection, connection management and the client handler, in that order.
                channelPipeline.addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });

    // Periodic response-table scan: initial delay 1000 * 3, period 1000.
    TimerTask scanTask = new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    };
    this.timer.scheduleAtFixedRate(scanTask, 1000 * 3, 1000);

    // The event executor is only needed when a channel event listener is present.
    if (null != this.channelEventListener) {
        this.nettyEventExecutor.start();
    }
}
定时任务:
/**
 * <p>
 * This method is periodically invoked to scan and expire deprecated request.
 * </p>
 *
 * <p>An entry is expired once its begin timestamp plus its timeout plus 1000
 * is no later than the current time. Expired entries are released and removed
 * from the response table first; their callbacks are invoked afterwards.
 */
public void scanResponseTable() {
    final List<ResponseFuture> expired = new LinkedList<ResponseFuture>();

    // First pass: release and collect every expired entry, removing it through the iterator.
    for (Iterator<Entry<Integer, ResponseFuture>> entries = this.responseTable.entrySet().iterator(); entries.hasNext();) {
        ResponseFuture future = entries.next().getValue();
        if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) > System.currentTimeMillis()) {
            continue;
        }
        future.release();
        entries.remove();
        expired.add(future);
        log.warn("remove timeout request, " + future);
    }

    // Second pass: run the callbacks; a failing callback does not stop the others.
    for (ResponseFuture future : expired) {
        try {
            executeInvokeCallback(future);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}
总结:从启动类中,我们看到:首先创建NamesrvConfig、NettyServerConfig,并将监听端口由默认的8888改成9876;然后通过-c指定的配置文件和命令行参数填充NamesrvConfig、NettyServerConfig;接着创建NamesrvController,调用initialize()进行初始化,注册JVM关闭钩子函数,最后调用start()启动。