专栏首页晏霖【RocketMq实战第七篇】-NameServer

【RocketMq实战第七篇】-NameServer

前言

NameServer 维护这些配置信息 、 状态信 息,其他角色都通过 NameServer 来协同执行,这章我们就来分析NameServer以及RocketMQ都通讯机制。

正文

NameServer简介

NameServer是整个消息队列中 的状态服务器,集群的各个组件通过它来了 解全局的信息 。 同时,各个角色的机器都要定期 向 NameServer上报自己的状 态,超 时不上报的 话, NameServer 会认为 某个机器出故障不可用了,其他的组 件会把这个机器从可用列表里移除 。

NamServer可以部署多个,相互之间独立,其他角色同时向多个 NameServer 机器上报状态信息,从而达到热备份的目的。 NameServer本身是无状态的,也就 是说 NameServer 中的 Broker、 Topic 等状态信息不会持久存储,都是由各个角色 定时上报并存储到内存中的。

集群状态的存储结构

代码位置package org.apache.rocketmq.namesrv.routeinfo; RoutelnfoManager类中,有五 个变量 ,集群的状态就保存在这五个变量中 。

NameServer 的主要工作就是维 护这五个 变量中存储的信 息 。

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
说明:topicQueueTable 这个结构的 Key 是 Topic 的名称,它存储了所有 Topic 的属性信息 。
 Value 是个 QueueData 队列 , 队里的长度 等于这 个 Topic 数据存储的 MasterBroker的个数, 
QueueData里存储着 Broker的名称、 读写queue的数量、 同步标识等。

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
说明:以 BrokerName 为 索 引 ,相 同 名 称的 Broker 可能存在多台机器, 一个 Master 
和多个 Slave。 这个结构存储着一个 BrokerName 对应的属性信 息,包括所属的 Cluster 名称,
 一 个 Master Broker 和多个 Slave Broker 的地址信息 。


private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
说明:存储的是集群中 Cluster 的信息,结果很简单,就是一个 Cluster 名称对 应一个由 BrokerName组成的集合。


private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
说明:这个结构和 BrokerAddrTable有关系,但是内容完全不同,这个结构的 Key 是 BrokerAddr,也就是对应着一台 机器, 
BrokerAddrTable 中的 Key 是BrokerName, 多个机器的BrokerName可以相同。 BrokerLiveTable 存储的内容是这台 
Broker机器的实时状态,包括上次更新状态的时间 戳, NameServer会定期检查这个时间戳,超时没有更新就认为这个 Broker无效了,
将其从 Broker列表里清除。


private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
说明:Filter Server是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一 个 Broker 可以有 一个 或 多个 Filter Server。 
这个结构的 Key 是 Broker 的地址, Value 是和这个 Broker关联的多个 Filter Server 的地址 。

状态维护逻辑

因为其他角色会主动向 NameServer上报状态,所以 NameServer 的主 要逻 辑在 DefaultRequest­ Processor类中,根据上报消息里的请求码做相 应 的处理, 更新存 储的对应 信息 。 此外,连接断开的 事 件也 会 触发状态 更新。

当NameServer和Broker的长连接断掉以后,NameServer会把这个 Broker的信息清理出去。

NameServer还有定时检查时间戳的逻辑, Broker向 NameServer发送的心 跳会更新时间戳, 当 NameServer检查到时间戳长时间没有更新后,便会触发 清理逻辑(10秒检查一次,时间戳超过 2分钟则认为 Broker已 失效。)。

底层通信机制

Remoting 模块

RocketMQ 的通信相 关代码在 Remoting 模块里,先来 看看主要类结 构

RemotingService 为最上层接口,定义了 三个方法:

  • void start();
  • void shutdown();
  • void registerRPCHook(RPCHook rpcHook);

RemotingClient和 RemotingServer继承 RemotingService接口,并增加 了 自己特有的方法

NettyRemotingClient和NettyRemotingServer分 别实现了 RemotingCIient和RemotingServer,而且都继承了NettyRemotingAbstract 类 。

public class ClientRemotingProcessor implements NettyRequestProcessor {
    private final Logger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);

            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);

            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            default:
                break;
        }
        return null;
    }

RocketMQ 中复杂的通信过程,被 RemotingCommand统一起来,大部分的逻辑都是通 过发送、接受并处理 Command 来完成的 。

协议设计和编解码

RocketMQ 自己定义了一个通信协议,使得模块间传输的二进制消息和有 意义的内容之间互相转换

  • 第一部分是大端 4个字节整数,值等于第二、三、 四部分长度的总和;
  • 第二部分是大端 4 个字节整数,值等于第 三部分的长度;
  • 第三部分是通过 Json序列化的数据;
  • 第四部分是通过应用自定义二进制序列化的数据 。

Netty 库

RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的 通信实现的, Netty是个事件驱动的网络编程框架,它屏蔽了 Java Socket、 NIO 等复杂细节,用户只需用好 Netty,就可以实现一个“网络编程专家+并发编程 专家”水平的 Server、 Client 网络程序 。 应用 Netty 有一定的门槛,需要了解它 的 EventLoopGroup、 Channel、 Handler 模型以及各种具体的配置。 RocketMQ 利用 Netty 实现的通信类是 NettyRemotingServer 和 NettyRemotingClient,用户 也可以参考这两个类的实现来学习使用 Netty。

本文分享自微信公众号 - 晏霖(yanlin199507)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-02-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • kakfa学习总结

    KAFKA是Apache基金会的一个开源项目,是一个分布式的发布-订阅的消息系统;

    用户5522200
  • 处理java访问mysql连接数太多的错误

    在生产环境处理故障的过程出现了java服务连接mysql,由于连接数太多被拒绝连接的故障,那么下面来看看怎么优化一下吧。

    Devops海洋的渔夫
  • Java 学习笔记(7)——接口与多态

    上一篇说了Java面向对象中的继承关系,在继承中说到:调用对象中的成员变量时,根据引用类型来决定调用谁,而调用成员方法时由于多态的存在,具体调用谁的方法需要根据...

    Masimaro
  • Java并发-22.阻塞队列

    无界阻塞队列永远不满,put和offer方法永远不阻塞,offer永远返回true

    悠扬前奏
  • 【Mysql中间件】Mycat安装部署+读写分离

    说明: mysql-master:172.16.200.43 Mycat:172.16.200.43 mysql-slave1:172.16.200.45 my...

    用户5522200
  • Java线程安全面试题,你真的了解吗?

    多个线程不管以何种方式访问某个类,并且在主调代码中不需要进行同步,都能表现正确的行为。

    李红
  • 喝杯奶茶来了解装饰模式?

    使用到装饰模式的情况,关键之处就是在于装饰两字,翻译过来就是需要为一个类或者对象添加额外的功能,而且这个功能可能是出于特定的情况之下才会需要的。

    程序员小强
  • 高效率使用 Github

    此处默认你有 Github 账号、安装了 Git 并且熟悉基本的 Git 操作,只是需要寻求部署 Github Pages 方面的知识。GitHub Pages...

    周三不加班
  • Java 中处理异常的 9 个实践

    在本文中,介绍了 9 个处理异常的最佳方法与实践,以举例与代码展示结合的方式,让开发者更好的理解这 9 种方式,并指导读者在不同情况下选择不同的异常处理方式。

    周三不加班
  • Python 迭代器 - Iterable对象

    迭代是访问集合元素的一种方式。迭代器是一个可以记住遍历的位置的对象。迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。

    Devops海洋的渔夫

扫码关注云+社区

领取腾讯云代金券