专栏首页java 成神之路RocketMQ NameServer 原理分析

RocketMQ NameServer 原理分析

概述

NameServer 是RocketMQ 消息队列的状态服务器(服务发现功能),集群中的各个服务都需要通过 NameServer 来了解集群中各个服务的状态。相当于 SpringCloud 中的 Eureka 的功能。

NameServer 中维护着 Producer 集群、Broker 集群、 Consumer 集群的服务状态。通过定时发送心跳数据包进行维护更新各个服务的状态。

当有新的Producer 加入集群时,通过上报自身的服务信息,及获取各个 Broker Master的信息(Broker 地址、Topic、Queue 等信息),这样就可以决定把对应的Topic消息存储到那个Broker、哪个Queue 上。Consumer 同理。

NameServer 可以部署多个,多个NameServer互相独立,不会交换消息。Producer、Broker、Consumer 启动的时候都需要指定多个 NameServer,各个服务的信息会同时注册到多个 NameServer 上,从而能到达高可用。

NameServer 模块结构

可以看出 NameServer 中的类比较少,8个类。分析起来也比较轻松。

NameServer 启动

org.apache.rocketmq.namesrv.NamesrvStartup 是 NameServer 的启动类。

通过 createNamesrvController 方法创建 NamesrvController 。

NameServer 启动时首先判断是否传入了命令行参数。

命令行参数有两个,-p 和 -c -c 可以指定 NameServer 的配置文件,如果不指定,则使用默认值。 -p 打印 NameServer 的配置参数信息。打印完参数后退出进程。

下面是打印 NameServer 默认的配置参数信息。

如果想修改这些默认的参数,则可以使用 -c 参数,指定配置文件,进行更改。

初始化 NamesrvController

1、调用NamesrvController.initialize() 初始化 NamesrvController,然后调用 NamesrvController.start() 方法来开启 NameServer 服务。 2、注册 ShutdownHookThread 服务。在 JVM 退出之前,调用 ShutdownHookThread 来进行关闭服务,释放资源。

注意:使用 kill -9 强制杀进程是不会执行 ShutdownHook 的。

NamesrvController.initialize()

public boolean initialize() {
    //从 /namesrv/kvConfig.json 中加载 NameServer 的配置
    this.kvConfigManager.load();
    //创建 Netty Server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // 创建 Netty Server 执行的线程池
    this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    //注册 NameServer 服务接受请求的处理类
    this.registerProcessor();
    //定时清理超时的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    //定时打印 NameServer 的配置信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    ...
}
  • 1、KVConfigManager类默认是从 /namesrv/kvConfig.json 配置文件中加载NameServer的配置参数.将配置参数加载保存到HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();变量中。

kvConfig.json 文件的默认路径为: private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

  • 2、创建并初始化 NettyRemotingServer 。 NettyRomotingServer 是 NameServer 对外提供服务功能的。
  • 3、创建 Netty Server 执行使用的线程池。
  • 4、注册默认的处理类DefaultRequestProcessor,所有的请求均由该处理类的processRequest方法来处理。
  • 5、创建一个定时清理超时的 Broker 定时任务。 每隔10秒检查一遍所有Broker的状态的定时任务,判断每一个Broker 最近两分钟是否更新过。如果没有更新则把该 Broker 的 channel 关闭(关闭该Broker 的长连接),并清除相关数据。
  • 6、创建一个打印 NameServer 配置的定时任务。 每隔10分钟打印一次NameServer的配置参数。即KVConfigManager.configTable变量的内容。

NamesrvController.registerProcessor()

注册接收请求的处理类。
private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }

默认注册的是 DefaultRequestProcessor 处理器。 如果设置了 NamesrvConfig.clusterTest = true,则会注册 ClusterTestRequestProcessor 处理器。

ClusterTestRequestProcessor继承DefaultRequestProcessor。

ClusterTestRequestProcessor.getRouteInfoByTopic 方法

ClusterTestRequestProcessor仅重写了 getRouteInfoByTopic()方法。 判断如果获取不到 topicRouteData数据,则会去其它的NameServer 上查找该数据并返回。

DefaultRequestProcessor

通过 processRequest 方法来处理客户端发过来的请求。

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

所有请求的操作说明如下:

requectcode

说明

PUT_KV_CONFIG

向Namesrv追加KV配置

GET_KV_CONFIG

从Namesrv获取KV配置

DELETE_KV_CONFIG

从Namesrv获取KV配置

QUERY_DATA_VERSION

获取版本信息

REGISTER_BROKER

注册一个Broker,数据都是持久化的,如果存在则覆盖配置

UNREGISTER_BROKER

卸载一个Broker,数据都是持久化的

GET_ROUTEINTO_BY_TOPIC

根据Topic获取Broker Name、topic配置信息

GET_BROKER_CLUSTER_INFO

获取注册到Name Server的所有Broker集群信息

WIPE_WRITE_PERM_OF_BROKER

去掉BrokerName的写权限

GET_ALL_TOPIC_LIST_FROM_NAMESERVER

从Name Server获取完整Topic列表

