前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >原 荐 Spark框架核心概念

原 荐 Spark框架核心概念

作者头像
云飞扬
发布2018-05-17 15:52:57
1.3K0
发布2018-05-17 15:52:57
举报
文章被收录于专栏:星汉技术星汉技术

Spark框架核心概念

    首先介绍Spark中的核心名词概念,然后再逐一详细说明。

RDD:弹性分布式数据集,是Spark最核心的数据结构。有分区机制,所以可以分布式进行处理。有容错机制,通过RDD之间的依赖关系来恢复数据。

依赖关系:RDD的依赖关系是通过各种Transformation(变换)来得到的。父RDD和子RDD之间的依赖关系分两种:①窄依赖②宽依赖。

    ①窄依赖:父RDD的分区和子RDD的分区关系是:一对一。

    窄依赖不会发生Shuffle,执行效率高,spark框架底层会针对多个连续的窄依赖执行流水线优化,从而提高性能。例如map、flatMap等方法都是窄依赖方法。

    ②宽依赖:父RDD的分区和子RDD的分区关系是:一对多。

    宽依赖会产生shuffle,会产生磁盘读写,无法优化。

DAG:有向无环图,当一个RDD的依赖关系形成之后,就形成了一个DAG。一般来说,一个DAG,最后都至少会触发一个Action操作,触发执行。一个Action对应一个Job任务。

Stage:一个DAG会根据RDD之间的依赖关系进行Stage划分,流程是:以Action为基准,向前回溯,遇到宽依赖,就形成一个Stage。遇到窄依赖,则执行流水线优化(将多个连续的窄依赖放到一起执行)。

task:任务。一个分区对应一个task。可以这样理解:一个Stage是一组Task的集合。

    RDD的Transformation(变换)操作:懒执行,并不会立即执行。

    RDD的Action(执行)操作:触发真正的执行。

1、RDD

    Resilient Distributed Datasets (RDDs)

    Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

    RDD弹性分布式数据集:就是带有分区的集合类型。特点是可以并行操作,并且是容错的。

    有两种方法可以创建RDD:

    1)执行Transform操作(变换操作),

    2)读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。

    1.RDD入门示例

案例一

    Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

代码语言:javascript
复制
val data = Array(1, 2, 3, 4, 5)
val r1 = sc.parallelize(data)
val r2 = sc.parallelize(data,2)

    你可以这样理解RDD:

    它是spark提供的一个特殊集合类。诸如普通的集合类型,如传统的Array:(1,2,3,4,5)是一个整体,但转换成RDD后,我们可以对数据进行Partition(分区)处理,这样做的目的就是为了分布式。

    你可以让这个RDD有两个分区,那么有可能是这个形式:RDD(1,2) (3,4)。

    这样设计的目的在于:可以进行分布式运算。

    注:创建RDD的方式有多种,比如案例一中是基于一个基本的集合类型(Array)转换而来,像parallelize这样的方法还有很多,之后就会学到。此外,我们也可以在读取数据集时就创建RDD。

案例二

    Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

    Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI) and reads it as a collection of lines. Here is an example invocation:

代码语言:javascript
复制
val distFile = sc.textFile("data.txt")

    查看RDD

代码语言:javascript
复制
rdd.collect

    收集rdd中的数据组成Array返回,此方法将会把分布式存储的rdd中的数据集中到一台机器中组建Array。

    在生产环境下一定要慎用这个方法,容易内存溢出。

    查看RDD的分区数量:

代码语言:javascript
复制
rdd.partitions.size

    查看RDD每个分区的元素:

代码语言:javascript
复制
rdd.glom.collect

    此方法会将每个分区的元素以Array形式返回。

2.分区概念

    在上图中,一个RDD有item1~item25个数据,共5个分区,分别在3台机器上进行处理。此外,spark并没有原生的提供rdd的分区查看工具,我们可以自己来写一个。

    示例代码:

