RocketMQ学习-Broker-1

前面学习了name server的主要代码,这篇文章开始学习broker的源码。broker是RocketMQ的核心模块,这篇文章我们先从整体看下代码结构、RocketMQ的领域模型,然后再看下一个broker节点的启动过程以及Controller暴露的接口。

代码结构

broker的代码结构

领域模型

MQ领域语言描述RocketMQ做的事情,producer构建Message,发送给broker的指定topic,broker负责将消息投递到指定topic下的队列,并记录消息队列的offset,consumer利用拉模式拉取消息进行消费。

ddd-for-rmq.png

启动过程

BrokerController

BrokerController是broker模块的核心控制类,负责broker的初始化、启动、停止、资源管理,以及接受外部的请求并作出相应的动作。看下BrokerController中主要的属性,借此可以看下broker的基本功能

主要属性

  1. BrokerConfig,用于维护broker的配置信息
  2. NettyServerConfig,对于producer和consumer来说,broker是服务端
  3. NettyClientConfig,对于name server来说,broker是客户端
  4. MessageStoreConfig,消息存储的配置,RocketMQ一个非常厉害的特性就是上亿消息的堆积能力,堆积的消息是存储在broker的磁盘上的,那么这个类就是维护broker的消息存储的配置信息
  5. ConsumerManager,消费者管理
  6. ConsumerFilterManager,消费者消息过滤管理
  7. ProducerManager,生产者管理
  8. MessageArrivingListener,消息到达监听器
  9. BrokerOuterAPI,broker和外部系统沟通的适配层,有几个功能:(1)和name server交互,进行broker节点的注册和取消;(2)和其他broker节点交互;

上面这些不是全部,除此之外,还有几个线程池和线程池对应的队列,以及用于做HA的管理模块。显然,broker功能非常多,我们在接下来的几篇中慢慢梳理其中的代码。

public class BrokerController {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
    private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
    /**
     * Broker的配置
     */
    private final BrokerConfig brokerConfig;
    /**
     * netty服务端配置,对于生产者和消费者来说,broker是服务端
     */
    private final NettyServerConfig nettyServerConfig;
    /**
     * netty客户端,对于name server来说,broker是客户端
     */
    private final NettyClientConfig nettyClientConfig;
    /**
     * 消息存储配置
     */
    private final MessageStoreConfig messageStoreConfig;
    /**
     * 消费者的offset管理
     */
    private final ConsumerOffsetManager consumerOffsetManager;
    /**
     * 消费者管理
     */
    private final ConsumerManager consumerManager;
    /**
     * 消费过滤管理
     */
    private final ConsumerFilterManager consumerFilterManager;
    /**
     * 生产者管理
     */
    private final ProducerManager producerManager;
    /**
     * 监听客户端和broker建立的通信通道,当通道关闭时候清理信息
     */
    private final ClientHousekeepingService clientHousekeepingService;
    /**
     * 拉取消息处理器
     */
    private final PullMessageProcessor pullMessageProcessor;
    /**
     * ???暂时不理解
     */
    private final PullRequestHoldService pullRequestHoldService;
    /**
     * 消息到达监听器
     */
    private final MessageArrivingListener messageArrivingListener;
    /**
     * 用于broker对client发起指令
     */
    private final Broker2Client broker2Client;
    private final SubscriptionGroupManager subscriptionGroupManager;
    private final ConsumerIdsChangeListener consumerIdsChangeListener;
    /**
     * 负载均衡管理器
     */
    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
    /**
     * broker对外暴露的API
     */
    private final BrokerOuterAPI brokerOuterAPI;

    /**
     * 调度线程池
     */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "BrokerControllerScheduledThread"));
    /**
     * 子节点同步器
     */
    private final SlaveSynchronize slaveSynchronize;
    /**
     * 发送消息的线程池任务队列
     */
    private final BlockingQueue<Runnable> sendThreadPoolQueue;
    /**
     * 拉取消息的线程池任务队列
     */
    private final BlockingQueue<Runnable> pullThreadPoolQueue;
    private final BlockingQueue<Runnable> queryThreadPoolQueue;
    private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
    private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
    private final FilterServerManager filterServerManager;
    private final BrokerStatsManager brokerStatsManager;
    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private MessageStore messageStore;
    private RemotingServer remotingServer;
    private RemotingServer fastRemotingServer;
    private TopicConfigManager topicConfigManager;
    private ExecutorService sendMessageExecutor;
    private ExecutorService pullMessageExecutor;
    private ExecutorService queryMessageExecutor;
    private ExecutorService adminBrokerExecutor;
    private ExecutorService clientManageExecutor;
    private ExecutorService consumerManageExecutor;
    private boolean updateMasterHAServerAddrPeriodically = false;
    private BrokerStats brokerStats;
    private InetSocketAddress storeHost;
    private BrokerFastFailure brokerFastFailure;
    private Configuration configuration;
    
    //省略其他代码
}

initialize

