专栏 | Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与Spark的两个共享特性(累加器和广播变量)。

键值对(PaiRDD)

1.创建

2.转化(Transformation)

转化操作很多,有reduceByKey,foldByKey(),combineByKey()等,与普通RDD中的reduce()、fold()、aggregate()等类似,只不过是根据键来进行操作。

reduceByKey():与recude()类似,只不过是根据键进行聚合foldByKey():与fold()类似combineByKey():与aggregate()类似

3.行动操作(Action)

数据分区:数据比较大时,可以用partitionBy()转化为哈希分区。即通过向partitionBy传递一个spark.HashPartitioner对象来实现该操作。在Python中不能将HashPartitioner对象传递给partitionBy,只需要把需要的分区数传递过去(如 rdd.partitionBy(100))。

在spark中,会为生成的结果RDD设好分区方式的操作有:cogroup(),groupWith(),join(),leftOuterJoin(),rightOutJoin,groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues(),flatMapValues(),filter()。最后三种只有当父RDD有分区方式时,结果RDD才会有分区RDD。其他的操作生成的结果都不会存在特定的分区方式。

自定义分区方式:

数据的读取与保存

文件格式

文本文件

JSON

CSV文件

SequenceFile

对象文件

Spark SQL中的结构化数据

Apache Hive

JSON数据

这章关于sql的命令比较少,关于SQL的其他命令可以看看Spark的官方文档(PySpark 1.6.1 documentation),讲的比较详细。注意,这是spark 1.6版本,如果你安装的是1.2版本,1.6的有些命令是用不了的,可以先升级再用。

最后再来讲讲Spark中两种类型的共享变量:累加器(accumulator)和广播变量(broadcast variable)

累加器

对信息进行聚合。常见的一个用法是在调试时对作业执行进行计数。举个例子:假设我们从文件中读取呼号列表对应的日志,同时也想知道输入文件中有多少空行,就可以用到累加器。实例:

我们来看看这段程序,首先创建了一个叫做blankLines的Accumulator[Int]对象,然后在输入中看到空行就+1,执行完转化操作后就打印出累加器中的值。注意:只有在执行完saveAsTextFile()这个action操作后才能看到正确的计数,flatMap()是transformation操作,是惰性的,这点在上一篇博文已经讲过。

但是我们上一篇文章中也提到过reduce()等这样的操作也是聚合操作,那为什么还有累加器这个东西存在呢?因为RDD本身提供的同步机制粒度太粗,尤其在transformation操作中变量状态不能同步,而累加器可以对那些与RDD本身的范围和粒度不一样的值进行聚合,不过它是一个write-only的变量,无法读取这个值,只能在驱动程序中使用value方法来读取累加器的值。

累加器的用法:

  • 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
  • Spark闭包里的执行器代码可以使用累加器的 += 方法(在Java中是add)增加累加器的值。
  • 驱动器程序可以调用累加器的Value属性来访问累加器的值(在Java中使用value()或setValue())

对于之前的数据,我们可以做进一步计算:

累加器与容错性:

我们知道Spark是分布式计算,当有些机器执行得比较慢或者出错的时候,Spark会自动重新执行这些失败的或比较慢的任务。这样会导致同一个函数可能对同一个数据运行了多次,简单的说就是耗内存,降低了计算速度。在这种情况下,累加器怎么处理呢?

对于要在Action操作中使用的累加器,Spark只会把每个任务对累加器的修改应用一次,一般放在foreach()操作中。而对于Transformation操作中的累加器,可能不止更新一次。所以Transformation中的累加器最好只在调试中使用。

广播变量

广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。广播变量通过两个方面提高数据共享效率:1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。

在Spark中,它会自动的把所有引用到的变量发送到工作节点上,这样做很方便,但是也很低效:一是默认的任务发射机制是专门为小任务进行优化的,二是在实际过程中可能会在多个并行操作中使用同一个变量,而Spark会分别为每个操作发送这个变量。举个例子,假设我们通过呼号的前缀查询国家,用Spark直接实现如下:

数据量小的时候可以运行,但是如果这个表很大,signPrefixes的很容易达到MB级别,从主节点为每个任务发送这样的数组会非常消耗内存,而且如果之后还需要用到signPrefixes这个变量,还需要再向每个节点发送一遍。

如果把signPrefixes变为广播变量,就可以解决这个问题:

总结一下广播变量的过程:

  • 通过对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象。任何可序列化的对象都可以这么实现。
  • 通过value属性访问该对象的值
  • 变量只会发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

广播的优化

如果广播的值比较大,可以选择既快又好的序列化格式。Scala和Java API中默认使用Java序列化库,对于除基本类型的数组以外的任何对象都比较低效。我们可以使用spark.serializer属性选择另一个序列化库来优化序列化过程。(也可以使用reduce()方法为Python的pickle库自定义序列化)

基于分区进行操作

两个函数:map() 和 foreach()

示例:我们有一个在线的电台呼号数据,可以通过这个数据库查询日志中记录过的联系人呼号列表。

再举个例子说明一下mapPartitions()的功能:

数值RDD的操作

举例:从呼叫日志中移除距离过远的联系点

这三章的内容比较实用,在生产中也会有实际应用。下周更新第7-9章,主要讲Spark在集群上的运行、Spark调优与调试和Spark SQL。

Charlotte ,数学系的数据挖掘民工,喜欢算法和建模。

欢迎关注我的博客:

http://www.cnblogs.com/charlotte77/

原文发布于微信公众号 - CDA数据分析师(cdacdacda)

原文发表时间:2016-05-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

SparkConf加载与SparkContext创建(源码阅读一)

即日起开始spark源码阅读之旅,这个过程是相当痛苦的,也许有大量的看不懂,但是每天一个方法,一点点看,相信总归会有极大地提高的。那么下面开始:

1441
来自专栏岑玉海

hbase源码系列(十三)缓存机制MemStore与Block Cache

这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存。 之前在讲put的时候,put是...

3997
来自专栏肖力涛的专栏

Spark踩坑记:共享变量

如果我们想在节点之间共享一份变量,比如一份公共的配置项,该怎么办呢?Spark为我们提供了两种特定的共享变量,来完成节点间变量的共享。 本文首先简单的介绍spa...

1K1
来自专栏牛肉圆粉不加葱

揭开Spark Streaming神秘面纱③ - 动态生成 job

JobScheduler有两个重要成员,一是上文介绍的 ReceiverTracker,负责分发 receivers 及源源不断地接收数据;二是本文将要介绍的 ...

863
来自专栏FreeBuf

VMware更新 | 修复Apache Flex BlazeDS中的漏洞

VMware发布了数个产品的版本更新,目的是修复Apache Flex BlazeDS中的一个漏洞。 据VMware介绍,Flex BlazeDS组件应用在数个...

2135
来自专栏安恒网络空间安全讲武堂

从零基础到成功解题之0ctf-ezdoor

2044
来自专栏祝威廉

Spark Streaming + Spark SQL 实现配置化ETL流程

通常而言,你可能会因为要走完上面的流程而构建了一个很大的程序,比如一个main方法里上百行代码,虽然在开发小功能上足够便利,但是复用度更方面是不够的,而且不利于...

1903
来自专栏菩提树下的杨过

ZooKeeper 笔记(6) 分布式锁

  目前分布式锁,比较成熟、主流的方案有基于redis及基于zookeeper的二种方案。   大体来讲,基于redis的分布式锁核心指令为SETNX,即如果目...

2098
来自专栏LhWorld哥陪你聊算法

Hive篇---Hive使用优化

本节主要描述Hive的优化使用,Hive的优化着重强调一个 把Hive SQL 当做Mapreduce程序去优化 二.主要优化点

5121
来自专栏伦少的博客

SparkStreaming+Kafka 实现统计基于缓存的实时uv

2223

扫码关注云+社区