代码语言:javascript
复制
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object su {
 def debug[T: ClassTag](rdd: RDD[T]) = {
  rdd.mapPartitionsWithIndex((i: Int, iter: Iterator[T]) => {
   val m = scala.collection.mutable.Map[Int, List[T]]()
   var list = List[T]()
   while (iter.hasNext) {
    list = list :+ iter.next
   }
   m(i) = list
   m.iterator
  }).collect().foreach((x: Tuple2[Int, List[T]]) => {
   val i = x._1
   println(s"partition:[$i]")
   x._2.foreach { println }
  })
 }
}

3.RDD操作

    针对RDD的操作,分两种,一种是Transformation(变换),一种是Actions(执行)。

    Transformation(变换)操作属于懒操作,不会真正触发RDD的处理计算。

    Actions(执行)操作才会真正触发。

1>Transformations

map(func)

    Return a new distributed dataset formed by passing each element of the source through a function func.

    参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。

    案例展示:

    map将函数应用到rdd的每个元素中。

代码语言:javascript
复制
val rdd = sc.makeRDD(List(1,3,5,7,9))
rdd.map(_*10)

flatMap(func)

    Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

    扁平化map,对RDD每个元素转换,然后再扁平化处理。

    案例展示:

    flatMap 扁平map处理

代码语言:javascript
复制
val rdd = sc.makeRDD(List("hello world","hello count","world spark"),2)
rdd.map(_.split{" "})//Array(Array(hello, world), Array(hello, count), Array(world, spark))
rdd.flatMap(_.split{" "})//Array[String] = Array(hello, world, hello, count, world, spark)
//Array[String] = Array(hello, world, hello, count, world, spark)

    注:map和flatMap有何不同?

    map:对RDD每个元素转换。

    flatMap:对RDD每个元素转换,然后再扁平化(即去除集合)

    所以,一般我们在读取数据源后,第一步执行的操作是flatMap。

filter(func)

    Return a new dataset formed by selecting those elements of the source on which func returns true.

    参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。

    案例展示:

    filter用来从rdd中过滤掉不符合条件的数据。

代码语言:javascript
复制
val rdd = sc.makeRDD(List(1,3,5,7,9))
rdd.filter(_<5)

mapPartitions(func)

    Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

    该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。

    案例展示:

代码语言:javascript
复制
scala>val rdd3 = rdd1.mapPartitions{ x => {
      val result = List[Int]()
      var i = 0
      while(x.hasNext){
      i += x.next()
      }
      result.::(i).iterator
      }}
scala>rdd3.collect

mapPartitionsWithIndex(func)

    Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

    函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

    案例展示:

代码语言:javascript
复制
var rdd1 = sc.makeRDD(1 to 5,2)
var rdd2 = rdd1.mapPartitionsWithIndex{
(index,iter) => {
var result = List[String]()
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::(index + "|" + i).iterator
}
}

案例展示:

代码语言:javascript
复制
val rdd = sc.makeRDD(List(1,2,3,4,5),2)
rdd.mapPartitionsWithIndex((index,iter)=>{
var list = List[String]()
while(iter.hasNext){
if(index==0)
list = list :+ (iter.next + "a")
else {
list = list :+ (iter.next + "b")
}
}
list.iterator
})

union(otherDataset)

    Return a new dataset that contains the union of the elements in the source dataset and the argument.

    union并集,也可以用++实现。

    案例展示:

代码语言:javascript
复制
val rdd1 = sc.makeRDD(List(1,3,5))
val rdd2 = sc.makeRDD(List(2,4,6,8))
val rdd = rdd1.union(rdd2)
val rdd = rdd1 ++ rdd2

intersection(otherDataset)

    Return a new RDD that contains the intersection of elements in the source dataset and the argument.

    intersection交集

    案例展示:

代码语言:javascript
复制
val rdd1 = sc.makeRDD(List(1,3,5,7))
val rdd2 = sc.makeRDD(List(5,7,9,11))
val rdd = rdd1.intersection(rdd2)

subtract

    求差集。

    案例展示:

代码语言:javascript
复制
val rdd1 = sc.makeRDD(List(1,3,5,7,9))
val rdd2 = sc.makeRDD(List(5,7,9,11,13))
val rdd =  rdd1.subtract(rdd2)

