Spark是一个Apache项目,被标榜为"Lightning-Fast"的大数据处理工具,它的开源社区也是非常活跃,与Hadoop相比,其在内存中运行的速度可以提升100倍。Apache Spark在Java、Scale、Python和R语言中提供了高级API,还支持一组丰富的高级工具,如Spark SQL(结构化数据处理)、MLlib(机器学习)、GraphX(图计算)、SparkR(统计分析)以及Spark Streaming(处理实时数据)。
Apache Spark 官方文档中文版:http://spark.apachecn.org/#/
在我们学习一个新工具之前,需要先了解一下这门技术出现的意义、应用的场景、与同类工具相比的优缺点等等,这样子才能更加条理地去学习它,也更加容易掌握。
对于Spark,我们需要问的是:为什么有Hadoop和MapReduce,还需要它呢?可能它解决了Hadoop和MapReduce不能解决的问题,具体是什么问题呢?
MapReduce的缺陷:
那么,Spark到底有哪些优势,让这么多的开发者如此着迷??
Spark的优势:
Spark最基本的数据抽象叫弹性分布式数据集(Resilient Distributed Dataset,RDD),它代表一个可以被分区(partition)的只读数据集,它内部可以有很多分区,每个分区又有大量的数据记录。
备注:图来自于极客时间
Spark的基础数据结构就是RDD,全称是Resilient Distributed Dataset,弹性分布式数据集。Spark基于RDD定义了很多数据操作,从而使得代码看起来非常简洁。
RDD是一个基于分布式内存的数据抽象,支持工作集的应用,也具有数据流模型的特点,表示已被分区、不可变的、并能够被并行操作的数据集合。
备注:点击看高清大图
重点说下RDD的主流分区方式:Hash partitioner和Range partitioner。前者对数据的key进行散列分区,后者则是按key的排序均匀分区,绝大部分情况下HashPartitioner都可以满足需求,但有的时候分区数据量会不均匀,而RangePartitioner则尽量保证每个分区的数据量均匀。具体的RDD分区策略可以参考:
Spark RDD分区策略:://www.jianshu.com/p/a21b3be88afd?utm_campaign
此外,也说下依赖关系,Spark支持的两种依赖关系:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。前者就是父RDD的分区一一对应到子RDD,比如map、filter操作,后者则就是父RDD的每个分区都可以被多个子RDD的分区使用,比如Join、groupBy操作。窄依赖允许每个分区并行处理。
RDD的数据操作分为两种:Transformation(转换)和Action(动作)。
Transformation就是用来把一个RDD转换成另一个RDD,而Action则是通过计算返回一个结果。
L = [1,2,3,4,5]
old = sc.parallelize(L, 2)
print(old.collect()) // [1,2,3,4,5]
print(old.glom().collect()) // [[1,2],[3,4,5]]
rdd = sc.parallelize(["b","a","c"])
rdd2 = rdd.map(lambda x:(x, 1)) // [("b", 1), ("a", 1), ("c", 1)]
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.filter(lambda x:x%2 == 0) // [2, 4]
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd2 = rdd.mapPartitions(f) // [3, 7]
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.groupByKey().collect()
//"a" [1, 2]
//"b" [1]
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items()) // [('a', 2), ('b', 1)]
总结一下:
Spark在每次转换操作的时候,都是"惰性求值",使用了新产生的RDD来记录计算逻辑,这样就把作用在RDD上的所有计算逻辑串联起来,形成一个链条,当遇上RDD的动作操作时,Spark就会从计算链条的最后一个RDD开始,依次从上一个RDD获取数据并执行计算逻辑,最后输出结果。
每当我们对RDD调用一个新的action操作时,整个RDD都会从头开始计算,因此如果某一个RDD被反复利用的话,这样子的方式是低效的,我们需要对其进行持久化操作。Spark中persist()和cache()方法都支持,它将RDD的数据缓存到内存或者硬盘中,大大提高反复利用的计算效率。
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd1 = rdd.map(lambda x: x+5)
rdd2 = rdd1.filter(lambda x: x % 2 == 0)
rdd2.persist()
count = rdd2.count() // 3
first = rdd2.first() // 6
rdd2.unpersist()
其实在我们实际进行数据操作的时候,并不用像上面说的那样子操作,不需要到RDD层次进行编程的,Spark生态系统里有很多库可以用,而其中的数据查询模块Spark SQL就很常用。
在Hadoop/MapReduce流行的时候,HDFS里就累计了大量数据,但大部分的开发者只对关系型数据库熟悉,对于MapReduce的开发还是有点难度,因此,Hive应运而生,它提供类似SQL的编程接口,HQL语句可以经过语法解析、逻辑计划、物理计划转化成MapReduce程序执行,十分方便。
当Spark面世的时候,Spark团队也是开了一个Shark来支持SQL语言查询数据,但Shark的本质是Hive,对Hive是十分依赖的,制约了Shark和其他Spark组件之间的集成。于是,14年7月,Spark团队将Shark托管给Hive,转而自己开发Spark SQL。
SparkSQL提供了类似于SQL的操作接口,允许数据仓库、命令行、应用程序直接获取数据,提供两个API:DataFrame API和DataSet API,Python、Java和Scale的应用程序可以通过这两个API来读取和写入RDD。
备注:图来自于极客时间
下面给出了RDD、DataFrame和DataSet的对比:
备注:图来自于极客时间
总结一下:
DataFrame和DataSet都是SparkSQL提供的基于RDD的结构化数据抽象,具有RDD的不可变性、分区、存储依赖关系的特性,又有关系型数据库的结构化信息,更加容易操作,大大提供开发效率,无论是哪种API,都是基于批处理模式对静态数据进行处理。
上述说的SparkSQL都是基于批处理模式对静态数据进行处理,但如果我们需要处理流数据,就需要另外一个组件——Spark Streaming。
Spark Streaming提供了一个对于流数据的抽象 DStream,可以由来自Apache Kafka、Flume或者HDFS的流数据生成,也可以由别的DStream经过各种转换操作得到。DStream也是由很多个序列化的RDD构成,按时间片切分成的每个数据单位都是一个RDD,然后Spark核心引擎对DStream的Transformation操作变成对RDD的Transformation操作,将RDD经过操作变成中间结构保存在内存里。
DStream由一个个连续的RDD序列组成,每一个RDD代表一个时间窗口的输入数据流。对DStream进行操作,意味着对它包含的每一个RDD进行同样的操作。
备注:图来自于极客时间
任何Spark Streaming的程序都要首先创建一个StreamingContext的对象,它是所有Streaming操作的入口,当中最重要的参数是批处理的时间间隔,即把流数据细分成数据块的粒度大小。这个时间间隔决定了流处理的延迟性,所以我们需要根据实际需求和资源情况来权衡时间间隔。
滑动窗口操作有两个基本参数:
优点:
缺点: