Spark应用数据分片介绍

引言

分布式计算的基本思路是将数据分为多个部分,将同样的数据操作方式在数据的不同部分上执行,分别获得结果,然后通过“汇聚处理”的方式得到结果。如何将数据分为多个部分(也就是“分片”)便是其中的一个重要组成部分。Spark框架同样对使用分片的操作,将数据分片(partition)处理。本文对Spark框架中的数据分片作简单介绍。

输入数据的分片

对于读取批数据生成rdd的操作,数据的分片都是通过输入文件格式本身提供的getSplit方法来对数据进行分片。本部分主要介绍对于不同数据源的数据,spark如何定义/获取数据的分片数。text文件分片(sc.textFile为例):

deftextFile(

path:String,

minPartitions:Int=defaultMinPartitions):RDD[String]=withScope{

assertNotStopped()

hadoopFile(path,classOf[TextInputFormat]/* 数据文件的输入格式 :org.apache.hadoop.mapred.TextInputFormat */

,classOf[LongWritable],classOf[Text],

minPartitions).map(pair=>pair._2.toString).setName(path)

}

hadoopFile方法生成HadoopRDD

HadoopRDD(

this,

confBroadcast,

Some(setInputPathsFunc),

inputFormatClass,

keyClass,

valueClass,

minPartitions)/* minPartitions为生成该RDD的最小分片数,表示该RDD的分片数最小值,默认为2*/

.setName(path)通过跟踪该方法可以看出该函数最终会调用到HadoopRDD的getPartitions方法,在该方法中通过inputFormat的getSplit方法计算分片数

getInputFormat(jobConf).getSplits(jobConf,minPartitions)hbase表分片在读取HBase数据时,没有类似textFile的接口的封装,可调用如下接口生成给予hbase数据的RDD,

valhBaseRDD=sc.newAPIHadoopRDD(conf,

classOf[TableInputFormat],/*该类的全类名为: org.apache.hadoop.hbase.mapreduce.TableInputFormat */

classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

classOf[org.apache.hadoop.hbase.client.Result])

该方法生成newNewHadoopRDD(this,fClass,kClass,vClass,jconf)在执行action操作时,同样调用到rdd.partitions方法,跟踪至newHadoopRDD之后,发现调用到

inputFormat.getSplits(newJobContextImpl(_conf,jobId))查看对应的getSplits方法可以看出:

splits伪代码如下(源码可参考TableInputFormatBase.calculateRebalancedSplits):

while(i

{

if(splits(i).size>averagesize*3){

if(!splitAble)

resultsplits.add(split(i))

else{

(split1,split2)=Split(splits(i))

resultsplits.add(split1)

resultsplits.add(split2)

}

i++

}

elseif(splits(i).size>averagesize) {

resultsplits.add(split(i))

i++

}else{

startKey=split(i).getStartRow

i++;

while(totalSize+splits(i).size

totalSize+=splits(i).size

endKey=splits(i).getEndRow

}

resultsplits.add(newTableSplit(startKey,endKey,*))

}

}Kafka数据的分片Spark框架在读取Kafka消息时,将Kafka数据抽象为KafkaRDD(SparkStreaming)或者KafkaSourceRDD(StructedStreaming),查看对应RDD的getPartitions方法和定义:KafkaSourceRDD:

overridedefgetPartitions:Array[Partition]={

offsetRanges.zipWithIndex.map{case(o,i)=>newKafkaSourceRDDPartition(i,o) }.toArray

}

offsetRanges的数据结构为

private[kafka010]caseclassKafkaSourceRDDOffsetRange(

topicPartition:TopicPartition,

fromOffset:Long,

untilOffset:Long,

preferredLoc:Option[String])可以看出partition个数为对应的TopicPartition的个数KafkaRDD

overridedefgetPartitions:Array[Partition]={

offsetRanges.zipWithIndex.map{case(o,i)=>

newKafkaRDDPartition(i,o.topic,o.partition,o.fromOffset,o.untilOffset)

}.toArray

}

offsetRanges数据结构为:

finalclassOffsetRangeprivate(

valtopic:String,

valpartition:Int,

valfromOffset:Long,

valuntilOffset:Long)可以看出partition个数为对应的partition的个数总结在spark框架中,对于输入数据获取RDD的处理:

读取数据时的分片由数据量,数据"存储格式"决定,框架/应用并不能真正决定分片数。

对于通过数据生成的RDD,如makeRDD,parallize等方法生成的RDD,则可以指定相应的RDD的分片数。

对于FileInputFormat格式的数据,可通过设置最小的分片数来扩大RDD分片数,但不能决定最终由多少分片数(最终分片数 >= 设置的最小分片数)

其他类型的数据/文件的分片方法也是通过输入文件格式的getSplit方法来获取分片

Split方法直接决定了输入数据的分片数,影响应用并行度,在一些场景下,应用可以定制特定的getSplits方法来实现一些特殊需求。如hive在处理小文件时自定义了combineFileInputForamt,Hbase在以region为单位划分split之后,再跟进每个region数据量来合并/分拆split来优化性能

PS: 其他相关的数据分片对于输入文件的分片,不同的文件格式使用的分片方法不尽相同。 如hive中使用的parquet,RCFIle格式文件,其getsplits方法直接使用的是FileInputFormat.getSplits, 而orc格式文件的getsplits方法则是继承于InputFormat在Hive中默认使用的是CombineFileInputFormat,它的作用是在启动map时,会将多个小文件进行合并,已启动较少的map提升应用运行速度。其getsplits方法在合并小文件时会考虑更多的因素,如:

经过转换的分片

Spark框架中,RDD的分片数决定了对RDD处理时的并发度,因此合理的RDD分片数,对应用的性能有较大影响。

RDD的转换通常不会改变RDD的partition数,如map,flatmap,mappartitions等操作并没有传入partition数的API,无法修改新生成的RDD的的分片数。可参考org.apache.spark.rdd.RDD

如果需要强制修改新生成RDD的分片数,可直接调用RDD.repartition,RDD.coalesce强制修改新生成RDD的分片数

对于RDD[KEY,VALUE]类型的RDD的操作如join,reduceByKey,aggregateByKey,combineByKey等接口可通过传入分片数/设置partitioner等方式设置shuffle之后的RDD的partition个数,从而调整后续的stage的并发task个数.可参考org.apache.spark.rdd.PairRDDFunctions

对于需要进行shuffle操作的算子,在变换的过程中,会自动生成shuffledRDD,该RDD的分片数可通过触发shuffle操作的算子调用时设置。如果没有设置时,则会使用默认的分片数。

对于普通的应用shuffle后的默认的分片数由spark.default.parallelism参数决定,默认200 对于sql相关的操作,shuffle后的默认分片数由spark.sql.shuffle.partitions操作决定,默认为200

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180511G14H3E00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动