本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spark的两个共享特性(累加器和广播变量)。
键值对(PaiRDD)
1.创建
2.转化(Transformation)
转化操作很多,有reduceByKey,foldByKey(),combineByKey()等,与普通RDD中的reduce()、fold()、aggregate()等类似,只不过是根据键来进行操作。
reduceByKey():与recude()类似,只不过是根据键进行聚合foldByKey():与fold()类似combineByKey():与aggregate()类似
3.行动操作(Action)
数据分区:数据比较大时,可以用partitionBy()转化为哈希分区。即通过向partitionBy传递一个spark.HashPartitioner对象来实现该操作。在Python中不能将HashPartitioner对象传递给partitionBy,只需要把需要的分区数传递过去(如 rdd.partitionBy(100))。
在spark中,会为生成的结果RDD设好分区方式的操作有:cogroup(),groupWith(),join(),leftOuterJoin(),rightOutJoin,groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues(),flatMapValues(),filter()。最后三种只有当父RDD有分区方式时,结果RDD才会有分区RDD。其他的操作生成的结果都不会存在特定的分区方式。
自定义分区方式:
数据的读取与保存
文件格式
文本文件
JSON
CSV文件
SequenceFile
对象文件
Spark SQL中的结构化数据
Apache Hive
JSON数据
这章关于sql的命令比较少,关于SQL的其他命令可以看看Spark的官方文档(PySpark 1.6.1 documentation),讲的比较详细。注意,这是spark 1.6版本,如果你安装的是1.2版本,1.6的有些命令是用不了的,可以先升级再用。
最后再来讲讲Spark中两种类型的共享变量:累加器(accumulator)和广播变量(broadcast variable)
累加器
对信息进行聚合。常见的一个用法是在调试时对作业执行进行计数。举个例子:假设我们从文件中读取呼号列表对应的日志,同时也想知道输入文件中有多少空行,就可以用到累加器。实例:
我们来看看这段程序,首先创建了一个叫做blankLines的Accumulator[Int]对象,然后在输入中看到空行就+1,执行完转化操作后就打印出累加器中的值。注意:只有在执行完saveAsTextFile()这个action操作后才能看到正确的计数,flatMap()是transformation操作,是惰性的,这点在上一篇博文已经讲过。
但是我们上一篇文章中也提到过reduce()等这样的操作也是聚合操作,那为什么还有累加器这个东西存在呢?因为RDD本身提供的同步机制粒度太粗,尤其在transformation操作中变量状态不能同步,而累加器可以对那些与RDD本身的范围和粒度不一样的值进行聚合,不过它是一个write-only的变量,无法读取这个值,只能在驱动程序中使用value方法来读取累加器的值。
累加器的用法:
对于之前的数据,我们可以做进一步计算:
累加器与容错性:
我们知道Spark是分布式计算,当有些机器执行得比较慢或者出错的时候,Spark会自动重新执行这些失败的或比较慢的任务。这样会导致同一个函数可能对同一个数据运行了多次,简单的说就是耗内存,降低了计算速度。在这种情况下,累加器怎么处理呢?
对于要在Action操作中使用的累加器,Spark只会把每个任务对累加器的修改应用一次,一般放在foreach()操作中。而对于Transformation操作中的累加器,可能不止更新一次。所以Transformation中的累加器最好只在调试中使用。
广播变量
广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。广播变量通过两个方面提高数据共享效率:1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。
在Spark中,它会自动的把所有引用到的变量发送到工作节点上,这样做很方便,但是也很低效:一是默认的任务发射机制是专门为小任务进行优化的,二是在实际过程中可能会在多个并行操作中使用同一个变量,而Spark会分别为每个操作发送这个变量。举个例子,假设我们通过呼号的前缀查询国家,用Spark直接实现如下:
数据量小的时候可以运行,但是如果这个表很大,signPrefixes的很容易达到MB级别,从主节点为每个任务发送这样的数组会非常消耗内存,而且如果之后还需要用到signPrefixes这个变量,还需要再向每个节点发送一遍。
如果把signPrefixes变为广播变量,就可以解决这个问题:
总结一下广播变量的过程:
广播的优化
如果广播的值比较大,可以选择既快又好的序列化格式。Scala和Java API中默认使用Java序列化库,对于除基本类型的数组以外的任何对象都比较低效。我们可以使用spark.serializer属性选择另一个序列化库来优化序列化过程。(也可以使用reduce()方法为Python的pickle库自定义序列化)
基于分区进行操作
两个函数:map() 和 foreach()
示例:我们有一个在线的电台呼号数据,可以通过这个数据库查询日志中记录过的联系人呼号列表。
再举个例子说明一下mapPartitions()的功能:
数值RDD的操作
举例:从呼叫日志中移除距离过远的联系点
这三章的内容比较实用,在生产中也会有实际应用。下周更新第7-9章,主要讲Spark在集群上的运行、Spark调优与调试和Spark SQL。
Charlotte ,数学系的数据挖掘民工,喜欢算法和建模。
欢迎关注我的博客:
http://www.cnblogs.com/charlotte77/