前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark sql多维分析优化——提高读取文件的并行度

spark sql多维分析优化——提高读取文件的并行度

作者头像
数据仓库践行者
发布2020-04-18 00:09:52
2.2K0
发布2020-04-18 00:09:52
举报

这次分享多维分析优化的另一种情况

本文大纲

1、描述问题背景

2、讲一下解决思路

3、解决办法(spark sql处理parquet row group原理及分区原理,参数测试,解决方案)

4、效果

1、描述问题

代码如下:

代码语言:javascript
复制
select   netease_user,     if(campaign_id is null, 'all', campaign_id) as campaign_id,     if(spec_id is null, 'all', spec_id) as spec_id,     if(app_bundle is null, 'all', app_bundle) as app_bundle,     if(render_name is null, 'all', render_name) as render_name,     platform,    sum(bidfloor) as success_bidfloor,    count(distinct clk_request_id) as click_pv,     count(distinct exp_deviceid) as exp_uv,    count(distinct exp_request_id) as exp_pv,     count(distinct clk_deviceid) as click_uv,  round(sum(case when winprice<0 then 0 else winprice end)/1000, 4) as costfrom(select  distinct    nvl(netease_user , 'true') as netease_user,     nvl(render_name , 'null') as render_name,     platform,     nvl(campaign_id, 'null') as campaign_id,    nvl(spec_id, 'null') as spec_id,     nvl(app_bundle , 'null') as app_bundle,    clk_request_id, exp_deviceid, exp_request_id, clk_deviceid, winprice, bidfloorfrom table_a where day = '20190815' and platform is not null) tmpgroup by  netease_user, campaign_id, spec_id, app_bundle, render_name, platformgrouping sets(  ( netease_user, platform),
  ( netease_user, platform, campaign_id),  ( netease_user, platform, spec_id),  ( netease_user, platform,app_bundle),  ( netease_user, platform,render_name),
    ( netease_user, platform,campaign_id, spec_id),    ( netease_user, platform,campaign_id, app_bundle),     ( netease_user, platform,campaign_id, render_name),     ( netease_user, platform, spec_id, app_bundle),     ( netease_user, platform, spec_id, render_name),     ( netease_user, platform, app_bundle, render_name),        ( netease_user, platform, campaign_id, spec_id, app_bundle),     ( netease_user, platform, spec_id, app_bundle, render_name),     ( netease_user, platform, campaign_id, app_bundle, render_name),     ( netease_user, platform, campaign_id, spec_id, render_name),        ( netease_user, campaign_id, spec_id, app_bundle, render_name, platform));

整体逻辑与上一篇:【spark sql多维分析优化——细节是魔鬼】 差不多。

不同的是上一篇的基础表 table_a的总量很大,有几十亿,但是这次的基础表数据量有几百万,并不算很大。

但是运行时长还是挺长的:

需要60分钟左右。

来看一下日志:

第二个job比较慢,一定就是expand 慢了:

从上面可以看到,数据过滤后是582w,经过两次expand 后,变成了4.6个亿,4.6个亿的量本来不算大,但因为只有2个task在处理,就显的异常的慢

2、解决思路

解决多维分析的办法一般是:把逻辑拆开,分别计算指标,然后再 join 起来,这个也是上一篇【spark sql多维分析优化——细节是魔鬼】用到的一个办法。但这个办法有个缺点就是如果指标比较多的情况下,代码会写的很长,数据也会被多加载几遍。

对于这次案例来说,不用拆代码,因为5亿左右的量并不算很大,我们只用把task给扩展一下,从2个扩展到20个应该就能很快处理完了。

该怎么扩展呢?

首先我们先简化一下代码:

这里的distinct 是没必要的,从对业务的了解以及日志的数据来看,distinct 并没使数据大量减少,并且由于distinct引起了shuffle,也会占用一部分时间,因此可以把distinct去掉。

去掉distinct后,expand 操作就会被合并到Job 1 中,这样以来我们只要在读取文件时增加task, 让每个task处理更少的数据,就能提高效率。

3、解决办法及遇到的问题

该怎么提高读取文件的并行度呢?

基础表 table_a 存储格式为parquet,我们首先要了解spark sql 是怎么来处理parquet文件的。

3.1 spark sql分区方式(parquet)

spark 通过FileSourceScanExec 来处理hdfs文件:

