RocketMQ一个用Java写的开源项目,而且也是阿里开源的,所以想看一看设计思路以及一些细节,所以就写了这篇博客,记录一下Broker注册到Nameserver的过程以及心跳逻辑。
要会Netty吧,如果不会的话,感觉应该看不懂吧。
其实很多源码的讲解都是把一个类都标上注释,其实我感觉这样的人很厉害,因为他确实对这个代码很精通。我的风格比较偷懒,我们想看哪一部分就跟哪一部分和哪个分支,其他的没必要看,这样你就能偷懒了,所以这篇文章想跟的是Broker注册到NameServer源码以及Broker与NameServer的心跳。你调试的时候不也是这样吗,哪报错了你就进哪个分支,不关心的分支我们不去分析。
注意:本文只关心Broker注册到NameServer和心跳逻辑,其他都不关心。
启动入口我们从NamesrvStartup#main0开始
public static NamesrvController main0(String[] args) {
try {
//创建NameServer,其实就初始化一些变量,跳过
NamesrvController controller = createNamesrvController(args);
//启动controller
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
接下来就是NamesrvStartup#start方法
public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化,就是给一些属性初始化,其中remotingServer得到初始化
boolean initResult = controller.initialize();
//省略
//启动controller
controller.start();
return controller;
}
下面跟NamesrvController#start方法
public void start() throws Exception {
//启动NettyServer服务器
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
此时在NettyRemotingServer#start就启动了一个ServerBootstrap服务端
public void start() {
//省略
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
//省略
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
//省略
}
综上:NameServer的启动流程的核心就是在NettyRemotingServer#start就启动了一个ServerBootstrap并监听9876端口
还是直接看BrokerController#start方法吧,反正前面也是debug
public void start() throws Exception {
//省略
//向NameServer注册自己的信息
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
向NameServer注册自己的信息
this.registerBrokerAll(true, false, true);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//心跳,重复注册自己
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
Broker启动一个定时任务,每次都会向NameServer注册自己,不断覆盖到NameServer存的Broker的信息,从而达到心跳的效果,我只能说一个字,秀。
//BrokerController#start
public void start() throws Exception {
//省略
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//心跳,重复注册自己
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
//省略
}
NameServer维护和Broker之间的连接
//NamesrvController#initialize
public boolean initialize() {
//省略
//定时任务,根据broker注册到nameServer的时间与此时此刻时间的阈值去判断该broker是否还存活
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//省略
return true;
}
1 broker和NameServer之间的心跳我以为是发送心跳包去实现的,结果是通过不断的向nameserver注册自己实现的 2 NameServer通过定时任务不断的扫描brokerLiveTable去根据时间阈值(broker注册的时间和此时此刻的时间差距)实现维护连接
https://www.bilibili.com/video/BV1fE411V7Ho?p=6