有一个疑问,当client给broker发送消息的时候,怎么知道在commitlog的第几个字节开始写呢?
commitlog中存储的是客户端发送的所有数据
ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。
DefaultMessageStore###load
public boolean load() {
boolean result = true;
try {
//省略
// 装载Commit Log
result = result && this.commitLog.load();
if (result) {
//省略
//确定Commit Log文件下一个写的位置
this.recover(lastExitOK);
}
} catch (Exception e) {
}
return result;
}
CommitLog###load
public boolean load() {
//跟进去,调用mappedFileQueue.load方法
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
MappedFileQueue###load方法:在该方法中把commitlog下的文件映射成MappedFile
public boolean load() {
//window上默认的目录:C:\Users\25682\store\commitlog
File dir = new File(this.storePath);
//上面目录下子文件
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
//把每一个文件映射成MappedFile对象,方便读取
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
//此时wrotePosition设置的为mappedFileSize,不准确
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
此时CommitLog下的MappedFile的wrotePosition设置为mappedFileSize,但是最后这个MappedFile的wrotePosition还不对,因此下面需要修改
DefaultMessageStore###recover
private void recover(final boolean lastExitOK) {
//从ConsumeQueue中获取最大的物理偏移量
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
//
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
this.recoverTopicQueueTable();
}
DefaultMessageStore###recoverConsumeQueue:获取每一个主题里每一个队列里的最大commitlog偏移量
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
logic.recover();
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;
}
CommitLog###recoverAbnormally
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
int index = mappedFiles.size() - 1;
//获取最后一个CommitLog的MapperFile
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
//不断从MapperFile中根据CommitLog的数据单元格式读取数据,当读取到数据为0时,跳出循环,说明该位置为下个需要写的位置
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
mappedFileOffset += size;
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
//该位置为真正要插入的位置,所以修正上面的设置的错误的wrotePosition
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
CommitLog一开始是把wrotePosition设置为CommitLog文件的大小,这样只有最后一个CommitLog的wrotePosition的数据是不正确的,所以后面在确定最后一个CommitLog的wrotePosition的时候是通过读取CommitLog文件里的数据来确定wrotePosition位置的,因为CommitLog里前四个字节代表这条消息的大小,这样我读取前四个字节以后就可以读取这一条数据,然后以此类推,当读取消息的大小为0时,代表此处没有消息,则确定wrotePosition的位置。
CommitLog格式 https://blog.csdn.net/meilong_whpu/article/details/76919267 ConsumeQueue格式 https://www.cnblogs.com/gaojy/p/15087869.html