前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码:Spark SQL 分区特性第一弹

源码:Spark SQL 分区特性第一弹

作者头像
Spark学习技巧
发布2018-12-25 17:43:05
1K0
发布2018-12-25 17:43:05
举报
文章被收录于专栏:Spark学习技巧

头条号上说过近期分享Spark SQL系列文章,前面在头条号上分享了Dataset API的基本操作和复杂操作,不知道下面大家有没有自己测试一下。

今天主要是分享Spark SQL Dataset数据源的分区特性,而且是第一弹的数据格式是partquet。

常见RDD分区

Spark Core 中的RDD的分区特性大家估计都很了解,这里说的分区特性是指从数据源读取数据的第一个RDD或者Dataset的分区,而后续再介绍转换过程中分区的变化。

举几个浪尖在星球里分享比较多的例子,比如:

  1. Spark Streaming 与kafka 结合 DirectDstream 生成的微批RDD(kafkardd)分区数和kafka分区数一样。
  2. Spark Streaming 与kafka结合 基于receiver的方式,生成的微批RDD(blockRDD),分区数就是block数。
  3. 普通的文件RDD,那么分可分割和不可分割,通常不可分割的分区数就是文件数。可分割需要计算而且是有条件的,在星球里分享过了。

这些都很简单,那么今天咱们要谈的是Spark DataSet的分区数的决定因素。

准备数据

首先是由Seq数据集合生成一个Dataset

代码语言:javascript
复制
val sales = spark.createDataFrame(Seq(
      ("Warsaw", 2016, 110),
      ("Warsaw", 2017, 10),
      ("Warsaw", 2015, 100),
      ("Warsaw", 2015, 50),
      ("Warsaw", 2015, 80),
      ("Warsaw", 2015, 100),
      ("Warsaw", 2015, 130),
      ("Warsaw", 2015, 160),
      ("Warsaw", 2017, 200),
      ("Beijing", 2017, 100),
      ("Beijing", 2016, 150),
      ("Beijing", 2015, 50),
      ("Beijing", 2015, 30),
      ("Beijing", 2015, 10),
      ("Beijing", 2014, 200),
      ("Beijing", 2014, 170),
      ("Boston", 2017, 50),
      ("Boston", 2017, 70),
      ("Boston", 2017, 110),
      ("Boston", 2017, 150),
      ("Boston", 2017, 180),
      ("Boston", 2016, 30),
      ("Boston", 2015, 200),
      ("Boston", 2014, 20)
    )).toDF("city", "year", "amount")

将Dataset存处为partquet格式的hive表,分两种情况:

用city和year字段分区

代码语言:javascript
复制
sales.write.partitionBy("city","year").mode(SaveMode.Overwrite).saveAsTable("ParquetTestCityAndYear")

用city字段分区

代码语言:javascript
复制
sales.write.partitionBy("city").mode(SaveMode.Overwrite).saveAsTable("ParquetTestCity")

读取数据采用的是

代码语言:javascript
复制
val res = spark.read.parquet("/user/hive/warehouse/parquettestcity")

直接展示,结果发现结果会随着spark.default.parallelism变化而变化。文章里只读取city字段分区的数据,特点就是只有单个分区字段。

1. spark.default.parallelism =40

Dataset的分区数是由参数:

代码语言:javascript
复制
println("partition size = "+res.rdd.partitions.length)

目录数和生成的FileScanRDD的分区数分别数下面截图的第一行和第二行。

这个分区数目正好是文件数,那么假如不了解细节的话,肯定会认为分区数就是由文件数决定的,其实不然。

2. spark.default.parallelism =4

Dataset的分区数是由参数:

代码语言:javascript
复制
println("partition size = "+res.rdd.partitions.length)

目录数和生成的FileScanRDD的分区数分别数下面截图的第一行和第二行。

那么数据源生成的Dataset的分区数到底是如何决定的呢?

我们这种情况,我只能告诉你是由下面的函数在生成FileScanRDD的时候计算得到的,具体计算细节可以仔细阅读该函数。该函数是类FileSourceScanExec的方法。

代码语言:javascript
复制
private def createNonBucketedReadRDD(
                                        readFile: (PartitionedFile) => Iterator[InternalRow],
                                        selectedPartitions: Seq[PartitionDirectory],
                                        fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    /*
      selectedPartitions 的大小代表目录数目
     */
    println("selectedPartitions.size : "+ selectedPartitions.size)
    val defaultMaxSplitBytes =
      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes

    // spark.default.parallelism
    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism

    // 计算文件总大小,单位字节数
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum

    //计算平均每个并行度读取数据大小
    val bytesPerCore = totalBytes / defaultParallelism

    // 首先spark.sql.files.openCostInBytes 该参数配置的值和bytesPerCore 取最大值
    // 然后,比较spark.sql.files.maxPartitionBytes 取小者
    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

    // 这对目录遍历
    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        val blockLocations = getBlockLocations(file)

        //判断文件类型是否支持分割,以parquet为例,是支持分割的
        if (fsRelation.fileFormat.isSplitable(
          fsRelation.sparkSession, fsRelation.options, file.getPath)) {

//          eg. 0 until 2不包括 2。相当于
//        println(0 until(10) by 3) 输出 Range(0, 3, 6, 9)
          (0L until file.getLen by maxSplitBytes).map { offset =>

            // 计算文件剩余的量
            val remaining = file.getLen - offset

//            假如剩余量不足 maxSplitBytes 那么就剩余的作为一个分区
            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining

//            位置信息
            val hosts = getBlockHosts(blockLocations, offset, size)
            PartitionedFile(
              partition.values, file.getPath.toUri.toString, offset, size, hosts)
          }
        } else {
//          不可分割的话,那即是一个文件一个分区
          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
          Seq(PartitionedFile(
            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
        }
      }
    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

    val partitions = new ArrayBuffer[FilePartition]
    val currentFiles = new ArrayBuffer[PartitionedFile]
    var currentSize = 0L

    /** Close the current partition and move to the next. */
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) {
        val newPartition =
          FilePartition(
            partitions.size,
            currentFiles.toArray.toSeq) // Copy to a new Array.
        partitions += newPartition
      }
      currentFiles.clear()
      currentSize = 0
    }

    // Assign files to partitions using "Next Fit Decreasing"
    splitFiles.foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      // Add the given file to the current partition.
      currentSize += file.length + openCostInBytes
      currentFiles += file
    }
    closePartition()

    println("FileScanRDD partitions size : "+partitions.size)
    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
  }
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-12-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档