前面介绍了log日志文件的相关类,接着分析记录写入log日志文件的具体实现。
写日志文件的入口在 HoodieMergeOnReadTable#handleUpdate
,其核心代码如下
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
throws IOException {
logger.info("Merging updates for commit " + commitTime + " for file " + fileId);
if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
logger.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
return super.handleUpdate(commitTime, fileId, recordItr);
} else {
// 写日志文件的入口
HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
appendHandle.doAppend();
appendHandle.close();
return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
}
}
在处理 update
时,如果日志文件不支持索引或者文件不是小文件,则会使用 HoodieApppendHandle#doAppend
处理,其核心代码如下
public void doAppend() {
while (recordItr.hasNext()) {
// 获取记录
HoodieRecord record = recordItr.next();
// 初始化
init(record);
// 刷盘
flushToDiskIfRequired(record);
// 写入缓存
writeToBuffer(record);
}
doAppend(header);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
}
有记录存在,则会进行初始化(init),初始化包括统计信息的初始化、HoodieLogFormatWriter的初始化等。
调用 flushToDiskIfRequired
进行刷盘处理,其核心代码如下
private void flushToDiskIfRequired(HoodieRecord record) {
// 当前记录条数大于等于block块可以存的最大记录条数
if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
// 重新计算记录的平均大小
averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
// append写入
doAppend(header);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
// 重置当前记录条数
numberOfRecords = 0;
}
}
Hudi会估算数据块中可以存放多少条记录,然后当已缓存的记录条数大于等于当前块中可存放的条数时,会将其append写入。
调用 writerToBuffer
将记录缓存起来,其核心代码如下
private void writeToBuffer(HoodieRecord<T> record) {
// 获取IndexedRecord便于写入log文件
Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
if (indexedRecord.isPresent()) { // 存在表示是新插入的记录
recordList.add(indexedRecord.get());
} else { // 不存在表示需要删除
keysToDelete.add(record.getKey());
}
numberOfRecords++;
}
可以看到其会保存需要插入或者删除的记录。
使用 doAppend
写入日志文件,其核心代码如下
private void doAppend(Map<HeaderMetadataType, String> header) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
if (recordList.size() > 0) { // 新插入的记录不为空
// 使用Writer写入Data类型的Block
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
// 清空缓存
recordList.clear();
}
if (keysToDelete.size() > 0) { // 删除的记录不为空
// 使用Writer写入Delete类型的Block
writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
// 清空缓存
keysToDelete.clear();
}
} catch (Exception e) {
throw new HoodieAppendException("Failed while appending records to " + currentLogFile.getPath(), e);
}
}
可以看到,该方法会将缓存的记录和头部信息(时间、schema信息)组装成 HoodieLogBlock
后写入日志,其中 appendBlock
核心代码如下
public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
// Find current version
HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
long currentSize = this.output.size();
// 1. Write the magic header for the start of the block
this.output.write(HoodieLogFormat.MAGIC);
// bytes for header
byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
// content bytes
byte[] content = block.getContentBytes();
// bytes for footer
byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
// 2. Write the total size of the block (excluding Magic)
this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
// 3. Write the version of this log block
this.output.writeInt(currentLogFormatVersion.getVersion());
// 4. Write the block type
this.output.writeInt(block.getBlockType().ordinal());
// 5. Write the headers for the log block
this.output.write(headerBytes);
// 6. Write the size of the content block
this.output.writeLong(content.length);
// 7. Write the contents of the data block
this.output.write(content);
// 8. Write the footers for the log block
this.output.write(footerBytes);
// 9. Write the total size of the log block (including magic) which is everything written
// until now (for reverse pointer)
this.output.writeLong(this.output.size() - currentSize);
// Flush every block to disk
flush();
// roll over if size is past the threshold
return rolloverIfNeeded();
}
可以看到,对于Block块的写入,顺序如下
hudi
);Block
的类型;调用 flush
将数据写入文件,如果需要可能会滚动至下个日志文件,其中 rolloverIfNeeded
核心代码如下
private Writer rolloverIfNeeded() throws IOException, InterruptedException {
if (getCurrentSize() > sizeThreshold) { // 当前大小大于阈值(512M)
// 生成一个新的日志文件,版本号+1
HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
// 关闭当前的Writer,会再次触发写文件
close();
// 返回新文件对应的Writer
return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
rolloverLogWriteToken);
}
// 不需要滚动,直接返回
return this;
}
可以看到,若当前写入的文件大小大于配置的阈值时会滚动到下个版本的新文件,并返回新文件对应的Writer继续写入。另外对于 HoodieLogFormatWriter
的初始化,其首先会判断当前文件是否存在,若存在,进一步判断该文件的 FileSystem
是否支持 Append
,现在只有HDFS、MAPRFS、IGNITE、VIEWFS文件系统支持Append,若支持,则接着Append,若不支持,则滚动到下个新文件写入;若不存在,则直接创建新文件写入。
对于日志文件的写入,Hudi采用基于 HoodieLogBlock
为单元的写入粒度,其策略是先将记录缓存至内存,然后再批量构造成 Block
后写入日志文件,而对于 Block
的头部、实际内容、尾部的写入采用了指定的顺序,并且采用了自动滚动日志文件的方式写入(当日志文件大小达到指定配置大小时自动滚动到下一个文件继续写入)。