前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ源码系列(一) NameServer 核心源码解析

RocketMQ源码系列(一) NameServer 核心源码解析

作者头像
IT大咖说
发布2021-07-19 17:22:11
5260
发布2021-07-19 17:22:11
举报
文章被收录于专栏:IT大咖说IT大咖说

◆ NameServer 介绍

NameServer 是rocketmq核心组件之一,与zookeeper一样天生具有分布式的特性,在rocketmq中担当着路由注册、发现、动态地维护broker相关信息的角色, NameServer 不提供Master-slave同步机制,但是能够保证数据的最终一致性。

◆ NameServer 功能列表

1、动态路由发现和注册功能。broker 启动时,会将brokerAddr 注册到NameServer里, 路由发现是指客户端会定时的向NameServer根据topic拉取路由的最新信息。

2、动态剔除功能。每隔10 s NameServer 会自动扫描所有的broker, 如果有broker失效,那么会从地址列表里将其剔除掉。

◆ NameServer 架构分析

下面是 rocketmq 的部署图

核心原理解析

Broker消息服务器启动时会自动向NameServer 注册信息,消息生产者在发送消息时,会在NameServer的地址列表里通过负载均衡选择一个Broker进行消息发送。NameServer 与每台broker保持长连接,broker会每隔30s向NameServer发送一个心跳包,NameServer每间隔10s查看broker是否存活,如果broker挂掉了,判断挂掉的逻辑是brokerLiveTable检测上次的心跳包与当前系统时间的时间差,如果时间戳大于120s, 那么就将broker从服务地址列表里剔除。

这样设计的目的是降低NameServer 的复杂性, 在消息发送端提供容错机制来保证消息发送的高可用性。

NameServer 可以通过集群来保证高可用性,但在同一时刻有可能获取到数据是不一致的,因为不提供同步机制,但能够保证多个节点的最终一致性。NameServer 这样设计是为了简单高效。

◆ NameServer 工程目录解析

工程目录结构以及解析如下:

namesrv

├─ NamesrvController.java // 执行初始化逻辑,加载配置、注册Processor等

├─ NamesrvStartup.java // NameServer的启动类, 启动netty server

├─ kvconfig

│ ├─ KVConfigManager.java // namespace和config配置管理

│ └─ KVConfigSerializeWrapper.java // 将获取到的配置json序列化

├─ processor

│ ├─ ClusterTestRequestProcessor.java //处理请求类型。

│ └─ DefaultRequestProcessor.java // 默认地请求处理器, 处理数据包

└─ routeinfo

├─ BrokerHousekeepingService.java // netty 的channel共用方法抽象

└─ RouteInfoManager.java // 路由管理器,维护topic, broker,

//clusterName, brokerAddr等信息

分析发现netty 是rocketmq 网络通信的核心,掌握netty 的常见用法是非常有必要的。

◆ NameServer 启动流程分析

1) 加载配置

加载 namesrvConfig 和 nettyServerConfig, 如果有手动配置也可以生效, 使用option类封装参数,在程序运行前添加配置Program arguments, 添加的格式: 例如 -c , -p 等。

代码语言:javascript
复制
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
   ....
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
 
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);
 
        return controller;
}

‍‍

NameSrv的配置存放在 user.home\namesrv\ 目录下:

2) initialize()启动前的初始化工作

NamesrvController 在执行start()方法前需要做一些准备工作,比如加载配置、创建Netty Server实例、注册请求处理器、扫描所有的失联的broker等

具体的解释如下注释:

代码语言:javascript
复制
 public boolean initialize() {
       // 加载k,v 相关配置,含自定义配置。
        this.kvConfigManager.load();
        // 启动netty server, 管理channel
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //  初始化netty 线程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //  注册netty 请求Handler, 可以通过NettyRequestProcessor接口找到其实现类
        this.registerProcessor();
        // 与broker建立长连接,扫描所有的broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
       // 打印所有的config
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
         // 监听文件里的配置是否修改
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }
 
        return true;
    }

如果initialize()方法返回false, 那么需要检查一些相关配置是否正确, 返回true后,就可以执行最后一步controller.start()方法, 该方法表示NameServer正式启动。

3) 启动server

接下来看下源代码分析start()方法做了哪些事

代码语言:javascript
复制
  public void start() throws Exception {
     // 1. 启动netty server
        this.remotingServer.start();
      // 2. 启动文件扫描线程,监听核心配置是否修改。
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

可以通过debug发现,首先会进入到NettyRemotingServer类里的start()方法, 该方法实现了nettyServer, 初始化netty的线程组和实例化 ServerBootStrap。

然后开启一个线程执行FileWatchService 的run()方法:

通过此线程扫描配置文件是否被修改。

NameServer启动成功后,会在控制台打印 boot success的字样。

◆ NameServer核心源码解析

1. 路由注册

1) broker向NameServer 发送心跳包

找到brokerController的start()方法里,broker 通过 BrokerController.this.registerBrokerAll(true,false) 方法来向NameServer 发送心跳包,其中使用定时任务 sheduledExecutorService 线程池定时发送,每隔30s 发送一次, brokerConfig.getRegisterNameServerPeriod() 的默认值为30s。

代码语言:javascript
复制
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

然后进入到doRegisterBrokerAll()方法,找到BrokerOuterApi里的registerBrokerAll()方法, 用RegiterBrokerRequestHeader类封装broker相关的信息, RegiterBrokerRequestHeader 主要属性如下:

  • brokerName: broker名称。
  • brokerAddr: broker的地址。
  • cluterName: broker所在集群的名称。
  • haServerAddr: 集群master的地址。
  • brokerId: brokerId为0的时候表示该broker为master, 如果大于0,表示该broker为slave。

