RocketMQ作为消息队列的典型代表,其在高并发状况下处理消息又很不错的性能,同时又能够通过将消息持久化到磁盘确保消息不会丢失,本文旨在从RocketMQ的源码剖析为何它能够高效处理消息,并且又如何高效组织消息并写入磁盘
在阅读本文之前,建议不了解RocketMQ基础架构的小伙伴先看一下这篇文章了解一下:
https://cloud.tencent.com/developer/article/2485324
在RocketMQ中,消息Message投递到Broker后,会存储到CommitLog中,然后通过CommitLog存储到磁盘中,而CommitLog最后会被持久化到磁盘中,因此也就确保了即使写入消息的时候机器发生宕机,其中的消息也不会丢失:
而最终存储到磁盘中的commit log是以文件偏移量命名,那么Broker能做到持久化处理消息,又是如何高效地对消息进行读取呢?
如果你对于MySQL存储数据的流程比较清晰,应该能够联想到,MySQL在对数据进行存取时,并不会直接将数据存到磁盘上,而是先写入到缓冲区中,之后将缓冲区的数据持久化到磁盘的日志中,与MySQL类似,RocketMQ在对消息写入时,也引入了内存来高效处理消息,不过这里应用到的是mmap技术,将磁盘中文件对应页表的物理地址以虚拟地址映射到内存中,这样能够使得系统在获取数据时直接拿到数据在磁盘的物理地址,另外还会开启一个专门将内存数据持久化到磁盘的服务,从而完成对于消息的持久化操作
接下来就让我们从源码剖析,RocketMQ是如何通过将消息存储到CommitLog中,并且建立了与内存之间的地址映射
Broker在接收到Message后,会为要写入的CommitLog生成对应偏移量的文件名,那么这个偏移量是如何生成的呢?让我们一步步探究:
Broker在初始化时,会初始化各个Processor来完成对于消息的接收以及处理消息等功能,而其中负责接收消息的Processor为SendMessageProcesser
:
/**
* 包名: broker
* 类名: BrokerController
* 方法名: registerProcessor
* 行数:545
*/
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
在这个类中有很多接收Message的异步处理方法:
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
public SendMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request);
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback);
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) {
......
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
......
}
Broker会通过asyncProcessRequest
在其内部异步处理消息,调用asyncSendMessage
来调用MessageStore
存取消息,在调用之前,它会进行一系列的参数校验:
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
......
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
......
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
......
}
MessageStore
是存取消息的核心类,它是Broker启动时第一个启动的Processor类,它其中最主要处理消息的方法就是asyncPutMessage
:
类名:DefaultMessageStore
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
......
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
......
}
因此,消息存取通过层层封装,消息被Broker处理的逻辑大致如下所示:
我们可以看到,在这个方法中,Message会移交给commitLog
来完成最终的磁盘存储,但是,如果我们深究commitLog
底层存储消息的代码,我们可以看到真正写入数据的地方在这里:
类名:CommitLog
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
......
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
......
}
我们发现,这里有多了一个操作消息数据的类MappedFile
,也许你会感到困惑,这又是什么类,没关系,接下来我们介绍一下这个类:
我们知道CommitLog在磁盘上存储文件名时文件对应的偏移量,而MappedFile
就可以理解为对一个个磁盘文件的封装,它是CommitLog
存储到磁盘上的文件单元形式,也就是说,一份CommitLog中可以包含多个MappedFile
,而如果仅仅包含多个MappedFile
不好管理,因此在RocketMQ中单独定义了MappedFileQueue
来管理一个CommitLog
包含的多个MappedFile
,而在MappedFileQueue
中,则是很简便地用了一个List来存储MappedFile
:
public class CommitLog {
protected final MappedFileQueue mappedFileQueue;
}
public class MappedFileQueue {
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
}
通过这样的组织方式,便能够存储多个MappedFile
的同时很好地管理这些存储文件,而我们再看上面初始化mappedFile
时调用的getLastMappedFile
方法,也能够很好地理解,它其实就是去除当前列表中的最后一个元素:
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
在Broker初始化的时候,此时MappedFile并不会立即创建,而是如我们看到的,在调用getLastMappedFile
方法时,从保存了MappedFile
的列表中取出对象,但是如果此时列表中没有数据,那么此时是如何取出对象的呢?
我们细究源码可以发现还有另外一个getLastMappedFile
负责返回一个对象文件:
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
在这个方法中,如果MappedFile
列表为空或者存储的文件已满,就会调用tryCreateMappedFile
方法来初始化一个文件对象进行返回:
protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
我们可以发现,最终创建文件还是通过doCreateMappedFile
方法执行,在这个方法中,会传递下一个文件路径以及下下个文件路径,为什么在这里要传递到下下个文件路径呢?我们再往下追源码:
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
我们看到,在这个方法中,如果当前创建文件的服务allocateMappedFileService
不为空,会继续调用服务的方法创建文件,该方法的主要作用就是创建了两个文件,一个是nextFilePath
对应的当前要使用的文件,另一个就是nextNextFilePath
对应的下一个文件,之所以创建两个文件的目的就在于:
如果当前的文件对象写满了,可以直接通过列表进行获取切换到下一个文件对象继续写入数据,从而减少再次调用服务创建文件对象的开销
到了这里,创建MappedFile
并将数据写入CommitLog
就结束了,接下来让我们看看·CommitLog`最终是如何写入磁盘的
我们看完上面的创建MappedFile
代码知道,创建文件并完成落盘的核心代码就在putRequestAndReturnMappedFile
中,而我们追到这个方法里,可以看到这样的代码:
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
......
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
boolean offerOK = this.requestQueue.offer(nextReq);
......
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
boolean offerOK = this.requestQueue.offer(nextNextReq);
......
}
你看到这里可能会疑惑,不是要将文件持久化到磁盘吗,怎么一直在向requestTable
与·requestQueue
中放入数据,没错,这段代码写了一百多行,其实就在向请求表与请求队列中放入新创建的文件信息,那落盘代码在哪里呢?
我们可以看到,在上面的创建文件方法中,它仅仅是将创建文件的请求以及实际文件名放入了请求队列与请求表中,而这些数据是暂时保存在内存队列中的,而真正处理这些请求完成创建文件操作的核心,我们还是要提及前面介绍的存取文件的核心:MessageStore
在MessageStore
初始化启动时,会初始化一个服务allocateMappedFileService
,这个服务见名知意,它就是主要负责创建MappedFile
的类,而在调用this.allocateMappedFileService.start()
启动后,start方法会执行下面的操作:
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
在这个方法中,会调用一个核心函数mmapOperation
,这个函数会将存到请求队列中的请求取出进行处理:
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
......
}
我们可以看到,这个方法会取出请求队列中的请求进行消费,那么此时我们思考一下,既然这里的allocateMappedFileService
是消费者,那么真正的生产者是谁?
到这里我们再回看一下源码:
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
我们可以看到,通过调用本类中实例化的allocateMappedFileService
对象来将创建文件的请求放入请求队列中,因此,这个内存队列的生产者与消费者其实allocateMappedFileService自己!
那么,创建文件的关键函数在哪里呢,我们继续往下看:
注意这里,我并没有说FileChannel
是创建文件的核心对象,它只是建立了一个文件映射,后续向CommitLog
中写入文件是依靠它来进行,我们继续上面的代码可以看到,mappedFile
通过init函数完成初始化:
private boolean mmapOperation() {
......
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
......
}
我们继续看init方法:
private void init(final String fileName, final int fileSize) throws IOException {
......
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
......
}
我们可以发现,在这里会初始化fileChannel
对象,之后又通过fileChannel
方法将mappedByteBuffer
进行map映射,这里就是借鉴了mmap思想建立了一个磁盘文件页表的物理地址与内存虚拟地址的直接映射,可以通过建立这种映射提高对于磁盘文件的操作效率,减少从内核态到用户态的转换次数,这也是RocketMQ具有高性能的一个底层原因之一,而我们熟知的高性能消费队列Kafka也是通过mappedByteBuffer
直接建立磁盘文件与内存的映射,从而提高处理消息性能
至此,我们完成了文件命名、文件内存关系映射、参数校验等等一系列工作,开始将commitLog
中的内容写入磁盘,因为我们通过mappedByteBuffer
建立了内存地址与磁盘文件的映射关系,所以与MySQL的写入数据思想类似,直接将内存中的数据持久化到磁盘中即可,首先我们来看将消息数据写入缓存中的操作,在Broker内部处理消息的asyncPutMessage
方法中,会调用appendMessage
将文件追加到缓存中:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
......
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
......
}
而appendMessage
的具体执行逻辑如下:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
在这里会有许多逻辑判断,但是最终目的就是将文件写入到缓存中,而将缓存持久化到磁盘的服务则是由FlushRealTimeService
来完成:
class FlushRealTimeService extends FlushCommitLogService {
public void run() {
......
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
......
}
}
这个对象会定期每500ms将4页的数据刷回磁盘中,而这个任务又会交给mappedFileQueue
来完成,它执行的逻辑如下:
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
文件写入磁盘的偏移量即mappedFile
的文件名,最终就完成了写入磁盘
综上所述,其实将消息数据写入磁盘的大致流程如下:
而写入过程经过的对象可以表示如下
CommitLog → MappedFileQueue → MappedFile → mappedByteBuffer → mmap → 磁盘
如果你能看到这里,想必应该收获很多吧,不得不说,虽然RocketMQ写入消息的流程复杂,但是终究还是抓住了mmap映射物理数据页表地址的高效操作方法,与通过将缓存持久化磁盘的持久化方法,所以只要把其核心思想与原理把握住,其他就是通过一些数据结构组织来实现的问题了
希望对你有所帮助!!!另外,创作不易,如果能给个赞就更好了!!!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。