在2014年11月5日举行的Daytona Gray Sort 100TB Benchmark竞赛中,Databricks 用构建于206个运算节点之上的spark运算框架在23分钟内完成100TB数据的排序,一举击败了该赛事2013年的冠军—Yahoo团队建立在2100个运算节点之上的Hadoop MapReduce集群,该集群耗时72分钟排序了102.5TB的数据。换句话说,Spark用了十分之一的资源在三分之一的时间里完成了Hadoop做的事情。
Hadoop | Spark | |
---|---|---|
被排序数据大小 | 102.5 TB | 100 TB |
持续时间 | 72 mins | 23 mins |
节点数 | 2100 | 206 |
内核数 | 50400 | 6592 |
排序速率 | 1.42 TB/min | 4.27 TB/min |
单节点排序速率 | 0.67 GB/min | 20.7 GB/min |
Spark的单节点排序速率是Hadoop的31倍。这也快太多了吧!当然喽,Spark是内存运算框架,Hadoop MapReduce则是硬盘存储全部中间结果,内存和硬盘速度当然不一样啦,快30多倍也没什么吧。不过请注意,Databricks团队特别说明,为了和Hadoop对比,这次用于排序的Spark集群没有使用它们的内存缓存机制,他们也是用硬盘存储的中间结果!
Spark到底是个怎样神奇的框架,不用内存缓存也能比Hadoop快31倍?
客观来讲,Spark的架构设计,的确使得它在很多应用场景的速度上大幅超越了Hadoop,而排序这是这样的应用场景之一。不过,上面表格显示的31倍速率的差异,是否完全是Spark框架本身的贡献? 如果仔细看Hadoop和Spark团队的两次排序实践,会发现除了运算框架的选取,其他还有很多不同。下面我们就从非框架因素和框架因素两方面来做下分析。
他们的集群配置不同。
2100节点的Hadoop集群建立在一个专门的数据中心内部,单个节点配置:
CPU | 2 x 2.3Ghz hexcore Xeon E5-2630 |
---|---|
Memory | 64GB |
Disk | 12 x 3TB |
Network | 10Gbps |
而206节点的Spark集群则直接使用了Amazon EC2 i2.8xlarge 节点,单个节点配置:
CPU | 32 vCores - 2.5Ghz Intel Xeon E5-2670 v2 |
---|---|
Memory | 244GB |
Disk | 8x800 GB SSD |
Network | iperf shows ~9.5Gbps |
Spark节点的内存量是Hadoop节点的约4倍,而且他们的持久化存储采用的SSD而非Hadoop集群那样的普通硬盘。另外,Spark集群建立在EC2之上,集群本身的运行维护有Amazon团队协助。
两次排序的算法不同。Spark采取的是TimSort,Hadoop则是Terasort。前者衍生自归并排序和插入排序,具有快排的性能,而最差情况时间复杂度为O(NLogN),比快排还要低。后者从原理上讲也是基于归并的,不过针对Hadoop的map->reduce模式进行了优化。
Databricks团队还专门做了针对”高速缓存命中率“(cachelocality)的优化。用于排序的数据,每条记录长度为100Byte,其中key的长度为10Byte。Databricks团队在profilling排序程序时发现高速缓存未命中率(cachemissing rate)很高,原因是排序过程中,每一次数值比较所需的对象指针查找都是随机。于是团队重新设计了算法,每条记录由16Byte表示,其中前10个Byte是Key,最后4个Byte是实际记录的位置信息。这样,每次比较所进行的高速缓存查找几乎是顺序进行的,而不再是随机内存查找。
排除了其上这些非运算框架因素的影响之后,再让我们来看看Hadoop MapReduce和Spark的差别。
我们先看一下双方的软件配置:
2013 冠军 Yahoo Hadoop | 2014 冠军 Databricks Spark | |
---|---|---|
OS | RHEL Server 6.3, Linux 2.6.32-279,19.1.e16.YAHOO.201301014.x86_64 | Linux 3.10.53-56.140.amzn1.x86_64 |
Framework | Hadoop 0.23.7 | Apache Spark master branch (target for Spark 1.2.0 release) |
JDK | Oracle JDK 1.7(u17) – 64bit | OpenJDK 1.7 amzn-2.5.1.2.44.amzn1-86_64 u65-b17 |
Other | N/A | Apache Hadoop HDFS 2.4.1 with short-circuit local reads enabled;Netty 2.0.23.Final with native Epoll transport |
Databricks一方采用的是Amazon的Linuxdistribution,又得到了Amazon团队的支持,很可能在OS层就进行了调优,不过因为笔者未见确切说明的资料,两者又都是采用Linux系统,此处就不讨论OS的区别了。JDK同理。
首先有一点需要说明:Hadoop只有两种操作-- map和reduce。Spark的操作则分为两类:transform和action。其中Transform包括:map,filter, flatMap, mapPartitions, mapPartitionsWithIndex, sample, pipe, union, intersection,distinct, groupByKey, reduceByKey, sortByKey, join, cogroup, Cartesian, coalesce,repartition。Action包括:reduce, collect, count, take,first, takeSample, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, countByKey,foreach。
Spark中每个transform的返回值都是RDD,也就是transform是那些真正转换了RDD的操作,而Action操作会返回结果或把RDD数据写到存储系统中。Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。这一点区别对于两个框架性能的影响通过下面这些方面体现。
Shuffle对于Hadoop MapReduce框架是个核心概念。简单而言,HadoopMapReduce Job的map task结束后,会将输出结果存储在map task所在DataNode的硬盘上,这些结果实际上是reducetask的输入。于是reducetask就需要再把maptask的输出读人到内存里,这个整个过程叫做shuffle。reducetask和maptask往往不在一个DataNode上,因此,这个过程除了硬盘读写,一般还包括网络传输。由此可见,shuffle相当消耗资源和时间。
对于Hadoop比较擅长的data ETL或者aggregation的场景,因为输入和输出数据量相差很大,相对而言,shuffle的次数和shuffle数据的体量都比较小。而排序这种操作,输入和输出的数据量是一样的,因此shuffle的负担特别严重。
当然,shuffle并不是Hadoop独有的,Spark同样需要shuffle。但是Spark的shuffle和Hadoop有所不同。Hadoop MapReduce 将处理流程划分为:map, spill, merge, shuffle, sort, reduce等阶段,shuffle是位于map和reduce中间的一个阶段。在 Spark 中,没有这样功能明确的阶段。Spark将用户定义的计算过程转化为一个被称作Job逻辑执行图的有向无环图(DAG),图中的顶点代表RDD,边代表RDD之间的依赖关系。再将这个逻辑执行图转化为物理执行图,具体方法是:从逻辑图后往前推算,遇到 ShuffleDependency 就断开,最后根据断开的次数n,将其化分为(n+1)个stage。每个 stage 里面 task 的数目由该 stage 最后一个 RDD 中的 partition 个数决定。因此,Spark的Job的shuffle数是不固定的。
在Spark早期的版本中,Spark使用的是hash-based的shuffle,通常使用 HashMap 来对 shuffle 来的数据进行聚合,不会对数据进行提前排序。而Hadoop MapReduce 一直使用的就是 sort-based shuffle,进入 combine和 reduce的数据都会先经过排序(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。不过在Spark1.1已经支持sorted-basedshuffle,在这一点上做到了扬长避短。这次排序比赛中所使用的是Spark 1.2,采用的就是sorted-based shuffle。
此外,Databricks还创建了一个外部shuffle服务,该服务和Spark执行器(executor)本身是分离的。这个服务使得即使是Spark 执行器在因GC导致的暂停时仍然可以正常进行shuffle。
Databricks将Spark的网络通讯模块通过Java Native Interface转给原生的Netty来做,而不是像Yahoo那样,让Hadoop框架自己负担。这样,Spark实际上借助了Netty来提升了网络传输性能。前面说到的外部shuffle服务就是建立在这个模块之上的。
Hadoop MapReduce的所有中间结果(包括map/reduce task内部的中间结果)都会存储在硬盘上,期间虽然会使用缓存,但当中间数据输出超出一定阈值(比如100M),就必须将其写入到磁盘中去。Spark虽然设置成了不使用内存缓存,但即使这种设置,Spark也只有在shuffle的时候才将中间结果输出到硬盘上。两者比较,Spark的硬盘I/O要少得多。
Hadoop MapReduce和Spark都会将计算过程拆解成若干task,这些task分布在不同的DataNode(Hadoop)或Worker(Spark)上执行。不同的是,Hadoop的task对应的是一个个进程,而Spark的task对应的,则是线程。这就天然造成了Hadooptask和Sparktask在启动运行是的overhead不同。Spark上每个task的生命周期都比Hadoop更轻量级,当然也更快。
虽然Hadoop和Spark都支持Java,但这次Databricks是用Scala语言实现的排序算法。函数式编程语言不需要考虑死锁,因为它不修改变量,所以根本不存在"锁"线程的问题。不必担心一个线程的数据,被另一个线程修改,所以可以很放心地把工作分摊到多个线程,实现并发编程。因此,Scala的并行性明显优于面向对象的Java语言。Spark对于Scala的原生支持也是其优势之一。