
Hudi needs to delete older versions of data to reclaim valuable storage space. The Clean action supports two policies: KEEP_LATEST_FILE_VERSIONS (retain the latest file versions) and KEEP_LATEST_COMMITS (retain the latest commits), and each policy behaves differently. The Clean action is split into two phases: generating a HoodieCleanerPlan and executing that HoodieCleanerPlan. The concrete implementation of Clean is analyzed below.
On every commit, Hudi checks whether automatic clean is enabled and, if so, runs the clean action; alternatively, the user can trigger clean manually.
For the clean executed at commit time, the core entry point is HoodieCleanClient#clean; its core code is as follows
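As a reference point, a minimal sketch of configuring the cleaner is shown below. It follows the HoodieWriteConfig / HoodieCompactionConfig builder style; the exact builder method names can differ across Hudi versions, and the table path is purely hypothetical.

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// A minimal sketch, assuming a 0.5.x style builder API.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table")                                   // hypothetical base path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withAutoClean(true)                                       // run clean automatically after each commit
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
        .retainCommits(10)                                         // keep file versions needed by the latest 10 commits
        // .retainFileVersions(3) would be the counterpart for KEEP_LATEST_FILE_VERSIONS
        .withIncrementalCleaningMode(true)                         // assumed builder method for hoodie.cleaner.incremental.mode
        .build())
    .build();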
protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
  final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
    // First finish any clean action that is still in flight
    runClean(table, hoodieInstant.getTimestamp());
  });
  // Generate the cleaner plan
  Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
  if (cleanerPlanOpt.isPresent()) {
    HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
      final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
      // Execute the clean action
      return runClean(hoodieTable, startCleanTime);
    }
  }
  return null;
}
Any unfinished clean is handled first, and then scheduleClean is called to generate the HoodieCleanerPlan; its core code is as follows
protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  // Generate the plan
  HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
  if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
      && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
    // Create an instant in REQUESTED state
    HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
    // Save to both aux and timeline folder
    try {
      // Serialize the plan and save it to the .aux folder (serialized plan) and the .hoodie folder (empty content)
      table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
    } catch (IOException e) {
      LOG.error("Got exception when saving cleaner requested file", e);
      throw new HoodieIOException(e.getMessage(), e);
    }
    return Option.of(cleanerPlan);
  }
  return Option.empty();
}
This in turn calls HoodieCopyOnWriteTable#scheduleClean to generate the HoodieCleanerPlan; the core code is as follows
public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
  try {
    HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
    // Find the earliest instant that must be retained
    Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
    // Get the partitions to process; with incremental clean, only the partitions affected since the last clean are returned
    List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
    if (partitionsToClean.isEmpty()) {
      // Empty plan
      return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
    }
    Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
        .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
        .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    return new HoodieCleanerPlan(earliestInstant
        .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
        config.getCleanerPolicy().name(), cleanOps, 1);
  } catch (IOException e) {
    throw new HoodieIOException("Failed to schedule clean operation", e);
  }
}
To generate the HoodieCleanerPlan, Hudi first finds the earliest instant that must be retained and then collects all the corresponding partition paths.
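How the earliest instant to retain is derived is not shown above. A minimal self-contained sketch of the idea under the KEEP_LATEST_COMMITS policy is given below; the timeline is modeled simply as an ascending list of commit timestamps, which is an assumption made purely for illustration.

import java.util.List;
import java.util.Optional;

public class EarliestCommitToRetainSketch {

  /**
   * Given completed commit timestamps in ascending order and the number of
   * commits to retain, return the earliest commit whose data must be kept.
   * Commits strictly older than this one become candidates for cleaning.
   */
  static Optional<String> earliestCommitToRetain(List<String> completedCommits, int commitsRetained) {
    if (completedCommits.size() <= commitsRetained) {
      // Not enough commits yet, nothing can be cleaned
      return Optional.empty();
    }
    return Optional.of(completedCommits.get(completedCommits.size() - commitsRetained));
  }

  public static void main(String[] args) {
    List<String> commits = List.of("001", "002", "003", "004", "005");
    // Retain the latest 3 commits -> the earliest commit to retain is "003"
    System.out.println(earliestCommitToRetain(commits, 3)); // Optional[003]
  }
}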
The core code for getting all partition paths to clean is as follows
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
  if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
      && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
    Option<HoodieInstant> lastClean =
        hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
    if (lastClean.isPresent()) {
      HoodieCleanMetadata cleanMetadata = AvroUtils
          .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
      if ((cleanMetadata.getEarliestCommitToRetain() != null)
          && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
        // Keep instants no earlier than the earliest commit retained by the previous clean
        // and earlier than the earliest instant retained by the current clean
        return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> {
          return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
              HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
              newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER);
        }).flatMap(instant -> {
          try {
            // Collect the partitions written by this instant
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            return commitMetadata.getPartitionToWriteStats().keySet().stream();
          } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
          }
        }).distinct().collect(Collectors.toList());
      }
    }
  }
  // Otherwise go to brute force mode of scanning all partitions
  return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
      hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
}
As you can see, if incremental clean is enabled, the instant to retain is present, and the policy is KEEP_LATEST_COMMITS, Hudi first looks up the last completed clean, filters the commits between the earliest instant retained by that clean and the earliest instant retained by the current clean, and collects the partition paths those commits wrote to; otherwise it simply returns all partition paths.
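The time-window filter can be illustrated in isolation. The sketch below keeps only timestamps and a commit-to-partitions map (both hypothetical structures used for illustration) and selects the partitions written by commits in [earliest retained by last clean, earliest retained by current clean):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IncrementalCleanPartitionsSketch {

  /**
   * commitToPartitions maps a commit timestamp to the partitions it wrote.
   * Only commits in [lastCleanEarliestRetained, currentEarliestToRetain) need
   * to be inspected: older partitions were already handled by the previous
   * clean, and newer commits are fully retained anyway.
   */
  static List<String> partitionsToClean(Map<String, List<String>> commitToPartitions,
                                        String lastCleanEarliestRetained,
                                        String currentEarliestToRetain) {
    return commitToPartitions.entrySet().stream()
        .filter(e -> e.getKey().compareTo(lastCleanEarliestRetained) >= 0
            && e.getKey().compareTo(currentEarliestToRetain) < 0)
        .flatMap(e -> e.getValue().stream())
        .distinct()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, List<String>> commits = Map.of(
        "001", List.of("2020/01/01"),
        "002", List.of("2020/01/02"),
        "003", List.of("2020/01/02", "2020/01/03"));
    // Commits "002" and "003" fall inside the window, so only their partitions are scanned
    System.out.println(partitionsToClean(commits, "002", "004"));
  }
}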
After all partition paths to clean have been collected, HoodieCleanHelper#getDeletePaths is called to find all file paths to delete within each partition.
The core code for getting the files to delete is as follows
public List<String> getDeletePaths(String partitionPath) throws IOException {
  // Get the cleaning policy
  HoodieCleaningPolicy policy = config.getCleanerPolicy();
  List<String> deletePaths;
  if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
    // Files to delete under the commits-retention policy
    deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
  } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
    // Files to delete under the file-versions-retention policy
    deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
  } else {
    throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
  }
  return deletePaths;
}
As shown above, a different method is called depending on the policy to collect all files to delete under the partition.
For the KEEP_LATEST_COMMITS policy, the core code of getFilesToCleanKeepingLatestCommits is as follows
private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException {
  int commitsRetained = config.getCleanerCommitsRetained();
  List<String> deletePaths = new ArrayList<>();
  // Get all savepointed data files
  List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
      .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
  // Only clean once the number of commits exceeds the retention count
  if (commitTimeline.countInstants() > commitsRetained) {
    // Find the earliest instant that must be retained
    HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
    // Get all HoodieFileGroups under the given partition path
    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
    for (HoodieFileGroup fileGroup : fileGroups) {
      // Get all FileSlices of this file group
      List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
      if (fileSliceList.isEmpty()) {
        continue;
      }
      // The latest version (largest base instant time)
      String lastVersion = fileSliceList.get(0).getBaseInstantTime();
      // The latest version before the earliest instant that must be retained
      String lastVersionBeforeEarliestCommitToRetain =
          getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
      // Walk through the FileSlices
      for (FileSlice aSlice : fileSliceList) {
        // Get the data file of the slice
        Option<HoodieDataFile> aFile = aSlice.getDataFile();
        String fileCommitTime = aSlice.getBaseInstantTime();
        if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
          // Skip data files that belong to a savepoint
          continue;
        }
        if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
          // Skip the latest version, and skip the latest version before the earliest instant to retain
          continue;
        }
        // The slice is not needed by a pending compaction and is older than the earliest instant to retain
        if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
            .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
          // It needs to be cleaned
          aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
          if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
            // For MOR tables, the log files of the slice are cleaned as well
            deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
          }
        }
      }
    }
  }
  return deletePaths;
}
As you can see, a series of checks decides which files can be deleted: savepointed files must not be deleted, the latest version before the earliest instant to retain must not be deleted (it may still be read by queries as of that instant), and files needed for a pending compaction must not be deleted. Only the remaining versions older than the earliest instant to retain are deleted, and for MOR tables the corresponding log files are deleted along with them.
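A concrete worked example may help. The sketch below models a file group simply as a descending list of base instant times (a simplification made for illustration only; savepoints and pending compaction are ignored) and shows which versions would be cleaned when the latest two commits are retained:

import java.util.ArrayList;
import java.util.List;

public class KeepLatestCommitsSketch {

  /**
   * versionsDesc: base instant times of a file group's slices, newest first.
   * earliestCommitToRetain: earliest commit whose view must stay readable.
   * Returns the versions that would be cleaned.
   */
  static List<String> versionsToClean(List<String> versionsDesc, String earliestCommitToRetain) {
    String lastVersion = versionsDesc.get(0);
    // The latest version strictly before the earliest commit to retain is still
    // needed to serve reads as of that commit, so it is kept.
    String lastVersionBeforeRetain = versionsDesc.stream()
        .filter(v -> v.compareTo(earliestCommitToRetain) < 0)
        .findFirst().orElse(null);
    List<String> toDelete = new ArrayList<>();
    for (String v : versionsDesc) {
      if (v.equals(lastVersion) || v.equals(lastVersionBeforeRetain)) {
        continue; // always keep these two versions
      }
      if (v.compareTo(earliestCommitToRetain) < 0) {
        toDelete.add(v); // older than anything a retained commit can reference
      }
    }
    return toDelete;
  }

  public static void main(String[] args) {
    // Versions written at commits 005 (newest) .. 001 (oldest), with commits 004
    // and 005 retained (earliest commit to retain = 004).
    System.out.println(versionsToClean(List.of("005", "003", "002", "001"), "004"));
    // Prints [002, 001]: 005 is the latest version, 003 is the latest one before 004.
  }
}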
For the KEEP_LATEST_FILE_VERSIONS policy, the core code of getFilesToCleanKeepingLatestVersions for getting the files to delete is as follows
private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException {
  // Get all file groups under the partition
  List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
  List<String> deletePaths = new ArrayList<>();
  // Get all savepointed data files
  List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
      .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
  for (HoodieFileGroup fileGroup : fileGroups) {
    // Number of versions to retain
    int keepVersions = config.getCleanerFileVersionsRetained();
    // Filter out slices that are needed for a pending compaction
    Iterator<FileSlice> fileSliceIterator =
        fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
    if (isFileGroupInPendingCompaction(fileGroup)) { // a compaction is pending on this file group
      keepVersions--;
    }
    while (fileSliceIterator.hasNext() && keepVersions > 0) {
      FileSlice nextSlice = fileSliceIterator.next();
      Option<HoodieDataFile> dataFile = nextSlice.getDataFile();
      if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
        // Skip savepointed files
        continue;
      }
      keepVersions--;
    }
    while (fileSliceIterator.hasNext()) { // the remaining FileSlices are deleted
      FileSlice nextSlice = fileSliceIterator.next();
      if (nextSlice.getDataFile().isPresent()) {
        HoodieDataFile dataFile = nextSlice.getDataFile().get();
        deletePaths.add(dataFile.getFileName());
      }
      if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
        // For MOR tables, the log files of the slice are cleaned as well
        deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
      }
    }
  }
  return deletePaths;
}
As shown above, savepointed files must not be deleted here either, nor files needed for a pending compaction. Only the versions beyond the configured retention count are deleted, and for MOR tables the log files are deleted along with them.
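The same simplified model illustrates this policy too. In the sketch below, with two versions retained, everything after the two newest slices of a file group is cleaned (savepoints and pending compaction are again left out for brevity):

import java.util.List;

public class KeepLatestVersionsSketch {

  /**
   * versionsDesc: base instant times of a file group's slices, newest first.
   * keepVersions: how many versions to retain per file group.
   * Returns the versions that would be cleaned.
   */
  static List<String> versionsToClean(List<String> versionsDesc, int keepVersions) {
    if (versionsDesc.size() <= keepVersions) {
      return List.of();
    }
    // Everything after the newest keepVersions slices is deleted
    return versionsDesc.subList(keepVersions, versionsDesc.size());
  }

  public static void main(String[] args) {
    // Keep the 2 newest versions -> 003, 002 and 001 are cleaned
    System.out.println(versionsToClean(List.of("005", "004", "003", "002", "001"), 2));
  }
}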
After the HoodieCleanerPlan is generated, it is serialized and saved to the metadata directory, and then executed. The core lies in HoodieCleanClient#runClean, whose core code is as follows
public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant) {
  try {
    // Deserialize the HoodieCleanerPlan from the metadata directory
    HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline()
        .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get());
    // Compute the parallelism
    int cleanerParallelism = Math.min(
        (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
        config.getCleanerParallelism());
    List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
        .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
            .collect(Collectors.toList()), cleanerParallelism)
        .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
    Map<String, PartitionCleanStat> partitionCleanStatsMap =
        partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
    // Build and return the per-partition clean statistics
    return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
      PartitionCleanStat partitionCleanStat =
          (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
              : new PartitionCleanStat(partitionPath);
      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
          .withEarliestCommitRetained(Option.ofNullable(
              actionInstant != null
                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                      actionInstant.getAction(), actionInstant.getTimestamp())
                  : null))
          .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
          .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
    }).collect(Collectors.toList());
  } catch (IOException e) {
    throw new HoodieIOException("Failed to clean up after commit", e);
  }
}
The core logic here is straightforward: the HoodieCleanerPlan is first deserialized, the deletions are then carried out (the actual file deletion is handled by deleteFilesFunc), and finally the clean statistics are returned.
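deleteFilesFunc itself is not shown above; conceptually it deletes each (partition, file) pair through the Hadoop FileSystem and records success or failure per partition. A minimal sketch of that idea is given below; the deleteFile and deleteAll helpers are hypothetical and stand in for Hudi's actual PartitionCleanStat plumbing.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteFilesSketch {

  /**
   * Delete one file under a partition and report whether it succeeded.
   * A file that is already gone is treated as successfully deleted.
   */
  static boolean deleteFile(FileSystem fs, String basePath, String partitionPath, String fileName)
      throws IOException {
    Path target = new Path(new Path(basePath, partitionPath), fileName);
    return !fs.exists(target) || fs.delete(target, false);
  }

  /** Tally successes and failures per partition, mirroring the role of PartitionCleanStat. */
  static Map<String, int[]> deleteAll(FileSystem fs, String basePath,
      Map<String, List<String>> filesPerPartition) throws IOException {
    Map<String, int[]> stats = new HashMap<>(); // partition -> {successes, failures}
    for (Map.Entry<String, List<String>> entry : filesPerPartition.entrySet()) {
      int[] counters = stats.computeIfAbsent(entry.getKey(), k -> new int[2]);
      for (String fileName : entry.getValue()) {
        if (deleteFile(fs, basePath, entry.getKey(), fileName)) {
          counters[0]++;
        } else {
          counters[1]++;
        }
      }
    }
    return stats;
  }
}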
For the Clean action, Hudi provides two policies: retention by file versions and retention by number of commits. Clean is split into two phases, generating the HoodieCleanerPlan and executing it, and the two phases are not directly coupled. When generating the HoodieCleanerPlan, all files to delete under the chosen policy are located; to avoid scanning every partition each time, Hudi also offers an incremental clean option that only processes the partitions affected since the last clean. The HoodieCleanerPlan is then serialized into the metadata (.aux) directory, and the execution phase deserializes it from the metadata directory and performs the actual file deletions.