前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Broker注册到NameServer源码分析

Broker注册到NameServer源码分析

作者头像
CBeann
发布2023-12-25 19:15:54
1410
发布2023-12-25 19:15:54
举报
文章被收录于专栏:CBeann的博客CBeann的博客

写作目的

RocketMQ一个用Java写的开源项目,而且也是阿里开源的,所以想看一看设计思路以及一些细节,所以就写了这篇博客,记录一下Broker注册到Nameserver的过程以及心跳逻辑。

前提

要会Netty吧,如果不会的话,感觉应该看不懂吧。

跟源码思路

其实很多源码的讲解都是把一个类都标上注释,其实我感觉这样的人很厉害,因为他确实对这个代码很精通。我的风格比较偷懒,我们想看哪一部分就跟哪一部分和哪个分支,其他的没必要看,这样你就能偷懒了,所以这篇文章想跟的是Broker注册到NameServer源码以及Broker与NameServer的心跳。你调试的时候不也是这样吗,哪报错了你就进哪个分支,不关心的分支我们不去分析

注意:本文只关心Broker注册到NameServer和心跳逻辑,其他都不关心。

启动源码分析

源码版本

RocketMQ4.9.1

NameServer启动流程

在这里插入图片描述
在这里插入图片描述

启动入口我们从NamesrvStartup#main0开始

代码语言:javascript
复制
public static NamesrvController main0(String[] args) {

        try {
            //创建NameServer,其实就初始化一些变量,跳过
            NamesrvController controller = createNamesrvController(args);
            //启动controller
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

接下来就是NamesrvStartup#start方法

代码语言:javascript
复制
public static NamesrvController start(final NamesrvController controller) throws Exception {
        //初始化,就是给一些属性初始化,其中remotingServer得到初始化
        boolean initResult = controller.initialize();
        //省略
        //启动controller
        controller.start();
        return controller;
    }

下面跟NamesrvController#start方法

代码语言:javascript
复制
public void start() throws Exception {
        //启动NettyServer服务器
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

此时在NettyRemotingServer#start就启动了一个ServerBootstrap服务端

代码语言:javascript
复制
public void start() {
       //省略
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

       //省略
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        //省略
    }

综上:NameServer的启动流程的核心就是在NettyRemotingServer#start就启动了一个ServerBootstrap并监听9876端口

Broker启动流程

在这里插入图片描述
在这里插入图片描述

还是直接看BrokerController#start方法吧,反正前面也是debug

代码语言:javascript
复制
public void start() throws Exception {
        
        //省略

        //向NameServer注册自己的信息
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            向NameServer注册自己的信息
            this.registerBrokerAll(true, false, true);
        }

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //心跳,重复注册自己
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }


    }

Broker给NameServer发心跳

Broker启动一个定时任务,每次都会向NameServer注册自己,不断覆盖到NameServer存的Broker的信息,从而达到心跳的效果,我只能说一个字,秀。

代码语言:javascript
复制
//BrokerController#start
public void start() throws Exception {
        
        //省略
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    //心跳,重复注册自己
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        //省略
    }

broker和NameServer之间维护连接

NameServer维护和Broker之间的连接

代码语言:javascript
复制
//NamesrvController#initialize
public boolean initialize() {
        //省略

        //定时任务,根据broker注册到nameServer的时间与此时此刻时间的阈值去判断该broker是否还存活
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        //省略

        return true;
    }

总结

1 broker和NameServer之间的心跳我以为是发送心跳包去实现的,结果是通过不断的向nameserver注册自己实现的 2 NameServer通过定时任务不断的扫描brokerLiveTable去根据时间阈值(broker注册的时间和此时此刻的时间差距)实现维护连接

参考

https://www.bilibili.com/video/BV1fE411V7Ho?p=6

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写作目的
  • 前提
  • 跟源码思路
  • 启动源码分析
    • 源码版本
      • NameServer启动流程
        • Broker启动流程
          • Broker给NameServer发心跳
            • broker和NameServer之间维护连接
            • 总结
            • 参考
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档