前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ存储--日志文件创建与映射流程【源码笔记】

RocketMQ存储--日志文件创建与映射流程【源码笔记】

作者头像
瓜农老梁
发布2019-08-08 16:40:11
1.5K0
发布2019-08-08 16:40:11
举报
文章被收录于专栏:瓜农老梁瓜农老梁
1.问题描述

日志目录(可配置)/data/rocketmq/store/commitlog会有20位长度的日志文件。 1.日志文件什么时候创建的? 2.日志文件创建流程是什么? 3.日志文件和内存映射是怎么样的?

代码语言:javascript
复制
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:50 00000117290188144640
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:52 00000117291261886464
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:54 00000117292335628288
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117293409370112
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:57 00000117294483111936
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117295556853760
2.日志映射相关类初始化

在Broker启动时实例化了两个类DefaultMessageStore和AllocateMappedFileService。 AllocateMappedFileService是线程类继承了Runnable接口,该线程类持有DefaultMessageStore的引用(即:可操作管理DefaultMessageStore),并启动该线程类。 调用链

代码语言:javascript
复制
//Broker启动时调用
@1 BrokerStartup#main#createBrokerController()
boolean initResult = controller.initialize();
@2 Controller#initialize()
this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
@3 DefaultMessageStore#DefaultMessageStore()
//将DefaultMessageStore自身引用传给AllocateMappedFileService
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.allocateMappedFileService.start();//启动该线程类
@4 class AllocateMappedFileService extends ServiceThread
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}
3.线程类一直运行在干啥

既然在Broker启动时该线程类AllocateMappedFileService就启动了,那么在做什么呢?run方法为while循环,即:只要服务不停止并且mmapOperation()返回true则一直运行

代码语言:javascript
复制
//异步处理,调用mmapOperation完成请求的处理
public void run() {
log.info(this.getServiceName() + " service started");
//while循环,只要服务部停止即调用 mmapOperation方法
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}

mmapOperation方法流程图

小结:mmapOperation方法主要做了两件事:初始化MappedFile和预热MappedFile。只要服务不停止和线程不被中断,这个过程一直重复运行。

4.提交映射文件请求(AllocateRequest)

既然AllocateMappedFileService一直从容器(优先级队列和ConcurrentHashMap)中获取AllocateRequest。AllocateRequest是什么时候产生并放到容器中的呢? RocketMQ消息存储概览【源码笔记】中写入commitLog流程,获取最新的日志文件。 调用链

代码语言:javascript
复制
@1 CommitLog#putMessage
//文件已满或者没有映射文件重新创建一个文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
// Mark: NewFile may be cause noise
}
@2 MappedFileQueue#getLastMappedFile

流程图

小结:MappedFileQueue#getLastMappedFile会向线程类AllocateMappedFileServic提交两个映射文件创建请求:分别为nextFilePath和nextNextFilePath;如果线程类AllocateMappedFileServic为null,则直接new一个MappedFile,此时只会创建一个文件。

下面为提交两个映射文件请求流程,即:

AllocateMappedFileServic#putRequestAndReturnMappedFile

调用链

代码语言:javascript
复制
@1 MappedFileQueue#getLastMappedFile
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
}
@2 AllocateMappedFileService#putRequestAndReturnMappedFile

流程图

小结:处理提交的映射文件请求指的是: 实例化两个AllocateRequest并把他们提交到requestTable(ConcurrentHashMap) 和requestQueue(PriorityBlockingQueue)中,等待5秒,此段时间线程会从这两个容器中获取请求并创建MappedFile,并将结果返回。

5.MappedFile初始化

本段梳理下上文中mmapOperation方法流程图第5步初始化MappedFile的流程 调用链

代码语言:javascript
复制
@1 AllocateMappedFileService#mmapOperation
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
@2 MappedFile#init(fileName, fileSize);

流程图

小结:MappedFile主要干了两件事:1.创建日志文件。2.并将文件映射到内存中。

6.总结

1.日志文件什么时候创建的? 备注:在写入消息时,需要获取最新的日志文件(MappedFile),如果文件不存在或者已经写满,此时需要创建MappedFile。具体在MappedFile#init方法this.file = new File(fileName)进行创建。 2.日志文件创建流程是什么? 备注:将两个映射创建请求(nextReq和nextNextReq)提交到requestTable(ConcurrentHashMap)和requestQueue(PriorityBlockingQueue)容器中;由线程不断检查并从容器中取出创建日志文件(MappedFile)。 3.日志文件和内存映射是怎么样的? 备注:具体的映射时在MappedFile#init方法中通过this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);将日志文件映射到内存中的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.问题描述
  • 2.日志映射相关类初始化
  • 3.线程类一直运行在干啥
  • 6.总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档