专栏首页瓜农老梁RocketMQ NameServer【源码笔记】

RocketMQ NameServer【源码笔记】

1. NameServer启动

从生产环境实践来看,NameServer启动使用默认配置即可,运行良好。

启动命令:nohup sh bin/mqnamesrv &

NamesrvStartup.java 启动入口类,NameServer 启动默认端口9876

nettyServerConfig.setListenPort(9876)

每10秒钟扫描一次,移除失效的broker,同时删除缓存元数据信息

//初始化NameServer
boolean initResult = controller.initialize();
public boolean initialize() {
    //加载KV配置
    this.kvConfigManager.load();

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();
    //每10秒钟扫描一次,移除失效的broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    //每隔10秒钟打印一次KV配置信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    return true;
}

失效时间为2分钟,即:Broker在2分钟内未上报心跳会被移除

/**
 * 失效时间为2分钟,即:Broker在2分钟内未上报心跳会被移除
 */
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

2.DefaultRequestProcessor

用于响应客户端、Broker的请求。主要向NameServer发送心跳包、获取Cluster、Broker、Topic元数据信息。

调用链: 在NameServer启动时注册,NamesrvController.initialize()->registerProcessor()

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG://增加NameServer配置信息;由DefaultMQAdminExt使用
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG://根据NameSpace和key获取NameServer配置信息;由DefaultMQAdminExt使用
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG: //据NameSapce和Key删除NameServerr配置信息
            return this.deleteKVConfig(ctx, request);
        case RequestCode.REGISTER_BROKER: //注册Broker信息;由BrokerOuterAPI.registerBroker使用,在BrokerController启动时调用
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER://移除注销broker信息;由BrokerOuterAPI.unregisterBroker使用,在BrokerController.shutdown时调用
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC: //获取Topic路由信息 TopicRouteData
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO://获取Cluster及Broker信息
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER: //去除该broker上所有topic的写权限
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: //获取所有的Topic列表
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV: //从nameServer中删除topic
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE: //获取配置信息 configTable
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER: //获取该集群下的所有topic list
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: // 此处意思为:系统会将集群名称、broker名称作为默认topic创建。现在获取这类topic
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST: //暂无使用
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: //暂无使用
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST://暂无使用
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG: //更新properties请求
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG: //获取properties内容
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

注册broker信息

Broker每隔30秒向所有的NameServer上报Topic注册信息

Broker调用链

  BrokerController.start()->this.registerBrokerAll()->this.brokerOuterAPI.registerBrokerAll()
//每隔30秒向所有的NameServer上报Topic注册信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false);
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

服务端处理主要包括:注册集群信息clusterAddrTable、注册broker信息brokerAddrTable、 注册topic信息topicQueueTable、broker心跳包brokerLiveTable

NameServer处理链

  DefaultRequestProcessor->processRequest->RequestCode.REGISTER_BROKER->this.registerBroker->RouteInfoManager.registerBroker()
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            //加写锁,防止并发修改
            this.lock.writeLock().lockInterruptibly();
            //注册集群信息
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            //注册broker信息
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
            //Topic配置变化了;Master Broker第一次注册或者Topic dataVersion不相同时更新路由信息
            //有Topic新增时dataVersion会递增
            if (null != topicConfigWrapper //
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue()); //更新topicQueueTable
                        }
                    }
                }
            }
            //更新broker心跳信息
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            //新broker注册时会有日志输出
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
            }
            //更新filterServer信息
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
            //Slave设置MasterAddr和HaServerAddr
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
3.NameServer缓存元数据结构

注册集群信息clusterAddrTable

/**
 * Broker集群信息; key为集群名称,value为所有的broker名称集合
 */
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

注册broker信息brokerAddrTable

/**
 *  Broker信息;key为brokerName,value为BrokerData
 */
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

public class BrokerData implements Comparable<BrokerData> {
    /**
     * Cluster名称
     */
    private String cluster;
    /**
     * broker名称
     */
    private String brokerName;
    /**
     * 0->ip:port
     */
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

注册topic信息topicQueueTable

/**
 * 消息队列路由信息;key为topic,value为QueueData
 */
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

public class QueueData implements Comparable<QueueData> {
    /**
     * Broker名称
     */
    private String brokerName;
    /**
     * 读队列个数,默认4个
     */
    private int readQueueNums;
    /**
     * 写队列个数,默认4个
     */
    private int writeQueueNums;
    /**
     * 队列权限
     */
    private int perm;
    /**
     * 配置的,同步复制还是异步复制标记,对应TopicConfig.topicSysFlag
     *
     */
    private int topicSynFlag;
}

Broker心跳包brokerLiveTable

/**
 * Broker状态信息,NameServer每次收到心跳包会替换该信息,每隔30秒更新一次
 * brokerAddr: ip:port->{}
 */
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

class BrokerLiveInfo {
    /**
     * 存储上次收到心跳包的时间,每隔30秒更新一次
     */
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;
}

本文分享自微信公众号 - 瓜农老梁(gh_01130ae30a83),作者:梁勇

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RocketMQ Topic创建【源码笔记】

    Topic的创建分为自动创建和通过命令行创建两种。通过broker配置参数autoCreateTopicEnable设置。 通常可以在非生产环境开启自动创建,生...

    瓜农老梁
  • RocketMQ存储--主从同步【源码笔记】

    1.消息存储在Master上了,如何同步到Slave上了呢? 2.同步复制和异步复制流程是怎么样的?

    瓜农老梁
  • RocketMQ客户端消费--ProcessQueue处理队列【源码笔记】

    在消费消息时处处能看到处理队列ProcessQueue的身影,既然随处可见也一定很重要,那有必要分析下为何重要了。

    瓜农老梁
  • 利用canvas实现毛笔字帖(二)

    2. 第2部分write.js 第二部分决定先介绍write部分,因为controller部分必须要结合write部分才能看到效果。 针对write.js部分,...

    用户1394570
  • Button按钮--inject与provide

    inject 和 provider 是vue中的组合选项,需要一起使用。目的是允许一个祖先组件向其所有子孙后代注入依赖(简单地说就是祖先组件向子孙后代传值的一种...

    用户1148399
  • Mybatis 源码分析(一)之 Mybatis 的Executor的初始化

    Mybatis系列: Mybatis 基础介绍与逆向工程的构建 :https://www.jianshu.com/p/1c18db4d7a38 Mybati...

    zoro
  • 通过vue.js 学习来总结es6语法中的箭头函数,箭头函数原理分析。

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/acoolgiser/article/details/...

    acoolgiser
  • JS回调函数中的 this 指向(详细)

    1. obj.fun() fun 中的 this->obj ,自动指向.前的对象

    TimothyJia
  • 求超大文件上传方案( B/S )

    需求:项目要支持大文件上传功能,经过讨论,初步将文件上传大小控制在500M内,因此自己需要在项目中进行文件上传部分的调整和配置,自己将大小都以501M来进行限制...

    用户6892318
  • flutter单引擎方案

    假设有两个模块,FlutterA,FlutterB,我们利用io.flutter.embedding.android.FlutterFragment下面的接入方...

    brzhang

扫码关注云+社区

领取腾讯云代金券