distinct(numTasks)

    Return a new dataset that contains the distinct elements of the source dataset.

    没有参数,将RDD里的元素进行去重操作。

    案例展示:

代码语言:javascript
复制
val rdd = sc.makeRDD(List(1,3,5,7,9,3,7,10,23,7))
rdd.distinct

groupByKey(numTasks)

    When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

    Note:If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.

    Note:By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

    groupByKey对于数据格式是有要求的,即操作的元素必须是一个二元tuple,tuple._1是key,tuple._2是value。

    比如下面两种种数据格式都不符合要求:

代码语言:javascript
复制
sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
sc.parallelize(List(("cat",2,1),("dog",5,1),("cat",4,1),("dog",3,2),
                    ("cat",6,2),("dog",3,4),("cat",9,4),("dog",1,4)),2)

案例展示:

代码语言:javascript
复制
scala>val rdd = sc.parallelize(List(("cat",2), ("dog",5),("cat",4),
                                    ("dog",3),("cat",6),("dog",3),
                                    ("cat",9),("dog",1)),2)
scala>rdd.groupByKey()

reduceByKey(func, numTasks)

    When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

    reduceByKey操作的数据格式必须是一个二元tuple

    案例展示:

代码语言:javascript
复制
scala>var rdd = sc.makeRDD( List( ("hello",1),("spark",1),("hello",1),("world",1) ) )
rdd.reduceByKey(_+_);

aggregateByKey(zeroValue)(seqOp,combOp,numTasks)

    When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

    aggregateByKey(zeroValue)(func1,func2)

    zeroValue表示初始值,初始值会参与func1的计算,在分区内,按key分组,把每组的值进行fun1的计算,再将每个分区每组的计算结果按fun2进行计算

