前面学习了name server的主要代码,这篇文章开始学习broker的源码。broker是RocketMQ的核心模块,这篇文章我们先从整体看下代码结构、RocketMQ的领域模型,然后再看下一个broker节点的启动过程以及Controller暴露的接口。
broker的代码结构
MQ领域语言描述RocketMQ做的事情,producer构建Message,发送给broker的指定topic,broker负责将消息投递到指定topic下的队列,并记录消息队列的offset,consumer利用拉模式拉取消息进行消费。
ddd-for-rmq.png
BrokerController是broker模块的核心控制类,负责broker的初始化、启动、停止、资源管理,以及接受外部的请求并作出相应的动作。看下BrokerController中主要的属性,借此可以看下broker的基本功能
上面这些不是全部,除此之外,还有几个线程池和线程池对应的队列,以及用于做HA的管理模块。显然,broker功能非常多,我们在接下来的几篇中慢慢梳理其中的代码。
public class BrokerController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
/**
* Broker的配置
*/
private final BrokerConfig brokerConfig;
/**
* netty服务端配置,对于生产者和消费者来说,broker是服务端
*/
private final NettyServerConfig nettyServerConfig;
/**
* netty客户端,对于name server来说,broker是客户端
*/
private final NettyClientConfig nettyClientConfig;
/**
* 消息存储配置
*/
private final MessageStoreConfig messageStoreConfig;
/**
* 消费者的offset管理
*/
private final ConsumerOffsetManager consumerOffsetManager;
/**
* 消费者管理
*/
private final ConsumerManager consumerManager;
/**
* 消费过滤管理
*/
private final ConsumerFilterManager consumerFilterManager;
/**
* 生产者管理
*/
private final ProducerManager producerManager;
/**
* 监听客户端和broker建立的通信通道,当通道关闭时候清理信息
*/
private final ClientHousekeepingService clientHousekeepingService;
/**
* 拉取消息处理器
*/
private final PullMessageProcessor pullMessageProcessor;
/**
* ???暂时不理解
*/
private final PullRequestHoldService pullRequestHoldService;
/**
* 消息到达监听器
*/
private final MessageArrivingListener messageArrivingListener;
/**
* 用于broker对client发起指令
*/
private final Broker2Client broker2Client;
private final SubscriptionGroupManager subscriptionGroupManager;
private final ConsumerIdsChangeListener consumerIdsChangeListener;
/**
* 负载均衡管理器
*/
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
/**
* broker对外暴露的API
*/
private final BrokerOuterAPI brokerOuterAPI;
/**
* 调度线程池
*/
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
/**
* 子节点同步器
*/
private final SlaveSynchronize slaveSynchronize;
/**
* 发送消息的线程池任务队列
*/
private final BlockingQueue<Runnable> sendThreadPoolQueue;
/**
* 拉取消息的线程池任务队列
*/
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
private ExecutorService consumerManageExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
//省略其他代码
}
这个方法用于初始化broker节点,主要的工作可以列举如下:
RocketMQ中的模板方法设计模式
插件设计模式 插件上下文(MessageStorePluginContext)用于保存跟插件相关的信息,看下插件上下文的代码: public class MessageStorePluginContext { private MessageStoreConfig messageStoreConfig; private BrokerStatsManager brokerStatsManager; private MessageArrivingListener messageArrivingListener; private BrokerConfig brokerConfig; public MessageStorePluginContext(MessageStoreConfig messageStoreConfig, BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, BrokerConfig brokerConfig) { super(); this.messageStoreConfig = messageStoreConfig; this.brokerStatsManager = brokerStatsManager; this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; } //省略了getter和setter方法 }
其他还有start、shutdown和registerBrokerAll等方法,其中reigsterBrokerAll方法的作用是将broker节点注册到name server,这样producer和consumer就可以拿到broker节点的地址信息。