前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ存储--主从同步【源码笔记】

RocketMQ存储--主从同步【源码笔记】

作者头像
瓜农老梁
发布2019-08-05 14:40:24
8290
发布2019-08-05 14:40:24
举报
文章被收录于专栏:瓜农老梁
目录
代码语言:javascript
复制
一、问题思考
二、Broker启动HA调用链
1.HA初始化调用链
2.启动调用链
三、线程类职责
1.AcceptSocketService职责
2.HAConnection职责
2.1 writeSocketService职责
2.2 readSocketService职责
3.GroupTransferService职责
4.HAClient职责
四、消息写入与线程类交互
五、主从同步示意图
1.主从同步交互消息格式
2.主从同步示意图
六、源代码清单
一、问题思考

1.消息存储在Master上了,如何同步到Slave上了呢? 2.同步复制和异步复制流程是怎么样的?

二、Broker启动HA调用链

1.HA初始化调用链

代码语言:javascript
复制
@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore
@4 DefaultMessageStore#DefaultMessageStore()
this.haService = new HAService(this);
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig()
.getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();

2.启动调用链

代码语言:javascript
复制
@1 BrokerStartup#start
controller.start();
@2 BrokerController#start
this.messageStore.start();
@3 DefaultMessageStore#start
@4 this.haService.start();
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();

小结:从初始化和启动调用链中可以看到,在Broker启动时,初始化并启动了三个线程类,分别为AcceptSocketService, GroupTransferService, HAClient。

问题:这三个线程类在干啥?

三、线程类职责
1.AcceptSocketService职责

小结:AcceptSocketService职责初始化TCP通道,监听新的连接并创建HAConnection。

问题:HAConnection在做什么?

2.HAConnection职责
代码语言:javascript
复制
//构造方法
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
//获取客户端请求地址
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
//将通道调整为非阻塞
this.socketChannel.configureBlocking(false);
//关闭连接前将数据发送完毕
this.socketChannel.socket().setSoLinger(false, -1);
//将Nagle算法关闭,客户端每发送一次数据无论大小,都会将其发送出去
this.socketChannel.socket().setTcpNoDelay(true);
//设置接受缓存区为64K
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
//设置发包缓存区为64K
this.socketChannel.socket().setSendBufferSize(1024 * 64);
//写数据线程类
this.writeSocketService = new WriteSocketService(this.socketChannel);
//读数据线程类
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();
}
//启动
public void start() {
//启动读数据线程
this.readSocketService.start();
//启动写数据线程
this.writeSocketService.start();
}

疑问:HAConnection除了对通道做了一些设置外,启动了两个线程服务类,分别为readSocketService和writeSocketService,他们职责是什么呢?

2.1 writeSocketService职责 流程图

小结:writeSocketService主要职责,将数据不断写入socketChannel通道;写入数据的大小为nextTransferFromWhere与最大可读位置getReadPosition之间数据;每次写完传输指针自增this.nextTransferFromWhere += size;每隔5秒发送心跳包到socketChannel通道。

2.2 readSocketService职责

流程图

小结:readSocketService主要职责解析slave发来的请求位点,并更新push2SlaveMaxOffset为该请求位点;唤醒groupTransferService线程。

3.GroupTransferService职责

小结:GroupTransferService职责判断主从同步是否完成,完成后唤醒消息发送线程。

4.HAClient职责

小结:HAClient职责Slave封装实现类,负责与Master建立连接通道,并从通道中获取数据存储;并向Master上报Slave存储的最大物理偏移量。

五、主从同步示意图

1.主从同步交互消息格式 1.1 Slave上报物理偏移量reportOffset量格式

00000018516677754880|长度为8位的20位数字

1.2 Master写入Slave的信息由Header与Body构成

00000018516677754880+size|Header部分由8位物理偏移量+消息体大

消息Body具体内容|Slave请求的位点与Master可读位置之间的数据

2.主从同步示意图

六、源代码清单
  • HAService.java
  • HAService#AcceptSocketService
  • HAService#GroupTransferService
  • HAService#HAClient
  • HAConnection.java
  • HAConnection#ReadSocketService
  • HAConnection#WriteSocketService
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 一、问题思考
  • 二、Broker启动HA调用链
  • 三、线程类职责
  • 1.AcceptSocketService职责
  • 2.HAConnection职责
  • 六、源代码清单
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档