代码语言:javascript
复制
scala> val rdd = sc.parallelize(List(("cat",2),("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2);

    查看分区结果:

代码语言:javascript
复制
partition:[0]
(cat,2)
(dog,5)
(cat,4)
(dog,3)
partition:[1]
(cat,6)
(dog,3)
(cat,9)
(dog,1)
scala> rdd.aggregateByKey(0)(_+_,_+_);
代码语言:javascript
复制
scala> rdd.aggregateByKey(0)(_+_,_*_);

sortByKey(ascending,numTasks)

    When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

    案例展示:

代码语言:javascript
复制
val d2 = sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",104),("cc",1),("ff",13),("gg",68),("bb",44)))
d2.sortByKey(true).collect

join(otherDataset,numTasks)

    When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin,rightOuterJoin, and fullOuterJoin.

    案例展示:

代码语言:javascript
复制
val rdd1 = sc.makeRDD(List(("cat",1),("dog",2)))
val rdd2 = sc.makeRDD(List(("cat",3),("dog",4),("tiger",9)))
rdd1.join(rdd2);

cartesian(otherDataset)

    When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

    参数是RDD,求两个RDD的笛卡儿积。

    案例展示:

    cartesian 笛卡尔积

代码语言:javascript
复制
val rdd1 = sc.makeRDD(List(1,2,3))
val rdd2 = sc.makeRDD(List("a","b"))
rdd1.cartesian(rdd2);

coalesce(numPartitions)

    Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

    coalesce(n,true/false)扩大或缩小分区。

    案例展示:

代码语言:javascript
复制
val rdd = sc.makeRDD(List(1,2,3,4,5),2)
rdd9.coalesce(3,true);

    如果是扩大分区,需要传入一个true,表示要重新shuffle。

代码语言:javascript
复制
rdd9.coalesce(2);

    如果是缩小分区,默认就是false,不需要明确的传入。

repartition(numPartitions)

    Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

    repartition(n) 等价于上面的coalesce

partitionBy

    通常在创建RDD时指定分区规则,将会导致数据自动分区;也可以通过partitionBy方法人为指定分区方式来进行分区。

    常见的分区器有:

    HashPartitioner、RangePartitioner

    案例展示:

代码语言:javascript
复制
import org.apache.spark._
val r1 = sc.makeRDD(List((2,"aaa"),(9,"bbb"),(7,"ccc"),(9,"ddd"),(3,"eee"),(2,"fff")),2);
val r2=r1.partitionBy(new HashPartitioner(2))

    按照键的hash%分区数得到的编号去往指定的分区,这种方式可以实现将相同键的数据分发给同一个分区的效果。

代码语言:javascript
复制
val r3=r1.partitionBy(new RangePartitioner(2,r1))

    将数据按照值的字典顺序进行排序,再分区。

2>Actions

reduce(func)

    Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

    并行整合所有RDD数据,例如求和操作。

collect

    Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

    返回RDD所有元素,将rdd分布式存储在集群中不同分区的数据获取到一起组成一个数组返回。

    要注意:这个方法将会把所有数据收集到一个机器内,容易造成内存的溢出,在生产环境下千万慎用。

count

    Return the number of elements in the dataset.

    统计RDD里元素个数

    案例展示:

代码语言:javascript
复制
val rdd = sc.makeRDD(List(1,2,3,4,5),2)
rdd.count

first

    Return the first element of the dataset (similar to take(1)).

take(n)

    Return an array with the first n elements of the dataset.

    take获取前n个数据。

    案例展示:

代码语言:javascript
复制
val rdd = sc.makeRDD(List(52,31,22,43,14,35))
rdd.take(2)

takeOrdered(n,ordering)

    Return the first n elements of the RDD using either their natural order or a custom comparator.

    takeOrdered(n)先将对象中的数据进行升序排序,然后取前n个。

    案例展示:

代码语言:javascript
复制
val rdd = sc.makeRDD(List(52,31,22,43,14,35))
rdd.takeOrdered(3)

top(n)

    top(n)先将对象中的数据进行降序排序,然后取前n个。

代码语言:javascript
复制
val rdd = sc.makeRDD(List(52,31,22,43,14,35))
rdd.top(3)

saveAsTextFile(path)

    Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

    saveAsTextFile 按照文本方式保存分区数据,到指定路径。

    案例示例:

代码语言:javascript
复制
val rdd = sc.makeRDD(List(1,2,3,4,5),2);
rdd.saveAsTextFile("/root/work/aaa")

countByKey()

    Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

    Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 

    Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

3>案例

    通过rdd实现统计文件中的单词数量。

代码语言:javascript
复制
sc.textFile("/root/work/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/root/work/wcresult")

4.RDD的依赖关系

    RDD之间的关系可以从两个维度来理解:

    一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。

    根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。

    RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

    窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用,如下图所示。

宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition。

我们可以从不同类型的转换来进一步理解RDD的窄依赖和宽依赖的区别,如下图所示。

1>窄依赖

    对于窄依赖操作,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其他的处理,可以简单地认为只是将数据从一个形式转换到另一个形式。

    窄依赖底层的源码:

代码语言:javascript
复制
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
 //返回子RDD的partitionId依赖的所有的parent RDD的Partition(s)
 def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
 }
 class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int) = List(partitionId)
}

    所以对于窄依赖,并不会引入昂贵的Shuffle。所以执行效率非常高。如果整个DAG中存在多个连续的窄依赖,则可以将这些连续的窄依赖整合到一起连续执行,中间不执行shuffle 从而提高效率,这样的优化方式称之为流水线优化。

    此外,针对窄依赖,如果子RDD某个分区数据丢失,只需要找到父RDD对应依赖的分区,恢复即可。但如果是宽依赖,当分区丢失时,最糟糕的情况是要重算所有父RDD的所有分区。

2>宽依赖

    对于groupByKey这样的操作,子RDD的所有Partition(s)会依赖于parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的结果。

    宽依赖的源码:

代码语言:javascript
复制
class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
 
override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
//获取新的shuffleId
val shuffleId: Int = _rdd.context.newShuffleId()
//向ShuffleManager注册Shuffle的信息
val shuffleHandle: ShuffleHandle =
_rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)
 
    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

    spark中一旦遇到宽依赖就需要进行shuffle的操作,所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程。

    这个过程数据要汇总到一起,数据量可能很大所以不可避免的需要进行数据落磁盘的操作,会降低程序的性能,所以spark并不是完全内存不读写磁盘,只能说它尽力避免这样的过程来提高效率 。