DELETE_TOPIC_IN_NAMESRV

从Namesrv删除Topic配置

GET_KVLIST_BY_NAMESPACE

通过NameSpace获取所有的KV List

GET_TOPICS_BY_CLUSTER

获取指定集群下的所有 topic

GET_SYSTEM_TOPIC_LIST_FROM_NS

获取所有系统内置 Topic 列表

GET_UNIT_TOPIC_LIST

单元化相关 topic

GET_HAS_UNIT_SUB_TOPIC_LIST

获取含有单元化订阅组的 Topic 列表

GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST

获取含有单元化订阅组的非单元化

UPDATE_NAMESRV_CONFIG

更新Name Server配置

根据 processRequest 方法分析源码,发现接收到的所有请求操作的数据都保存在 RouteInfoManager 类中,所有的操作都是对RouteInfoManager 类的操作。

RouteInfoManager

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

1、topicQueueTable

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

Map 中 key存储的是 Topic 的名称, value 存储的是 QueueData 的集合。

QueueData 的集合 size 等于 Topic 对应的 Broker Master 的个数。

QueueData 的数据结构如下:

public class QueueData implements Comparable<QueueData> {
    private String brokerName;   //broker 名字
    private int readQueueNums;   //可读 queue 数
    private int writeQueueNums;  //可写 queue 数 
    private int perm;  //读写权限
    private int topicSynFlag;  //同步标识

2、brokerAddrTable

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

Map 中 key 存储的是 Broker Name, value 存储的是 BrokerData 数据(Broker 的相关信息)。

BrokerData 的数据结构如下:

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;  // 集群名称
    private String brokerName;  // Broker Name
    // 存储的是该 Broker Name 对应的多个 Broker 地址信息。
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

因为相同的名称的 BrokerName 可以多有个。一个 Master 和多个 Slave。所有使用 brokerAddrs 来存储相同 BrokerName 下所有的 Broker 信息(判断Master 和 Slave 的关系是通过 Master 和 Slave 名称是否相同,brokerId 为 0 的是Master, 大于0 的是 Slave)。

3、clusterAddrTable

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

Map 中 key存储的是 clusterName 的名称, value 存储的是 brokerName 的集合。

4、brokerLiveTable

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

Map 中 key 存储的是 brokerAddr 信息,value 存储的是 BrokerLiveInfo 信息,BrokerLiveInfo 中存储了 Broker 的实时状态。

class BrokerLiveInfo {
    // 最后更新时间
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;

上面介绍的 NamesrvController.initialize() 中有一个schedule定时任务,每个10秒钟定时调用 scanNotActiveBroker() 方法进行扫描不活动的 Broker,并把不活动的 Broker 删除掉,就是判断的 这个 lastUpdateTimestamp 这个数据。

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

超过 2分钟没有更新这个值,就认为 Broker 不可用了。

5、filterServerTable

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

Map 中 key 存储的是 brokerAddr 信息,value 存储的是 Filter Server 信息。 Filter Server 是消息的过滤服务器,一个 Broker 可以对应多个 Filter Server。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • HashMap实现中文分词器

    java404
  • RocketMQ 存储机制源码解析

    producer 发送消息后,broker端开始存储消息,会调用 store 模块的 DefaultMessageStore.putMessage 进行存储消息...

    java404
  • RocketMQ 生产者 Producer 启动过程

    从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer。

    java404
  • ffmpeg api的应用——提取视频图片

            这些年来,“短视频”吸引了无数网民的注意。相对于丰富有趣的内容,我们码农可能更关心其底层技术实现。本系列文章将结合ffmpeg,讲解几则视频处理...

    方亮
  • 订阅发布模式到底是不是观察者模式?

    快手前天发布了《看见》一时间好评如潮,盖过了之前的《后浪》。现如今搞内容创作都要开始玩价值观导向了。不过互联网真是一个神奇的东西,我们足不出户就可以看到你想看的...

    码农小胖哥
  • Java入门 - 语言基础 - 18.正则表达式

    原文地址:http://www.work100.net/training/java-regular-expression.html

    光束云
  • 解决微信公众号文章的防盗链

    写一个服务,把微信图片下载到本地,然后放在静态文件目录中,修改微信图片中的域名为自己的IP或域名,备注:程序请求时,referer需要时空的

    JouyPub
  • Android的DataBinding原理介绍

    Activity在inflate layout时,通过DataBindingUtil来生成绑定,从代码看,是遍历contentView得到View数组对象,然后...

    xiangzhihong
  • ExtJs学习笔记(16)_Form布局

    这是最重要的一个布局,几乎所有的表单界面都可以采用form布局,详细的用法本文不作讨论(可以查阅官方API文档),这里只给出一个简单的示例 <script ty...

    菩提树下的杨过
  • 一线大厂在用的反爬虫方法,看我如何破了它!

    本篇内容摘自出版图书《Python3 反爬虫原理与绕过实战》第 6 章中的第 3 小节 SVG 反爬虫

    刘早起

扫码关注云+社区

领取腾讯云代金券