前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Hudi Log日志文件写入分析(二)

Hudi Log日志文件写入分析(二)

作者头像
ApacheHudi
发布2021-04-13 10:33:39
发布2021-04-13 10:33:39
1.2K10
代码可运行
举报
文章被收录于专栏:ApacheHudiApacheHudi
运行总次数:0
代码可运行

1. 介绍

前面介绍了log日志文件的相关类,接着分析记录写入log日志文件的具体实现。

2. 分析

写日志文件的入口在 HoodieMergeOnReadTable#handleUpdate,其核心代码如下

代码语言:javascript
代码运行次数:0
运行
复制
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
      throws IOException {
    logger.info("Merging updates for commit " + commitTime + " for file " + fileId);

    if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
      logger.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
      return super.handleUpdate(commitTime, fileId, recordItr);
    } else {
      // 写日志文件的入口
      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
      appendHandle.doAppend();
      appendHandle.close();
      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
    }
  }

在处理 update时,如果日志文件不支持索引或者文件不是小文件,则会使用 HoodieApppendHandle#doAppend处理,其核心代码如下

代码语言:javascript
代码运行次数:0
运行
复制
public void doAppend() {
    while (recordItr.hasNext()) {
      // 获取记录
      HoodieRecord record = recordItr.next();
      // 初始化
      init(record);
      // 刷盘
      flushToDiskIfRequired(record);
      // 写入缓存
      writeToBuffer(record);
    }
    doAppend(header);
    estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
  }

有记录存在,则会进行初始化(init),初始化包括统计信息的初始化、HoodieLogFormatWriter的初始化等。

调用 flushToDiskIfRequired进行刷盘处理,其核心代码如下

代码语言:javascript
代码运行次数:0
运行
复制
private void flushToDiskIfRequired(HoodieRecord record) {
    // 当前记录条数大于等于block块可以存的最大记录条数
    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
      // 重新计算记录的平均大小
      averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
      // append写入
      doAppend(header);
      estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
      // 重置当前记录条数
      numberOfRecords = 0;
    }
  }

Hudi会估算数据块中可以存放多少条记录,然后当已缓存的记录条数大于等于当前块中可存放的条数时,会将其append写入。

调用 writerToBuffer将记录缓存起来,其核心代码如下

代码语言:javascript
代码运行次数:0
运行
复制
private void writeToBuffer(HoodieRecord<T> record) {
    // 获取IndexedRecord便于写入log文件
    Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
    if (indexedRecord.isPresent()) { // 存在表示是新插入的记录
      recordList.add(indexedRecord.get());
    } else { // 不存在表示需要删除
      keysToDelete.add(record.getKey());
    }
    numberOfRecords++;
  }

可以看到其会保存需要插入或者删除的记录。

使用 doAppend写入日志文件,其核心代码如下

代码语言:javascript
代码运行次数:0
运行
复制
private void doAppend(Map<HeaderMetadataType, String> header) {
    try {
      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
      if (recordList.size() > 0) { // 新插入的记录不为空
        // 使用Writer写入Data类型的Block
        writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
        // 清空缓存
        recordList.clear();
      }
      if (keysToDelete.size() > 0) { // 删除的记录不为空
        // 使用Writer写入Delete类型的Block
        writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
        // 清空缓存
        keysToDelete.clear();
      }
    } catch (Exception e) {
      throw new HoodieAppendException("Failed while appending records to " + currentLogFile.getPath(), e);
    }
  }

可以看到,该方法会将缓存的记录和头部信息(时间、schema信息)组装成 HoodieLogBlock后写入日志,其中 appendBlock核心代码如下

代码语言:javascript
代码运行次数:0
运行
复制
public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {

    // Find current version
    HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
        new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
    long currentSize = this.output.size();

    // 1. Write the magic header for the start of the block
    this.output.write(HoodieLogFormat.MAGIC);

    // bytes for header
    byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
    // content bytes
    byte[] content = block.getContentBytes();
    // bytes for footer
    byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());

    // 2. Write the total size of the block (excluding Magic)
    this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));

    // 3. Write the version of this log block
    this.output.writeInt(currentLogFormatVersion.getVersion());
    // 4. Write the block type
    this.output.writeInt(block.getBlockType().ordinal());

    // 5. Write the headers for the log block
    this.output.write(headerBytes);
    // 6. Write the size of the content block
    this.output.writeLong(content.length);
    // 7. Write the contents of the data block
    this.output.write(content);
    // 8. Write the footers for the log block
    this.output.write(footerBytes);
    // 9. Write the total size of the log block (including magic) which is everything written
    // until now (for reverse pointer)
    this.output.writeLong(this.output.size() - currentSize);
    // Flush every block to disk
    flush();

    // roll over if size is past the threshold
    return rolloverIfNeeded();
  }

可以看到,对于Block块的写入,顺序如下

  • 写入MAGIC( hudi);
  • 写入Block块的大小;
  • 写入版本号;
  • 写入 Block的类型;
  • 写入头部;
  • 写入数据内容;
  • 写入尾部;
  • 写入本次写数据的总大小;

调用 flush将数据写入文件,如果需要可能会滚动至下个日志文件,其中 rolloverIfNeeded核心代码如下

代码语言:javascript
代码运行次数:0
运行
复制
private Writer rolloverIfNeeded() throws IOException, InterruptedException {
    if (getCurrentSize() > sizeThreshold) { // 当前大小大于阈值(512M)
      // 生成一个新的日志文件,版本号+1
      HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
      // 关闭当前的Writer,会再次触发写文件
      close();
      // 返回新文件对应的Writer
      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
          rolloverLogWriteToken);
    }
    // 不需要滚动,直接返回
    return this;
  }

可以看到,若当前写入的文件大小大于配置的阈值时会滚动到下个版本的新文件,并返回新文件对应的Writer继续写入。另外对于 HoodieLogFormatWriter的初始化,其首先会判断当前文件是否存在,若存在,进一步判断该文件的 FileSystem是否支持 Append,现在只有HDFS、MAPRFS、IGNITE、VIEWFS文件系统支持Append,若支持,则接着Append,若不支持,则滚动到下个新文件写入;若不存在,则直接创建新文件写入。

3. 总结

对于日志文件的写入,Hudi采用基于 HoodieLogBlock为单元的写入粒度,其策略是先将记录缓存至内存,然后再批量构造成 Block后写入日志文件,而对于 Block的头部、实际内容、尾部的写入采用了指定的顺序,并且采用了自动滚动日志文件的方式写入(当日志文件大小达到指定配置大小时自动滚动到下一个文件继续写入)。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-12-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ApacheHudi 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 介绍
  • 2. 分析
  • 3. 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档