前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >专栏 | Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

专栏 | Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

作者头像
CDA数据分析师
发布2018-02-05 12:12:19
8280
发布2018-02-05 12:12:19
举报
文章被收录于专栏:CDA数据分析师CDA数据分析师

本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spark的两个共享特性(累加器和广播变量)。

键值对(PaiRDD)

1.创建

2.转化(Transformation)

转化操作很多,有reduceByKey,foldByKey(),combineByKey()等,与普通RDD中的reduce()、fold()、aggregate()等类似,只不过是根据键来进行操作。

reduceByKey():与recude()类似,只不过是根据键进行聚合foldByKey():与fold()类似combineByKey():与aggregate()类似

3.行动操作(Action)

数据分区:数据比较大时,可以用partitionBy()转化为哈希分区。即通过向partitionBy传递一个spark.HashPartitioner对象来实现该操作。在Python中不能将HashPartitioner对象传递给partitionBy,只需要把需要的分区数传递过去(如 rdd.partitionBy(100))。

在spark中,会为生成的结果RDD设好分区方式的操作有:cogroup(),groupWith(),join(),leftOuterJoin(),rightOutJoin,groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues(),flatMapValues(),filter()。最后三种只有当父RDD有分区方式时,结果RDD才会有分区RDD。其他的操作生成的结果都不会存在特定的分区方式。

自定义分区方式:

数据的读取与保存

文件格式

文本文件

JSON

CSV文件

SequenceFile

对象文件

Spark SQL中的结构化数据

Apache Hive

JSON数据

这章关于sql的命令比较少,关于SQL的其他命令可以看看Spark的官方文档(PySpark 1.6.1 documentation),讲的比较详细。注意,这是spark 1.6版本,如果你安装的是1.2版本,1.6的有些命令是用不了的,可以先升级再用。

最后再来讲讲Spark中两种类型的共享变量:累加器(accumulator)和广播变量(broadcast variable)

累加器

对信息进行聚合。常见的一个用法是在调试时对作业执行进行计数。举个例子:假设我们从文件中读取呼号列表对应的日志,同时也想知道输入文件中有多少空行,就可以用到累加器。实例:

我们来看看这段程序,首先创建了一个叫做blankLines的Accumulator[Int]对象,然后在输入中看到空行就+1,执行完转化操作后就打印出累加器中的值。注意:只有在执行完saveAsTextFile()这个action操作后才能看到正确的计数,flatMap()是transformation操作,是惰性的,这点在上一篇博文已经讲过。

但是我们上一篇文章中也提到过reduce()等这样的操作也是聚合操作,那为什么还有累加器这个东西存在呢?因为RDD本身提供的同步机制粒度太粗,尤其在transformation操作中变量状态不能同步,而累加器可以对那些与RDD本身的范围和粒度不一样的值进行聚合,不过它是一个write-only的变量,无法读取这个值,只能在驱动程序中使用value方法来读取累加器的值。

累加器的用法:

  • 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
  • Spark闭包里的执行器代码可以使用累加器的 += 方法(在Java中是add)增加累加器的值。
  • 驱动器程序可以调用累加器的Value属性来访问累加器的值(在Java中使用value()或setValue())

对于之前的数据,我们可以做进一步计算:

累加器与容错性:

我们知道Spark是分布式计算,当有些机器执行得比较慢或者出错的时候,Spark会自动重新执行这些失败的或比较慢的任务。这样会导致同一个函数可能对同一个数据运行了多次,简单的说就是耗内存,降低了计算速度。在这种情况下,累加器怎么处理呢?

对于要在Action操作中使用的累加器,Spark只会把每个任务对累加器的修改应用一次,一般放在foreach()操作中。而对于Transformation操作中的累加器,可能不止更新一次。所以Transformation中的累加器最好只在调试中使用。

广播变量

广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。广播变量通过两个方面提高数据共享效率:1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。

在Spark中,它会自动的把所有引用到的变量发送到工作节点上,这样做很方便,但是也很低效:一是默认的任务发射机制是专门为小任务进行优化的,二是在实际过程中可能会在多个并行操作中使用同一个变量,而Spark会分别为每个操作发送这个变量。举个例子,假设我们通过呼号的前缀查询国家,用Spark直接实现如下:

数据量小的时候可以运行,但是如果这个表很大,signPrefixes的很容易达到MB级别,从主节点为每个任务发送这样的数组会非常消耗内存,而且如果之后还需要用到signPrefixes这个变量,还需要再向每个节点发送一遍。

如果把signPrefixes变为广播变量,就可以解决这个问题:

总结一下广播变量的过程:

  • 通过对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象。任何可序列化的对象都可以这么实现。
  • 通过value属性访问该对象的值
  • 变量只会发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

广播的优化

如果广播的值比较大,可以选择既快又好的序列化格式。Scala和Java API中默认使用Java序列化库,对于除基本类型的数组以外的任何对象都比较低效。我们可以使用spark.serializer属性选择另一个序列化库来优化序列化过程。(也可以使用reduce()方法为Python的pickle库自定义序列化)

基于分区进行操作

两个函数:map() 和 foreach()

示例:我们有一个在线的电台呼号数据,可以通过这个数据库查询日志中记录过的联系人呼号列表。

再举个例子说明一下mapPartitions()的功能:

数值RDD的操作

举例:从呼叫日志中移除距离过远的联系点

这三章的内容比较实用,在生产中也会有实际应用。下周更新第7-9章,主要讲Spark在集群上的运行、Spark调优与调试和Spark SQL。

Charlotte ,数学系的数据挖掘民工,喜欢算法和建模。

欢迎关注我的博客:

http://www.cnblogs.com/charlotte77/

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2016-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CDA数据分析师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档