5.RDD容错机制

    分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。

    Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。

    RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage(血缘关系)。当一个RDD的某个分区丢失时,RDD是有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。

6.RDD的缓存

    相比Hadoop MapReduce来说,Spark计算具有巨大的性能优势,其中很大一部分原因是Spark对于内存的充分利用,以及提供的缓存机制。

    持久化在早期被称作缓存(cache),但缓存一般指将内容放在内存中。虽然持久化操作在绝大部分情况下都是将RDD缓存在内存中,但一般都会在内存不够时用磁盘顶上去(比操作系统默认的磁盘交换性能高很多)。当然,也可以选择不使用内存,而是仅仅保存到磁盘中。所以,现在Spark使用持久化(persistence)这一更广泛的名称。

    如果一个RDD不止一次被用到,那么就可以持久化它,这样可以大幅提升程序的性能,甚至达10倍以上。

    默认情况下,RDD只使用一次,用完即扔,再次使用时需要重新计算得到,而持久化操作避免了这里的重复计算,实际测试也显示持久化对性能提升明显,这也是Spark刚出现时被人称为内存计算框架的原因。

    假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。

1>持久化等级

    持久化的方法是调用persist()函数,除了持久化至内存中,还可以在persist()中指定storage level参数使用其他的类型,具体如下:

MEMORY_ONLY

    MEMORY_ONLY:将RDD以反序列化的Java对象的形式存储在JVM中。如果内存空间不足,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。

    persist()方法的默认级别就是cache()方法,cache()方法对应的级别就是MEMORY_ONLY级别。

MEMORY_AND_DISK

    MEMORY_AND_DISK:将RDD以反序列化的Java对象的形式存储在JVM中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取,存入磁盘的对象也是没有经过序列化的。

MEMORY_ONLY_SER

    MEMORY_ONLY_SER:将RDD以序列化的Java对象的形式进行存储(每个分区为一个byte数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用fast serialize时会节省更多的空间,但是在读取时会使得CPU的read变得更加密集。如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算。

MEMORY_AND_DISK_SER

    MEMORY_AND_DISK_SER:类似于MEMORY_ONLY_SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算,存储到磁盘上的对象会进行序列化。在需要使用这些分区时从磁盘读取。

DISK_ONLY

    DISK_ONLY:只在磁盘上缓存RDD。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。

OFF_HEAP

    OFF_HEAP:将数据存储在off-heap memory中。使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。注意,可能带来一些GC回收问题。

    Spark也会自动持久化一些在shuffle操作过程中产生的临时数据(比如reduceByKey),即便是用户并没有调用持久化的方法。这样做可以避免当shuffle阶段时如果一个节点挂掉了就得重新计算整个数据的问题。如果用户打算多次重复使用这些数据,我们仍然建议用户自己调用持久化方法对数据进行持久化。

2>使用缓存

    需要先导包,然后才能调用命令。

代码语言:javascript
复制
scala> import org.apache.spark.storage._
scala> val rdd1=sc.makeRDD(1 to 5)
scala> rdd1.cache  //cache只有一种默认的缓存级别,即MEMORY_ONLY
scala> rdd1.persist(StorageLevel.MEMORY_ONLY)
3>缓存数据的清除

    Spark会自动监控每个节点上的缓存数据,然后使用least-recently-used(LRU)机制来处理旧的缓存数据。如果你想手动清理这些缓存的RDD数据而不是去等待它们被自动清理掉,

    可以使用RDD.unpersist()方法。

代码语言:javascript
复制
cala> rdd1.unpersist()

2、DAG

    Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。

    Spark Scala版本的Word Count程序如下:

代码语言:javascript
复制
1:val file=sc.textFile("hdfs://hadoop01:9000/hello1.txt")
2:val counts = file.flatMap(line => line.split(" "))
3: .map(word=>(word,1))
4: .reduceByKey(_+_)
5:counts.saveAsTextFile("hdfs://...")

    file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。那么上面这5行代码的具体实现是什么呢?

    行1:sc是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口,会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

    sc.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

    行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD。

    行3:将第2步生成的MapPartittionsRDD再次经过map将每个单词word转为(word,1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

    行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

    行5:向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。

1.DAG的产生

    原始的RDD(s)通过一系列转换就形成了DAG。RDD之间的依赖关系,包含了RDD由哪些Parent RDD(s)转换而来和它依赖parent RDD(s)的哪些Partitions,是DAG的重要属性。

    借助这些依赖关系,DAG可以认为这些RDD之间形成了Lineage(血统,血缘关系)。借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。

2.Stage

    Spark在执行任务(job)时,首先会根据依赖关系,将DAG划分为不同的阶段(Stage)。

    处理流程是:

    1)Spark在执行Transformation类型操作时都不会立即执行,而是懒执行(计算)。

    2)执行若干步的Transformation类型的操作后,一旦遇到Action类型操作时,才会真正触发执行(计算)。

    3)执行时,从当前Action方法向前回溯,如果遇到的是窄依赖则应用流水线优化,继续向前找,直到碰到某一个宽依赖。

    4)因为宽依赖必须要进行shuffle,无法实现优化,所以将这一次段执行过程组装为一个stage。

    5)再从当前宽依赖开始继续向前找。重复刚才的步骤,从而将这个DAG还分为若干的stage。

    在stage内部可以执行流水线优化,而在stage之间没办法执行流水线优化,因为有shuffle。但是这种机制已经尽力的去避免了shuffle。

