前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RocketMq实战第七篇】-NameServer

【RocketMq实战第七篇】-NameServer

作者头像
胖虎
发布2019-06-26 17:07:58
4350
发布2019-06-26 17:07:58
举报
文章被收录于专栏:晏霖晏霖

前言

NameServer 维护这些配置信息 、 状态信 息,其他角色都通过 NameServer 来协同执行,这章我们就来分析NameServer以及RocketMQ都通讯机制。

正文

NameServer简介

NameServer是整个消息队列中 的状态服务器,集群的各个组件通过它来了 解全局的信息 。 同时,各个角色的机器都要定期 向 NameServer上报自己的状 态,超 时不上报的 话, NameServer 会认为 某个机器出故障不可用了,其他的组 件会把这个机器从可用列表里移除 。

NamServer可以部署多个,相互之间独立,其他角色同时向多个 NameServer 机器上报状态信息,从而达到热备份的目的。 NameServer本身是无状态的,也就 是说 NameServer 中的 Broker、 Topic 等状态信息不会持久存储,都是由各个角色 定时上报并存储到内存中的。

集群状态的存储结构

代码位置package org.apache.rocketmq.namesrv.routeinfo; RoutelnfoManager类中,有五 个变量 ,集群的状态就保存在这五个变量中 。

NameServer 的主要工作就是维 护这五个 变量中存储的信 息 。

代码语言:javascript
复制
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
说明:topicQueueTable 这个结构的 Key 是 Topic 的名称,它存储了所有 Topic 的属性信息 。
 Value 是个 QueueData 队列 , 队里的长度 等于这 个 Topic 数据存储的 MasterBroker的个数, 
QueueData里存储着 Broker的名称、 读写queue的数量、 同步标识等。

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
说明:以 BrokerName 为 索 引 ,相 同 名 称的 Broker 可能存在多台机器, 一个 Master 
和多个 Slave。 这个结构存储着一个 BrokerName 对应的属性信 息,包括所属的 Cluster 名称,
 一 个 Master Broker 和多个 Slave Broker 的地址信息 。


private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
说明:存储的是集群中 Cluster 的信息,结果很简单,就是一个 Cluster 名称对 应一个由 BrokerName组成的集合。


private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
说明:这个结构和 BrokerAddrTable有关系,但是内容完全不同,这个结构的 Key 是 BrokerAddr,也就是对应着一台 机器, 
BrokerAddrTable 中的 Key 是BrokerName, 多个机器的BrokerName可以相同。 BrokerLiveTable 存储的内容是这台 
Broker机器的实时状态,包括上次更新状态的时间 戳, NameServer会定期检查这个时间戳,超时没有更新就认为这个 Broker无效了,
将其从 Broker列表里清除。


private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
说明:Filter Server是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一 个 Broker 可以有 一个 或 多个 Filter Server。 
这个结构的 Key 是 Broker 的地址, Value 是和这个 Broker关联的多个 Filter Server 的地址 。

状态维护逻辑

因为其他角色会主动向 NameServer上报状态,所以 NameServer 的主 要逻 辑在 DefaultRequest­ Processor类中,根据上报消息里的请求码做相 应 的处理, 更新存 储的对应 信息 。 此外,连接断开的 事 件也 会 触发状态 更新。

当NameServer和Broker的长连接断掉以后,NameServer会把这个 Broker的信息清理出去。

NameServer还有定时检查时间戳的逻辑, Broker向 NameServer发送的心 跳会更新时间戳, 当 NameServer检查到时间戳长时间没有更新后,便会触发 清理逻辑(10秒检查一次,时间戳超过 2分钟则认为 Broker已 失效。)。

底层通信机制

Remoting 模块

RocketMQ 的通信相 关代码在 Remoting 模块里,先来 看看主要类结 构

RemotingService 为最上层接口,定义了 三个方法:

  • void start();
  • void shutdown();
  • void registerRPCHook(RPCHook rpcHook);

RemotingClient和 RemotingServer继承 RemotingService接口,并增加 了 自己特有的方法

NettyRemotingClient和NettyRemotingServer分 别实现了 RemotingCIient和RemotingServer,而且都继承了NettyRemotingAbstract 类 。

代码语言:javascript
复制
public class ClientRemotingProcessor implements NettyRequestProcessor {
    private final Logger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);

            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);

            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            default:
                break;
        }
        return null;
    }

RocketMQ 中复杂的通信过程,被 RemotingCommand统一起来,大部分的逻辑都是通 过发送、接受并处理 Command 来完成的 。

协议设计和编解码

RocketMQ 自己定义了一个通信协议,使得模块间传输的二进制消息和有 意义的内容之间互相转换

  • 第一部分是大端 4个字节整数,值等于第二、三、 四部分长度的总和;
  • 第二部分是大端 4 个字节整数,值等于第 三部分的长度;
  • 第三部分是通过 Json序列化的数据;
  • 第四部分是通过应用自定义二进制序列化的数据 。

Netty 库

RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的 通信实现的, Netty是个事件驱动的网络编程框架,它屏蔽了 Java Socket、 NIO 等复杂细节,用户只需用好 Netty,就可以实现一个“网络编程专家+并发编程 专家”水平的 Server、 Client 网络程序 。 应用 Netty 有一定的门槛,需要了解它 的 EventLoopGroup、 Channel、 Handler 模型以及各种具体的配置。 RocketMQ 利用 Netty 实现的通信类是 NettyRemotingServer 和 NettyRemotingClient,用户 也可以参考这两个类的实现来学习使用 Netty。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-02-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
    • 底层通信机制
    相关产品与服务
    文件存储
    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档