这个方法用于初始化broker节点,主要的工作可以列举如下:

  1. 加载主要模块的配置信息 这个部分,会从外存总加载各个模块的配置信息,包括:topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager、messageStore。这里代码写得非常漂亮,使用了配置外化的思路和实现、应用了模板设计模式、插件设计模式和工厂设计模式。
    • 模板设计模式

RocketMQ中的模板方法设计模式

  • 插件设计模式 插件设计模式和工厂设计模式一起使用,需要包含一个插件上下文、一个抽象插件类(AbstractPluginMessageStore),主要模块入下图所示。

插件设计模式 插件上下文(MessageStorePluginContext)用于保存跟插件相关的信息,看下插件上下文的代码: public class MessageStorePluginContext { private MessageStoreConfig messageStoreConfig; private BrokerStatsManager brokerStatsManager; private MessageArrivingListener messageArrivingListener; private BrokerConfig brokerConfig; public MessageStorePluginContext(MessageStoreConfig messageStoreConfig, BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, BrokerConfig brokerConfig) { super(); this.messageStoreConfig = messageStoreConfig; this.brokerStatsManager = brokerStatsManager; this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; } //省略了getter和setter方法 }

  • 工厂设计模式 public final class MessageStoreFactory { public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore) throws IOException { //从配置文件中取出配置好的插件 String plugin = context.getBrokerConfig().getMessageStorePlugIn(); if (plugin != null && plugin.trim().length() != 0) { String[] pluginClasses = plugin.split(","); //依次加载插件类对象,并生成对应的MessageStore对象 for (int i = pluginClasses.length - 1; i >= 0; --i) { String pluginClass = pluginClasses[i]; try { @SuppressWarnings("unchecked") Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass); Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); messageStore = construct.newInstance(context, messageStore); } catch (Throwable e) { throw new RuntimeException(String.format( "Initialize plugin's class %s not found!", pluginClass), e); } } } return messageStore; } } ​
  1. 启动服务器 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
  2. 初始化各种线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_")); this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_")); this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_")); this.clientManageExecutor = new ThreadPoolExecutor( this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_"));
  3. 注册请求处理器 这个方法类似于name server里的用法,这里不仔细展开讲
  4. 设置各种定时任务,包括:获取broker状态的、周期性将消费者的offset刷到硬盘、周期性检查消费者的消费能力以保护broker、定期打印消息消费标记等等。
  5. 获取name server的地址 if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } ​
  6. slave节点和master节点的不同处理 如果当前节点是slave节点,则设置一个定时任务:每隔一段时间,就将配置信息从master节点同步到当前节点;如果当前节点是master节点,则设置一个定时任务:每隔一段时间,就对比master节点和slave节点的配置信息,并打印出不相同的配置。

其他

其他还有start、shutdown和registerBrokerAll等方法,其中reigsterBrokerAll方法的作用是将broker节点注册到name server,这样producer和consumer就可以拿到broker节点的地址信息。

参考资料

  1. Apache RocketMQ背后的设计思路与最佳实践
  2. 消息队列MQ

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java架构

Java程序员必知的并发编程艺术——并发机制的底层原理实现

Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量。

9310
来自专栏北京马哥教育

Nginx专题: 从编译安装到URL重写

前言 本文主要实现使用Nginx作为Web服务器,并使用URL Rewrite实现将手机对Web站点的请求专门重写到一个专门为手机定制的Web页面中。 环境介绍...

37650
来自专栏JetpropelledSnake

ELK学习笔记之Logstash和Filebeat解析对java异常堆栈下多行日志配置支持

logstash官方最新文档。 假设有几十台服务器,每台服务器要监控系统日志syslog、tomcat日志、nginx日志、mysql日志等等,监控OOM、内存...

62110
来自专栏非著名程序员

基于 RxJava2+Retrofit2 精心打造的 Android 基础框架 XSnow

XSnow ? 基于RxJava2+Retrofit2精心打造的Android基础框架,包含网络、上传、下载、缓存、事件总线、权限管理、数据库、图片加载、UI模...

33170
来自专栏SpringBoot 核心技术

SpringCloud组件:Eureka服务注册中心内置的REST节点列表

你有没有考虑过Eureka Client与Eureka Server是通过什么方式进行通讯的? 为什么Client启动成功后Server就会被注册到Serve...

1K20
来自专栏乐沙弥的世界

Linux 下安装及配置heartbeat

a、配置主机host解析 b、配置等效验证 c、高可用的相关服务配置(如httpd,myqld等),关闭自启动 d、如需要用到共享存储,还应配置相关...

48640
来自专栏老码农专栏

ActFramework r1.3.0 - 激动人心的特性一览

14220
来自专栏JavaEdge

JVM源码分析之synchronized1 字节码实现2 偏向锁

javap命令生成的字节码中包含 ** monitorenter ** 和 ** monitorexit **指令

10530
来自专栏行者常至

Hibernate之HelloWorld

6920
来自专栏帘卷西风的专栏

使用Cmake生成跨平台项目编译解决方案

    项目最近有需求在windows下面运行,我花了几周时间将linux的服务器移植到windows下面,目前已经能够正常运行服务器,目前又有了新需求,两边的...

71520

扫码关注云+社区

领取腾讯云代金券