前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ-NameServer原理

RocketMQ-NameServer原理

作者头像
潇洒
发布2023-10-20 10:24:17
2680
发布2023-10-20 10:24:17
举报
文章被收录于专栏:石头岛

NameServer 名字服务

实际作就是就一个注册中心

NameServer 作用

在系统中肯定是做命名服务,服务治理方面的工作,功能应该是和zookeeper差不多 早期的版本中,使用的是 Zookeeper 做为配置中心,改名 RocketMQ 后使用了自己开发的 NameServer。 是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步

两个主要做用

  1. NameServer维护Broker NameServer 维护了一份 Broker 的地址列表和 Broker 在启动的时候会去 NameServer 进行注册,会维护 Broker 的存活状态。
  2. NameServer 维护Topic NameServer 维护了一份 Topic 和 Topic 对应队列的地址列表,Broker 每次发送的心跳过来的时候会把 Topic 信息带上。

producer、consumer 发送消息会去 NameServer 去拉取路由信息

NameServer 维护 Broker

1.维护 Broker 信息 broker 启动后,会连接到 NameServer,定期上报自身信息,NameServer 收到消息后会每 30秒 扫描一次所有已上报的 Broker 信息的心跳。 NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中删除。 注意: 但是路由变化不会马上通知消息生产者Producer。 这样设计的目的是为了降低 NameServer 实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。

2.判断 broker 失效 以 NameServer自身 broker列表中的broker的更新时间,当前时间与最后更新时间差值超过2分钟,就判定为失效,移除失效 broker。这个后面带上源码分析。

3.无状态性 NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是NameServer设计的一个亮点

特点:

  1. 互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用,这点类似于dubbo的zookeeper。
  2. nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

Broker 启动的时候会将自己注册到 NameServer 中,注册的同时还会将 Broker 的 IP 地址、端口相关的数据,以及保存在 Broker 中的 RocketMQ 集群路由的数据一并跟随心跳发送到 NameServer。这里的路由信息是指 Topic 下的 MessageQueue 分别都在哪台 Broker 上。

从代码是了解这一过程

registerBrokerbroker注册、维护的主要逻辑,主要的几个集合:

  • topicQueueTable topic和broker对应关系
  • brokerAddrTable broker信息
  • clusterAddrTable 集群信息
代码语言:javascript
复制
// 处理 broker 相关事务
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //broker 超时时间设置 120 秒,就是这个指定的,没有发现有给api修改
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // topic 路由表
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // broker 信息
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

    //省略部分代码

    //注册 broker
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                //集群名称
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    //如果没有拿到 broker名,broker就用 clusterName
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                //brokerData 数据,第一次注册,并没有数据
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                //key 是 0-n
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                //slave 切换到 master:删除1,再将slave改为0,add到brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    //brokerAddr 申请注册的 broker
                    //去重,找到 IP:PORT 只允许一条存在,如果 IP:PORT 存在,ID不同,删除这一条
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }
                //上面删完,这里add进去,可以理解成更新操作
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);//返回旧值

                //处理 topic 的配置修改,如果是master,开发中topic经常会调整
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
            // 省略部分代码
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

topic 队列和 broker 的对应关系

一个topic默认会有16个队列(queue),队列(queue)会分布在不同的broker

代码语言:javascript
复制
//创建、更新队列 brokerName 和 queue 的对应关系
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    //默认16
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    //默认16
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        boolean addNewOne = true;

        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
            QueueData qd = it.next();
            //绑定队列对应的 brokerName
            //比如 broker1  TopicA---queue1
            //                      queue2
            //                      queue3
            //                      queue4
            if (qd.getBrokerName().equals(brokerName)) {
                if (qd.equals(queueData)) {
                    addNewOne = false;
                } else {
                    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                        queueData);
                    it.remove();
                }
            }
        }

        if (addNewOne) {
            queueDataList.add(queueData);
        }
    }
}

总结

NameServer 其实就是抄的 kafka的注册中心,又搞的不像,两个节点之前状态不一致表面上说是开发简单,那实际用起来呢,再观望一下,不行我提个PR帮他们搞搞。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2015-04-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • NameServer 名字服务
    • NameServer 作用
      • NameServer 维护 Broker
      • 总结
      相关产品与服务
      微服务引擎 TSE
      微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档