代码语言:javascript
复制
/** 基础表table_a不为分桶表,读取数据的分区方式走此方法*/private def createNonBucketedReadRDD(      readFile: (PartitionedFile) => Iterator[InternalRow],      selectedPartitions: Seq[PartitionDirectory],      fsRelation: HadoopFsRelation): RDD[InternalRow] = {    /**defaultMaxSplitBytes 即为spark.sql.files.maxPartitionBytes 参数,默认为128M*/    val defaultMaxSplitBytes =      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes     /**openCostInBytes 即为spark.sql.files.openCostInBytes 参数,默认为4M*/    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes    /**defaultParallelism  并行度参数 即 spark.default.parallelism */    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum    val bytesPerCore = totalBytes / defaultParallelism        /**分片方法的计算公式*/    /**openCostInBytes与bytesPerCore取最大,然后再与defaultMaxSplitBytes取最小*/    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))       logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +      s"open cost is considered as scanning $openCostInBytes bytes.")        /**遍历文件*/    val splitFiles = selectedPartitions.flatMap { partition =>      partition.files.flatMap { file =>        val blockLocations = getBlockLocations(file)           /**判断文件是否支持分割,parquet可分割*/          if (fsRelation.fileFormat.isSplitable(            fsRelation.sparkSession, fsRelation.options, file.getPath)) {          /**依据分片大小maxSplitBytes计算要多少分区来处理数据*/          (0L until file.getLen by maxSplitBytes).map { offset =>            val remaining = file.getLen - offset            /**假如剩余量不足,那么该文件剩余的作为一个分区*/            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining            val hosts = getBlockHosts(blockLocations, offset, size)            PartitionedFile(              partition.values, file.getPath.toUri.toString, offset, size, hosts)          }        } else {          /**判断文件是否支持分割,如果不能分割,一个文件一个partition*/          val hosts = getBlockHosts(blockLocations, 0, file.getLen)          Seq(PartitionedFile(            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))        }      }    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)    .....

如果想要增加分区,即task 数量,就要降低最终分片 maxSplitBytes的值,可以通过降低spark.sql.files.maxPartitionBytes 的值来降低 maxSplitBytes 的值

3.2 参数测试及问题

spark.sql.files.maxPartitionBytes 参数默认为128M,生成了四个分区:

table_a 在hdfs 20190815日的数据情况:

代码语言:javascript
复制
205.2 M  part-00000-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c000205.2 M  part-00001-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c0003.8 M    part-00002-30ceee1e-2ed6-4239-8a6b-45fc6cbf1ef6.c000

共三个数据文件,如果设置参数 spark.sql.files.maxPartitionBytes为64M,会把数据分8个块:

代码语言:javascript
复制
##part-00000  四块range: 0-67108864  ; range: 67108864-134217728;  range: 134217728-201326592range: 201326592-215189723

代码语言:javascript
复制
##part-00001  四块range: 0-67108864  ; range: 67108864-134217728;  range: 134217728-201326592range: 201326592-215167669

代码语言:javascript
复制
##part-00002  一块range: 0-4002630

启动7个task:

理论上有6个task分别负责每个64M的块数据,然后最后一个task负责part-00000,part-00001剩余的不足64M的两个块以及part-00002。

然而事实是:

分区数确实增加了,由四个增加到了7个,但是新增的3个却没处理什么数据,大部分的数据还是4个partition在处理,所以还是很慢~~~~

task数增加了,但是数据并没有均分到每个task,为什么呢?

仔细研究了一下parquet 文件的结构:

parquet 文件的数据是以row group 存储,一个parquet 文件可能只含有一个row group,也有可能含有多个row group ,row group 的大小 主要由parquet.block.size 决定。

spark 在处理parquet 文件时,一个row group 只能由一个task 来处理,在hdfs 中一个row group 可能横跨hdfs block ,那么spark是怎么保证一个task只处理一个 row group 的呢?

代码语言:javascript
复制
static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {  List<RowGroup> rowGroups = metaData.getRow_groups();  List<RowGroup> newRowGroups = new ArrayList<RowGroup>();  for (RowGroup rowGroup : rowGroups) {    long totalSize = 0;    long startIndex = getOffset(rowGroup.getColumns().get(0));    for (ColumnChunk col : rowGroup.getColumns()) {      totalSize += col.getMeta_data().getTotal_compressed_size();    }    /**计算row group中点*/    long midPoint = startIndex + totalSize / 2;    /**谁拥有这个row group的中点,谁就可以处理这个row group*/    if (filter.contains(midPoint)) {      newRowGroups.add(rowGroup);    }  }  metaData.setRow_groups(newRowGroups);  return metaData;}

这就导致并不是所有task 都能够分到数据。

检查table_a发现,生成table_a时,parquet.block.size 用的默认值128M ,这样就导致一个row group 有128M 的大小。

parquet.block.size 是可以依据实际使用情况来调优的,对于做多维分析表,可以设置稍小一点。

最终 经过调试设置parquet.block.size 为16M ;设置spark.sql.files.maxPartitionBytes为16M

4、效果

修改参数后:

读取hdfs文件时,并行了22个task,并且每个task处理数据均匀。

2分40秒就能完成,有没有棒棒哒?

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据仓库践行者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、描述问题
  • 2、解决思路
  • 3、解决办法及遇到的问题
    • 3.1 spark sql分区方式(parquet)
      • 3.2 参数测试及问题
      • 4、效果
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档