Python大数据处理扩展库pySpark用法精要

Spark是一个开源的、通用的并行计算与分布式计算框架,其活跃度在Apache基金会所有开源项目中排第三位,最大特点是基于内存计算,适合迭代计算,兼容多种应用场景,同时还兼容Hadoop生态系统中的组件,并且具有非常强的容错性。Spark的设计目的是全栈式解决批处理、结构化数据查询、流计算、图计算和机器学习等业务和应用,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,效率提升越大。

Spark集成了Spark SQL(分布式SQL查询引擎,提供了一个DataFrame编程抽象)、Spark Streaming(把流式计算分解成一系列短小的批处理计算,并且提供高可靠和吞吐量服务)、MLlib(提供机器学习服务)、GraphX(提供图计算服务)、SparkR(R on Spark)等子框架,为不同应用领域的从业者提供了全新的大数据处理方式,越来越便捷、轻松。

为了适应迭代计算,Spark把经常被重用的数据缓存到内存中以提高数据读取和操作速度,比Hadoop快近百倍,并且支持Java、Scala、Python、R等多种语言。除map和reduce之外,Spark还支持filter、foreach、reduceByKey、aggregate以及SQL查询、流式查询等等。

扩展库pyspark提供了SparkContext(Spark功能的主要入口,一个SparkContext表示与一个Spark集群的连接,可用来创建RDD或在该集群上广播变量)、RDD(Spark中的基本抽象,弹性分布式数据集Resilient Distributed Dataset)、Broadcast(可以跨任务重用的广播变量)、Accumulator(共享变量,任务只能为其增加值)、SparkConf(用来配置Spark)、SparkFiles(访问任务的文件)、StorageLevel(更细粒度的缓冲永久级别)等可以公开访问的类,并且提供了pyspark.sql、pyspark.streaming与pyspark.mllib等模块与包。

>>> from pyspark import SparkFiles >>> path = 'test.txt' >>> with open(path, 'w') as fp: #创建文件 fp.write('100') >>> sc.addFile(path) #提交文件 >>> def func(iterator): with open(SparkFiles.get('test.txt')) as fp: #打开文件 Val = int(fp.readline()) #读取文件内容 return [x * Val for x in iterator] >>> sc.parallelize([1, 2, 3, 4, 5]).mapPartitions(func).collect() #并行处理,collect()返回包含RDD上所有元素的列表 [100, 200, 300, 400, 500] >>> sc.parallelize([2, 3, 4]).count() #count()用来返回RDD中元素个数,parallelize()用来分布本地的Python集合,并创建RDD 3 >>> rdd = sc.parallelize([1, 2]) >>> sorted(rdd.cartesian(rdd).collect()) #collect()返回包含RDD中元素的列表,cartesian()计算两个RDD的笛卡尔积

[(1, 1), (1, 2), (2, 1), (2, 2)] >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() #只保留符合条件的元素 [2, 4] >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) #返回唯一元素 [1, 2, 3] >>> rdd = sc.parallelize(range(10)) >>> rdd.map(lambda x: str(x)).collect() #映射 >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0]) >>> rdd.max() #最大值 43.0 >>> rdd.max(key=str) 5.0 >>> rdd.min() #最小值 1.0 >>> rdd.sum() #所有元素求和 59.0 >>> from random import randint >>> lst = [randint(1,100) for _ in range(20)] >>> lst [18, 55, 48, 13, 86, 23, 85, 62, 66, 58, 73, 96, 90, 16, 49, 98, 49, 69, 3, 53] >>> sc.parallelize(lst).top(3) #最大的3个元素 [98, 96, 90] >>> sorted(lst, reverse=True)[:3] [98, 96, 90] >>> sc.parallelize(range(100)).filter(lambda x:x>90).take(3) #使用take()返回前3个元素 [91, 92, 93] >>> sc.parallelize(range(20), 3).glom().collect() #查看分片情况 [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11, 12], [13, 14, 15, 16, 17, 18, 19]] >>> sc.parallelize(range(20), 6).glom().collect() #查看分片情况 [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9], [10, 11, 12], [13, 14, 15], [16, 17, 18, 19]] >>> myRDD = sc.parallelize(range(20), 6) #6表示分片数 >>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part]) #执行任务 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361] >>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1]) #只查看第2个分片的结果 [9, 16, 25] >>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1,5]) #查看第2和第6个分片上的结果 [9, 16, 25, 256, 289, 324, 361] >>> sc.parallelize([1,2,3,3,3,2]).distinct().collect() #distinct()返回包含唯一元素的RDD [1, 2, 3] >>> from operator import add, mul >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) #把所有分片上的数据累加 15 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(1, mul) #把所有分片上的数据连乘 120 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) #reduce()函数的并行版本 15 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(mul) 120 >>> result = sc.parallelize(range(1, 6)).groupBy(lambda x: x%3).collect() #对所有数据进行分组 >>> for k, v in result: print(k, sorted(v))

