A Parquet file consists of a header, one or more blocks, and ends with a footer.
The header contains only a 4-byte magic number, PAR1, which identifies the file as Parquet format.
All of the file's metadata lives in the footer.
The footer metadata contains the format version, the schema, arbitrary key-value pairs, and the metadata of every block.
The last two fields of the footer are the length of the footer metadata, stored in 4 bytes, followed by the same PAR1 magic number found in the header.
The figure above shows the structure of a Parquet file.
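As a quick illustration of that layout, the trailer of any Parquet file can be inspected directly. A minimal sketch, assuming a made-up local file path; it only checks the magic and reads the footer-metadata length:

```scala
import java.io.RandomAccessFile
import java.nio.{ByteBuffer, ByteOrder}

// The last 8 bytes of a Parquet file are: 4-byte little-endian footer-metadata length + "PAR1".
val f = new RandomAccessFile("/data/events/part-00000.parquet", "r") // made-up path
try {
  f.seek(f.length() - 8)
  val trailer = new Array[Byte](8)
  f.readFully(trailer)
  val footerLen = ByteBuffer.wrap(trailer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt
  val magic     = new String(trailer, 4, 4, "ASCII")
  println(s"magic=$magic, footer metadata length=$footerLen bytes") // magic should print PAR1
} finally {
  f.close()
}
```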
As the figure shows, every primitive-type field in the Schema is a leaf node; this Schema has 6 leaf nodes in total. If such a Schema is flattened into a relational model, the table can be understood as having six columns.
❝Parquet has no complex data structures such as Map or Array. Each data model's schema contains multiple fields, and each field may in turn contain nested fields. Every field has three attributes: a repetition, a data type, and a name. The repetition is one of required (appears exactly once), repeated (appears zero or more times), or optional (appears zero or one time). The data type of a field is either group (complex type) or primitive (basic type). This is enough for columnar storage, but not enough to reconstruct the original row structure, so Parquet adopts the (R, D, V) model from Dremel. R, the Repetition Level, expresses that a column is repeated (has multiple values) and records at which level of the schema the repetition occurs. D, the Definition Level, expresses whether and where a column is null; its value is the level down to which the column is defined. V is the data value itself. ❞
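To make the (R, D, V) idea concrete, here is a small hedged sketch using the parquet-column library's schema parser (the schema itself is made up); the maximum repetition/definition level of each leaf column falls directly out of the required/optional/repeated annotations:

```scala
import org.apache.parquet.schema.MessageTypeParser
import scala.collection.JavaConverters._

// A made-up nested schema: 3 leaf columns with different repetition attributes.
val schema = MessageTypeParser.parseMessageType(
  """message AddressBook {
    |  required binary owner (UTF8);
    |  repeated binary ownerPhoneNumbers (UTF8);
    |  optional group address {
    |    required binary street (UTF8);
    |  }
    |}""".stripMargin)

// For each leaf column, print its maximum repetition level (R) and definition level (D).
schema.getColumns.asScala.foreach { col =>
  println(s"${col.getPath.mkString(".")}: maxR=${col.getMaxRepetitionLevel}, maxD=${col.getMaxDefinitionLevel}")
}
// owner:             maxR=0, maxD=0   (required, never null, never repeated)
// ownerPhoneNumbers: maxR=1, maxD=1   (repeated at the top level)
// address.street:    maxR=0, maxD=1   (required inside an optional group)
```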
❝spark.read.parquet("") ❞
org.apache.spark.sql.DataFrameReader.scala
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
val jdbc = classOf[JdbcRelationProvider].getCanonicalName
val json = classOf[JsonFileFormat].getCanonicalName
val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
val nativeOrc = classOf[OrcFileFormat].getCanonicalName
val socket = classOf[TextSocketSourceProvider].getCanonicalName // ---> DataSourceV2
val rate = classOf[RateStreamProvider].getCanonicalName // ---> DataSourceV2
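In practice this short-name resolution is what makes the following two reads equivalent (a trivial illustration; `spark` is an active SparkSession and the path is made up):

```scala
// Both calls resolve the "parquet" short name to ParquetFileFormat via DataSource.lookupDataSource.
val a = spark.read.format("parquet").load("/data/events") // made-up path
val b = spark.read.parquet("/data/events")
```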
private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation
->getOrInferFileFormatSchema()
**Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list.**
private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
  val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
  new InMemoryFileIndex(
    sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()
private[sql] def bulkListLeafFiles(
...
// spark.sql.sources.parallelPartitionDiscovery.parallelism, default 10000
val parallelPartitionDiscoveryParallelism =
  sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
**Set the parallelism to keep the file listing below from generating too many tasks**
**in case of large defaultParallelism.**
**val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)**
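If the distributed listing job ever needs tuning, the cap above can be set at session construction time. A hedged sketch (the value 200 and the path are made up; 10000 is just the default):

```scala
// Hypothetical example: cap the distributed file-listing job at 200 tasks.
// spark.sql.sources.parallelPartitionDiscovery.parallelism is an internal Spark SQL conf,
// so it is usually passed at startup rather than changed per query.
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("listing-parallelism-demo")
  .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "200")
  .getOrCreate()

// Reading a directory tree with many paths triggers the InMemoryFileIndex listing shown below.
val df = spark.read.parquet("/data/events") // made-up path
```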
val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
  val description = paths.size match {
    case 0 =>
      s"Listing leaf files and directories 0 paths"
    case 1 =>
      s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
    case s =>
      s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
  }
  sparkContext.setJobDescription(description)
  sparkContext
    .parallelize(serializedPaths, numParallelism)
    .mapPartitions { pathStrings =>
      val hadoopConf = serializableConfiguration.value
      pathStrings.map(new Path(_)).toSeq.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, None))
      }.iterator
    }.map { case (path, statuses) =>
      val serializableStatuses = statuses.map { status =>
        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
        val blockLocations = status match {
          case f: LocatedFileStatus =>
            f.getBlockLocations.map { loc =>
              SerializableBlockLocation(
                loc.getNames,
                loc.getHosts,
                loc.getOffset,
                loc.getLength)
            }
          case _ =>
            Array.empty[SerializableBlockLocation]
        }
        SerializableFileStatus(
          status.getPath.toString,
          status.getLen,
          status.isDirectory,
          status.getReplication,
          status.getBlockSize,
          status.getModificationTime,
          status.getAccessTime,
          blockLocations)
      }
      (path.toString, serializableStatuses)
    }.collect()
...
)
The actual data reading is done by DataSourceScanExec.
❝Note: there is also a v2 counterpart, DataSourceV2ScanExec; from the code analysis above, parquet and orc go through the v1 path. org.apache.spark.sql.execution.DataSourceScanExec.scala ❞
Physical plan node for scanning data from HadoopFsRelations.
FileSourceScanExec
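A quick way to see this node in practice (illustrative only; `spark` is an active SparkSession, the path is made up, and the exact plan text varies by Spark version):

```scala
val df = spark.read.parquet("/data/events") // made-up path
df.explain()
// == Physical Plan ==
// *(1) FileScan parquet [...] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], ...
```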
private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}
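The bucketed branch is only taken for tables written with bucketing metadata. A minimal sketch of producing such a table so that createBucketedReadRDD is used on read (`spark` is an active SparkSession; path, table and column names are made up):

```scala
val df = spark.read.parquet("/data/events") // made-up path

// Bucket metadata only survives through the catalog, so saveAsTable is required.
df.write
  .bucketBy(8, "user_id")   // 8 buckets on a made-up column
  .sortBy("user_id")
  .format("parquet")
  .saveAsTable("events_bucketed")

// Reading the table back goes through FileSourceScanExec with relation.bucketSpec defined,
// as long as spark.sql.sources.bucketing.enabled (default true) is not turned off.
val bucketed = spark.table("events_bucketed")
```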
private def createNonBucketedReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Seq[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  // spark.sql.files.maxPartitionBytes, default 128 MB
  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  // spark.sql.files.openCostInBytes, default 4 MB
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  // defaultParallelism comes from the SparkContext (see the code above)
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")
  // Split files: a splittable format is cut into chunks of at most maxSplitBytes.
  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      val blockLocations = getBlockLocations(file)
      if (fsRelation.fileFormat.isSplitable(
          fsRelation.sparkSession, fsRelation.options, file.getPath)) {
        (0L until file.getLen by maxSplitBytes).map { offset =>
          val remaining = file.getLen - offset
          val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
          val hosts = getBlockHosts(blockLocations, offset, size)
          PartitionedFile(
            partition.values, file.getPath.toUri.toString, offset, size, hosts)
        }
      } else {
        val hosts = getBlockHosts(blockLocations, 0, file.getLen)
        Seq(PartitionedFile(
          partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
      }
    }
  }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  // Pack small files together; a large split simply becomes a partition of its own.
  // Walking down the code you might expect large files to be split further here, but they are not.
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      val newPartition =
        FilePartition(
          partitions.size,
          currentFiles.toArray.toSeq) // Copy to a new Array.
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  // Assign files to partitions using "Next Fit Decreasing"
  // A large split goes straight into its own partition; small splits are packed into one
  // partition until they add up to maxSplitBytes, which keeps the partition count down.
  splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  closePartition()

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
org.apache.spark.sql.execution.datasources.FileScanRDD
private def readCurrentFile(): Iterator[InternalRow] = {
  try {
    readFunction(currentFile)
  } catch {
    case e: FileNotFoundException =>
      throw new FileNotFoundException(
        e.getMessage + "\n" +
          "It is possible the underlying files have been updated. " +
          "You can explicitly invalidate the cache in Spark by " +
          "running 'REFRESH TABLE tableName' command in SQL or " +
          "by recreating the Dataset/DataFrame involved.")
  }
}
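For reference, the cache invalidation mentioned in that error message can be triggered like this (a hedged sketch; `spark` is an active SparkSession, table and path names are made up):

```scala
// Either call clears the cached file listing / relation metadata so the next scan re-lists files.
spark.sql("REFRESH TABLE events")
spark.catalog.refreshTable("events")

// For purely path-based reads, refreshing by path also works.
spark.catalog.refreshByPath("/data/events") // made-up path
```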
ParquetFileFormat.buildReaderWithPartitionValues (already mentioned above) constructs the reader:
override def buildReaderWithPartitionValues(
...
  if (enableVectorizedReader) {
    val vectorizedReader = new VectorizedParquetRecordReader(
      convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
    val iter = new RecordReaderIterator(vectorizedReader)
    // SPARK-23457 Register a task completion listener before `initialization`.
    taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
    vectorizedReader.initialize(split, hadoopAttemptContext)
    vectorizedReader.initBatch(partitionSchema, file.partitionValues)
    if (returningBatch) {
      vectorizedReader.enableReturningBatches()
    }
  } else {
    ...
    reader.initialize(split, hadoopAttemptContext)
  }
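Which branch runs is driven by session configuration. A hedged note on the switches involved (config names as found in Spark 2.4-era code; the values set here are purely illustrative):

```scala
// spark.sql.parquet.enableVectorizedReader  -> enableVectorizedReader (default: true)
// spark.sql.columnVector.offheap.enabled    -> enableOffHeapColumnVector (default: false)
// spark.sql.parquet.columnarReaderBatchSize -> capacity (default: 4096 rows per batch)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "4096")
```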
vectorizedReader.initialize(split, hadoopAttemptContext)
->SpecificParquetRecordReaderBase.initialize
->ParquetMetadata footer = readFooter(config, file, range(0, length)); note the range passed in here
->ParquetMetadataConverter.converter.readParquetMetadata(f, filter)
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
  FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
    @Override
    public FileMetaData visit(NoFilter filter) throws IOException {
      return readFileMetaData(from);
    }
    @Override
    public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
      return readFileMetaData(from, true);
    }
    @Override
    public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
      return filterFileMetaDataByStart(readFileMetaData(from), filter);
    }
    @Override
    public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
      return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
    }
  });
  LOG.debug("{}", fileMetaData);
  ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
  if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
  return parquetMetadata;
}
For a RangeMetadataFilter, the visitor above calls filterFileMetaDataByMidpoint(readFileMetaData(from), filter):
static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
  List<RowGroup> rowGroups = metaData.getRow_groups();
  List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
  for (RowGroup rowGroup : rowGroups) {
    long totalSize = 0;
    long startIndex = getOffset(rowGroup.getColumns().get(0));
    for (ColumnChunk col : rowGroup.getColumns()) {
      totalSize += col.getMeta_data().getTotal_compressed_size();
    }
    long midPoint = startIndex + totalSize / 2;
    if (filter.contains(midPoint)) {
      newRowGroups.add(rowGroup);
    }
  }
  metaData.setRow_groups(newRowGroups);
  return metaData;
}
This is the key point of how splits map to actual data.
Now suppose we have a 40 MB file containing a single row group and we split it every 10 MB, giving 4 partitions.
Only one of those partitions contains the midpoint of the row group, so only that one partition will actually produce data.
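A minimal Scala sketch of that midpoint rule under the same assumptions (one 40 MB row group, 10 MB splits; the numbers and the `Split` helper are made up for illustration, this is not the parquet-mr code itself):

```scala
// Each split is a half-open byte range [start, start + length) of the file.
final case class Split(start: Long, length: Long) {
  def contains(offset: Long): Boolean = offset >= start && offset < start + length
}

val mb = 1024L * 1024L
val rowGroupStart = 4L           // row group data starts right after the 4-byte "PAR1" header
val rowGroupSize  = 40L * mb     // one 40 MB row group
val midpoint      = rowGroupStart + rowGroupSize / 2 // roughly 20 MB into the file

// Four 10 MB splits, as in the example above.
val splits = (0 until 4).map(i => Split(i * 10L * mb, 10L * mb))

// Only the split whose byte range covers the midpoint gets the row group; the other three read nothing.
splits.zipWithIndex.foreach { case (s, i) =>
  println(s"split $i reads row group: ${s.contains(midpoint)}") // true only for split 2
}
```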