brokerId=0为master节点的配置在MixALL配置中:

然后会调用到registerBrokerAll() 方法, 最终会将该broker信息注册到所有的NameServer上。

代码语言:javascript
复制
public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {
 
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
         // 封装broker信息
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
 
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
    // 等待所有的NameServer都含有broker信息后,才表示执行完毕。
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
       // 把该broker的信息注册到所有的NameServer上。
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }
 
                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
 
            try { 
           // 默认超时时间为6s, 在BrokerConfig里配有registerBrokerTimeoutMills=6000
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }
 
        return registerBrokerResultList;
    }

而broker相关信息是通过netty 发送给NameServer, broker信息的请求注册方式有oneway 和同步和异步,默认发送注册请求的方式是同步的。

可以在BrokerOuterAPI类里的registerBroker(final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body) 里找到通过同步的方式发送注册请求,同步的注册方式如下:

2) NameServer 处理心跳包

NameServer接收到broker发送过来的请求后,首先会在DefaultRequestProcessor 网络处理器解析请求类型,请求类型如果为RequestCode.REGISTER_BROKER, 则最终的请求会到RouteInfoManager里的registerBroker()方法。

代码语言:javascript
复制
 public RemotingCommand registerBroker(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
                (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }
        // 解析数据包
        TopicConfigSerializeWrapper topicConfigWrapper;
        if (request.getBody() != null) {
            topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
        } else {
            topicConfigWrapper = new TopicConfigSerializeWrapper();
            topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
            topicConfigWrapper.getDataVersion().setTimestamp(0);
        }
        // 用RouteInfoManager 注册broker
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
                requestHeader.getClusterName(),
                requestHeader.getBrokerAddr(),
                requestHeader.getBrokerName(),
                requestHeader.getBrokerId(),
                requestHeader.getHaServerAddr(),
                topicConfigWrapper,
                null,
                ctx.channel()
        );
        // 响应broker
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());
 
        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

RouteInfoManager 里的registerBroker方法将broker的信息最终添加到 clusterAddrTable、brokerAddrTable、brokerLiveTable、filterServerTable里。

2. 路由删除

RouteInfoManager 的scanNotActiveBroker ()方法, 采用了单线程定时线程池每隔10s扫描所有broker的策略, 该方法在NamesrvController里的initialize()方法里, newSingleThreadScheduledExecutor线程池里只有一个线程实例,利用此线程池能极大地减少系统资源地开销,因为扫描broker本身不需要过多的资源,开启一个线程足以。

代码语言:javascript
复制
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

NameServer是如何判定broker失效的呢?

继续跟着源码进入到scanNotActiveBroker()方法, 判定失效的逻辑是: 如果当前时间戳- 上一次更新的时间戳> 120s。那么判断该broker是失效的。BROKER_CHANNEL_EXPIRED_TIME默认值为120s。因为broker每隔30s会给NameServer发送一次心跳信息,因此此方式可以判定broker是否失效。

代码语言:javascript
复制
public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

接着会将broker相关的信息从brokerLiveTable中移除掉,同时销毁掉netty对应的channel。brokerLiveTable是一个hashmap,归RouteInfoManager类持有。

3. 路由发现

RocketMQ的路由发现是非实时的,当Topic路由发生变化时,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。根据主题拉取最新路由的编码为: GET_ROUTEINFO_BY_TOPIC。

我们可以找到DefaultRequestProcessor处理器里的processRequest()方法,该方法用来处理Netty请求, 该方法有个判断,当request里的code为GET_ROUTEINFO_BY_TOPIC时,执行this.getRouteInfoByTopic(ctx, request)方法。

代码语言:javascript
复制
 @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
 
        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
        }
       // 根据请求代码code来判断业务逻辑
 
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            // 注册broker
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
                // 移除broker
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            // 根据topic获取路由
            case RequestCode.GET_ROUTEINFO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

然后DefaultRequestProceesor类里的getRouteInfoByTopic(ctx,request)方法里主要做了两件事:

  • 根据topic从RouteInfoManager的topicQueueTable里获取到所有的QueueData和BrokerData, 然后将他们设置到topicRouteData里返回出去。
  • 判断指定的topic是否是顺序消息的topic,如果是那么给返回配置顺序消息的路由, 即给setOrderTopicConf赋值。
代码语言:javascript
复制
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
                                               RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
        // 1. 获取到topicRouteData,包含topic所有的QueueData和BrokerData
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
 
        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                // 2. 判断是否是顺序消息的topic, 如果是顺序消息,那么给该请求返回顺序消息的路由
                String orderTopicConf =
                        this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                                requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }
 
            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
 
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

◆ 小结

  1. NameServer相当于rocketmq的注册中心,能够维护并实时监听broker的地址信息和队列信息等。
  2. NameServer和broker之间是基于netty通信的。
  3. DefaultRequestProcessor的 getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request)方法是根据topc获取到路由信息(包含topic对应的所有queue和所有的broker)、registBroker() 方法将broker信息注册到NameServer上、unregisterBroker()方法移除NameServer上的broker信息。
  4. RouteInfoManager 类管理了所有的broker、cluster集群、topicQueue主题队列以及broker存活的信息。

来源:

https://blog.csdn.net/qq_33036061/article/details/117930054

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IT大咖说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档