3.Job和Task

    原始的RDD经过一系列转换后(一个DAG),会在最后一个RDD上触发一个动作,这个动作会生成一个Job。

    所以可以这样理解:一个DAG对应一个Spark的Job。

    在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。

    Spark的Task分为两种:

    1)org.apache.spark.scheduler.ShuffleMapTask

    2)org.apache.spark.scheduler.ResultTask

    简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleMapTask。

3、可视化理解窄依赖和宽依赖

    案例  单词统计

代码语言:javascript
复制
scala>val data=sc.textFile("/home/software/hello.txt",2)
scala> data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

    1)打开web页面控制台(ip:4040端口地址),刷新,会发现刚才的操作会出现在页面上。

2)点击 Description下的 collect at…… 进入job的详细页面

3)点击 DAG Visualization 会出现如下图形

4、综合案例

1.WordCount

    数据样例:

代码语言:javascript
复制
hello scala
hello spark
hello world
1>导入jar包

    创建spark的项目,在scala中创建项目,导入spark相关的jar包。

2>代码示例

    统计单词个数,并输出

代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Driver {
  
  def main(args: Array[String]): Unit = {
    //创建Spark的环境对象,配置运行模式
    //1:集群模式:setMaster("spark://yun01:7077")
    //2:本地模式:setMaster("local")
    val conf=new SparkConf().setMaster("spark://yun01:7077").setAppName("wordcount")
    //获取Spark上下文对象
    val sc=new SparkContext(conf)
    val data=sc.textFile("hdfs://yun01:9000/words.txt", 2)
    val result=data.flatMap { x => x.split(" ") }.map { x => (x,1) }.reduceByKey(_+_)
    result.saveAsTextFile("hdfs://yun01:9000/wcresult")
  }
}
3>服务器运行

    将写好的项目打成jar,上传到服务器,进入bin目录,执行如下命令:

代码语言:javascript
复制
spark-submit --class cn.tedu.WordCountDriver /home/software/spark/conf/wc.jar

    注意输出的目录必须是不存在的,如果存在会报错。

2.求平均值

    数据样例:

    第一列是编号,第二列是数据。

代码语言:javascript
复制
1 16
2 74
3 51
4 35
5 44
6 95
7 5
8 29
10 60
11 13
12 99
13 7
14 26

    正确答案:42

1>分析

    ①只拿第二列,形成RDD

    ②类型转换->String->Int

    ③和/个数

