引言
分布式计算的基本思路是将数据分为多个部分,将同样的数据操作方式在数据的不同部分上执行,分别获得结果,然后通过“汇聚处理”的方式得到结果。如何将数据分为多个部分(也就是“分片”)便是其中的一个重要组成部分。Spark框架同样对使用分片的操作,将数据分片(partition)处理。本文对Spark框架中的数据分片作简单介绍。
输入数据的分片
对于读取批数据生成rdd的操作,数据的分片都是通过输入文件格式本身提供的getSplit方法来对数据进行分片。本部分主要介绍对于不同数据源的数据,spark如何定义/获取数据的分片数。text文件分片(sc.textFile为例):
deftextFile(
path:String,
minPartitions:Int=defaultMinPartitions):RDD[String]=withScope{
assertNotStopped()
hadoopFile(path,classOf[TextInputFormat]/* 数据文件的输入格式 :org.apache.hadoop.mapred.TextInputFormat */
,classOf[LongWritable],classOf[Text],
minPartitions).map(pair=>pair._2.toString).setName(path)
}
hadoopFile方法生成HadoopRDD
HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions)/* minPartitions为生成该RDD的最小分片数,表示该RDD的分片数最小值,默认为2*/
.setName(path)通过跟踪该方法可以看出该函数最终会调用到HadoopRDD的getPartitions方法,在该方法中通过inputFormat的getSplit方法计算分片数
getInputFormat(jobConf).getSplits(jobConf,minPartitions)hbase表分片在读取HBase数据时,没有类似textFile的接口的封装,可调用如下接口生成给予hbase数据的RDD,
valhBaseRDD=sc.newAPIHadoopRDD(conf,
classOf[TableInputFormat],/*该类的全类名为: org.apache.hadoop.hbase.mapreduce.TableInputFormat */
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
该方法生成newNewHadoopRDD(this,fClass,kClass,vClass,jconf)在执行action操作时,同样调用到rdd.partitions方法,跟踪至newHadoopRDD之后,发现调用到
inputFormat.getSplits(newJobContextImpl(_conf,jobId))查看对应的getSplits方法可以看出:
splits伪代码如下(源码可参考TableInputFormatBase.calculateRebalancedSplits):
while(i
{
if(splits(i).size>averagesize*3){
if(!splitAble)
resultsplits.add(split(i))
else{
(split1,split2)=Split(splits(i))
resultsplits.add(split1)
resultsplits.add(split2)
}
i++
}
elseif(splits(i).size>averagesize) {
resultsplits.add(split(i))
i++
}else{
startKey=split(i).getStartRow
i++;
while(totalSize+splits(i).size
totalSize+=splits(i).size
endKey=splits(i).getEndRow
}
resultsplits.add(newTableSplit(startKey,endKey,*))
}
}Kafka数据的分片Spark框架在读取Kafka消息时,将Kafka数据抽象为KafkaRDD(SparkStreaming)或者KafkaSourceRDD(StructedStreaming),查看对应RDD的getPartitions方法和定义:KafkaSourceRDD:
overridedefgetPartitions:Array[Partition]={
offsetRanges.zipWithIndex.map{case(o,i)=>newKafkaSourceRDDPartition(i,o) }.toArray
}
offsetRanges的数据结构为
private[kafka010]caseclassKafkaSourceRDDOffsetRange(
topicPartition:TopicPartition,
fromOffset:Long,
untilOffset:Long,
preferredLoc:Option[String])可以看出partition个数为对应的TopicPartition的个数KafkaRDD
overridedefgetPartitions:Array[Partition]={
offsetRanges.zipWithIndex.map{case(o,i)=>
newKafkaRDDPartition(i,o.topic,o.partition,o.fromOffset,o.untilOffset)
}.toArray
}
offsetRanges数据结构为:
finalclassOffsetRangeprivate(
valtopic:String,
valpartition:Int,
valfromOffset:Long,
valuntilOffset:Long)可以看出partition个数为对应的partition的个数总结在spark框架中,对于输入数据获取RDD的处理:
读取数据时的分片由数据量,数据"存储格式"决定,框架/应用并不能真正决定分片数。
对于通过数据生成的RDD,如makeRDD,parallize等方法生成的RDD,则可以指定相应的RDD的分片数。
对于FileInputFormat格式的数据,可通过设置最小的分片数来扩大RDD分片数,但不能决定最终由多少分片数(最终分片数 >= 设置的最小分片数)
其他类型的数据/文件的分片方法也是通过输入文件格式的getSplit方法来获取分片
Split方法直接决定了输入数据的分片数,影响应用并行度,在一些场景下,应用可以定制特定的getSplits方法来实现一些特殊需求。如hive在处理小文件时自定义了combineFileInputForamt,Hbase在以region为单位划分split之后,再跟进每个region数据量来合并/分拆split来优化性能
PS: 其他相关的数据分片对于输入文件的分片,不同的文件格式使用的分片方法不尽相同。 如hive中使用的parquet,RCFIle格式文件,其getsplits方法直接使用的是FileInputFormat.getSplits, 而orc格式文件的getsplits方法则是继承于InputFormat在Hive中默认使用的是CombineFileInputFormat,它的作用是在启动map时,会将多个小文件进行合并,已启动较少的map提升应用运行速度。其getsplits方法在合并小文件时会考虑更多的因素,如:
经过转换的分片
Spark框架中,RDD的分片数决定了对RDD处理时的并发度,因此合理的RDD分片数,对应用的性能有较大影响。
RDD的转换通常不会改变RDD的partition数,如map,flatmap,mappartitions等操作并没有传入partition数的API,无法修改新生成的RDD的的分片数。可参考org.apache.spark.rdd.RDD
如果需要强制修改新生成RDD的分片数,可直接调用RDD.repartition,RDD.coalesce强制修改新生成RDD的分片数
对于RDD[KEY,VALUE]类型的RDD的操作如join,reduceByKey,aggregateByKey,combineByKey等接口可通过传入分片数/设置partitioner等方式设置shuffle之后的RDD的partition个数,从而调整后续的stage的并发task个数.可参考org.apache.spark.rdd.PairRDDFunctions
对于需要进行shuffle操作的算子,在变换的过程中,会自动生成shuffledRDD,该RDD的分片数可通过触发shuffle操作的算子调用时设置。如果没有设置时,则会使用默认的分片数。
对于普通的应用shuffle后的默认的分片数由spark.default.parallelism参数决定,默认200 对于sql相关的操作,shuffle后的默认分片数由spark.sql.shuffle.partitions操作决定,默认为200
领取专属 10元无门槛券
私享最新 技术干货