0 [3] 1 [1, 4] 2 [2, 5] >>> rdd1 = sc.parallelize(range(10)) >>> rdd2 = sc.parallelize(range(5, 20)) >>> rdd1.intersection(rdd2).collect() #交集 [8, 9, 5, 6, 7] >>> rdd1.subtract(rdd2).collect() #差集 [0, 1, 2, 3, 4] >>> rdd1.union(rdd2).collect() #合并两个RDD上的元素 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] >>> rdd1 = sc.parallelize('abcd') >>> rdd2 = sc.parallelize(range(4)) >>> rdd1.zip(rdd2).collect() #两个RDD必须等长 [('a', 0), ('b', 1), ('c', 2), ('d', 3)] >>> rdd = sc.parallelize('abcd') >>> rdd.map(lambda x: (x, 1)).collect() #内置函数map()的并行版本 [('a', 1), ('b', 1), ('c', 1), ('d', 1)] >>> sc.parallelize([1, 2, 3, 4, 5]).stdev() #计算标准差 1.4142135623730951 >>> sc.parallelize([1, 1, 1, 1, 1]).stdev() 0.0

原文发布于微信公众号 - Python小屋(Python_xiaowu)

原文发表时间:2017-01-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏简单聊聊Spark

Spark性能调优篇四之使用Kryo进行序列化操作

        接着上一篇文章,今天介绍一下通过使用Kryo这个东东来进一步降低网络IO的传输量和内存的占用率。在介绍Kryo之前,接下来我们先来对比一下默认的...

3363
来自专栏灯塔大数据

每周学点大数据 | No.73 在 HDFS 上使用 Spark

编者按:灯塔大数据将每周持续推出《从零开始学大数据算法》的连载,本书为哈尔滨工业大学著名教授王宏志老师的扛鼎力作,以对话的形式深入浅出的从何为大数据说到大数据算...

3767
来自专栏牛肉圆粉不加葱

Spark Shuffle 模块② - Hash Based Shuffle write

Spark 最开始只有 Hash Based Shuffle,因为在很多场景中并不需要排序,在这些场景中多余的排序反而会损耗性能。

911
来自专栏肖力涛的专栏

Spark 踩坑记:从 RDD 看集群调度

本文的思路是从spark最细节的本质,即核心的数据结构RDD出发,到整个Spark集群宏观的调度过程做一个整理归纳,从微观到宏观两方面总结,方便自己在调优过程中...

1.1K2
来自专栏CSDN技术头条

SreamCQL架构解析,来自华为的开源流处理框架

StreamCQL是一个类SQL的声明式语言,它用于在流(streams)和可更新关系(updatable relation)上的可持续查询,目的是在流处理平台...

2189
来自专栏JavaEdge

史上最快! 10小时大数据入门实战(五)-分布式计算框架MapReduce1 MapReduce概述2 MapReduce编程模型之通过wordcount词频统计分析案例入门MapReduce执行流程

1893
来自专栏斑斓

大数据 | 理解Spark的核心RDD

与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streami...

3969
来自专栏Albert陈凯

Hive迁移Saprk SQL的坑和改进办法

Qcon 全球软件开发者大会2016北京站 演讲主题:Spark在360的大规模实践与经验分享 李远策 360-Spark集群概况 ? 360-Spark集...

6687
来自专栏懒人开发

hadoop(1):hadoop概述

hadoop是 Doug Cutting 在 Lucene 之后的一个项目 主要用于 计算 是一个 开源,可靠,可扩展 的分布式计算框架 主要有

1063
来自专栏Albert陈凯

3.6 Shuffle机制

3.6 Shuffle机制 在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuff...

3004

扫码关注云+社区

领取腾讯云代金券