2>代码示例一
代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Average {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("average")
    val sc=new SparkContext(conf)
    val data=sc.textFile("d://data/average.txt")
    val r11=data.map { line => line.split(" ")(1).toInt }.reduce(_+_)
    val r1=data.flatMap { x => x.split(" ").drop(1) }.map { x => x.toInt }.reduce(_+_)
    val r2=data.count()
    val result=r1/r2
    println(result)
  }
}
3>代码示例二
代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AverageDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("AverageDriver")
    val sc = new SparkContext(conf)
    val data = sc.textFile("d://average.txt", 3)
    val ageData = data.map { line => { line.split(" ")(1).toInt } }
    val ageSum = ageData.mapPartitions { it =>
      {
        val result = List[Int]()
        var i = 0
        while (it.hasNext) {
          i += it.next()
        }
        result.::(i).iterator
      }
    }.reduce(_ + _)
    val pepopleCount = data.count()
    val average = ageSum / pepopleCount
    println(average)
  }
}

3.求最大值和最小值

    数据样例:

    第一列是编号,第二列是性别,第三列是身高。

代码语言:javascript
复制
1 M 174
2 F 165
3 M 172
4 M 180
5 F 160
6 F 162
7 M 172
8 M 191
9 F 175
10 F 167
1>代码示例

    获取最大、最小值。

代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import breeze.linalg.split
import scala.collection.Iterable
object MaxMin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("Max")
    val sc = new SparkContext(conf)
    val data = sc.textFile("d://data/MaxMin.txt")
    //第一种方法
    val r1 = data.filter { line => line.contains("M") }
                 .map { line => line.split(" ")(2).toInt }
    //第二种方法
    val r2 = data.filter { line => line.split(" ")(1)
                 .equals("M") }.map { line => line.split(" ")(2).toInt }
    val max = r1.max()
    val min = r1.min()
    println(max + " " + min)
  }
}
2>代码示例

    获取最大、最小值的全部信息。

代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MaxMinInfo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("Info")
    val sc=new SparkContext(conf)
    val data=sc.textFile("d://data/MaxMin.txt")
    
    val r1=data.filter { line => line.split(" ")(1)
               .equals("M") }
               .map { x => (x.split(" ")(0),x.split(" ")(1),x.split(" ")(2).toInt) }
    
    val max=r1.sortBy(x=> -x._3, true, 1).take(1)
    val min=r1.sortBy(x=> x._3).take(1)
    
    println(max(0)._1+" "+max(0)._2+" "+max(0)._3)
    println(min(0)._1+" "+min(0)._2+" "+min(0)._3)
  }
}

4.TopK

    统计单词出现的次数最多的前三个。

1>数据样例
代码语言:javascript
复制
hello world bye world
hello hadoop bye hadoop
hello world java web
hadoop scala java hive
hadoop hive redis hbase
hello hbase java redis
2>分析

    Top K算法有两步,一是统计词频,二是找出词频最高的前K个词。

3>代码示例
代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object Topk {
  def main(args: Array[String]): Unit = {
    
    val conf=new SparkConf().setMaster("local").setAppName("topk")
    val sc=new SparkContext(conf)
    val data=sc.textFile("d://data/topk.txt")
    //方法一
    val r1=data.flatMap { _.split(" ").map { x => (x,1) } }
               .reduceByKey(_+_).sortBy(x=> -x._2).take(3)
    r1.foreach(println(_))
    //方法二
    val r2=data.flatMap { x => x.split(" ") }.groupBy { x => x }
               .map{x=>(x._1,x._2.count { x => true })}
               .map{case(word,count)=>(count,word)}.top(3)
    r2.foreach(println(_))
    //方法三
    val wordcunt=data.flatMap { x => x.split(" ")
                     .map { x => (x,1) } }.reduceByKey(_+_)
    val top3=wordcunt.top(3)(Ordering.by { case(word,count) => count })
    top3.foreach(println(_))
  }
}
4>应用场景

    Top K的示例模型可以应用在求过去一段时间消费次数最多的消费者、访问最频繁的IP地址和最近、更新、最频繁的微博等应用场景。

5.求中位数

    数据样例:

代码语言:javascript
复制
1 20 8 2 5 11 29 10
7 4 45 6 23 17 19

    一共是15个数,正确答案是10

    代码示例

代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.dmg.pmml.True

object Median {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("median")
    val sc=new SparkContext(conf)
    val data=sc.textFile("d://data/median.txt")
    //方法一
    val r1=data.flatMap { x => x.split(" ") }.count().toInt
    val zhi=r1/2
    val r2=data.flatMap { x => x.split(" ") }
               .map { x => x.toInt }
               .sortBy(x=>x, true, 1).take(r1)(zhi)
    println(r2)
    //方法二
    val sortData=data.flatMap { x => x.split(" ") }
                     .map { x => x.toInt }.sortBy(x=>x)
                     .take(zhi+1).last
    println(sortData)
  }
}

6.二次排序

    数据样例:

代码语言:javascript
复制
aa 12
bb 32
aa 3
cc 43
dd 23
cc 5
cc 8
bb 33
bb 12

    要求:先按第一例升序排序,再按第二列降序排序。

1>代码示例

自定义排序类

代码语言:javascript
复制
import scala.math.Ordered

class SecondarySort(v1:String,v2:Int) extends Ordered[SecondarySort] with Serializable {
  var col1=v1
  var col2=v2
  def compare(that: SecondarySort): Int = {
    //按第一列做升序排序
    val tmp=this.col1.compareTo(that.col1)
    if(tmp==0){
      //按第二列做降序排序
      that.col2.compareTo(this.col2)
    }else{
      tmp
    }
  }
}

Driver

代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Driver {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("ssort")
    val sc=new SparkContext(conf)
    val data=sc.textFile("d://data/ssort.txt")
    //用sortByKey来实现二次排序,所以先把数据组成一个二元Tuple
    //二元Tuple的形式(SecondarySort(col1,col2),line)
    val result=data.map { line => 
      val infos=line.split(" ")
      (new SecondarySort(infos(0),infos(1).toInt),line)
    }.sortByKey().map(x=>x._2)
    result.foreach(println(_))
  }
}

7.倒排索引

    数据样例:

代码语言:javascript
复制
doc1.txt:
hello spark
hello hadoop

doc2.txt:
hello hive
hello hbase
hello spark

doc3.txt:
hadoop hbase
hive scala

    最后的结果形式为:

1>代码示例
代码语言:javascript
复制
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Driver {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("invert")
    val sc=new SparkContext(conf)
    //将指定目录下的所有文件,返回到一个RDD中
    val data=sc.wholeTextFiles("d://data/inverted/*")
    data.foreach(println(_))
    //且分时,先按\r\n,然后按空格切
    val result=data.map{case(filePath,text)=>
      //切分出文件名称
      val fileName=filePath.split("/").last.dropRight(4)
      (fileName,text)
    }.flatMap{case(filename,text)=>
      //切分文件内容
      text.split("\r\n").flatMap { line => line.split(" ") }
      //调换内容和文件名称的位置
      .map { word => (word,filename) } 
    }
    //通过单词分组
    .groupByKey()
    //聚合文件名称
    .map{case(word,buffer)=>(word,buffer.toSet.mkString(","))}
    result.foreach(println)
    
  }
}

上一篇:Spark On Yarn完全分布式搭建

下一篇:Spark的架构

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark框架核心概念
    • 1、RDD
      •     1.RDD入门示例
      • 2.分区概念
      • 3.RDD操作
      • 4.RDD的依赖关系
      • 5.RDD容错机制
      • 6.RDD的缓存
    • 2、DAG
      • 1.DAG的产生
      • 2.Stage
      • 3.Job和Task
    • 3、可视化理解窄依赖和宽依赖
      • 4、综合案例
        • 1.WordCount
        • 2.求平均值
        • 3.求最大值和最小值
        • 4.TopK
        • 5.求中位数
        • 6.二次排序
        • 7.倒排索引
    相关产品与服务
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档