前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python大数据处理扩展库pySpark用法精要

Python大数据处理扩展库pySpark用法精要

作者头像
Python小屋屋主
发布2018-04-16 16:26:46
1.7K0
发布2018-04-16 16:26:46
举报
文章被收录于专栏:Python小屋Python小屋

Spark是一个开源的、通用的并行计算与分布式计算框架,其活跃度在Apache基金会所有开源项目中排第三位,最大特点是基于内存计算,适合迭代计算,兼容多种应用场景,同时还兼容Hadoop生态系统中的组件,并且具有非常强的容错性。Spark的设计目的是全栈式解决批处理、结构化数据查询、流计算、图计算和机器学习等业务和应用,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,效率提升越大。

Spark集成了Spark SQL(分布式SQL查询引擎,提供了一个DataFrame编程抽象)、Spark Streaming(把流式计算分解成一系列短小的批处理计算,并且提供高可靠和吞吐量服务)、MLlib(提供机器学习服务)、GraphX(提供图计算服务)、SparkR(R on Spark)等子框架,为不同应用领域的从业者提供了全新的大数据处理方式,越来越便捷、轻松。

为了适应迭代计算,Spark把经常被重用的数据缓存到内存中以提高数据读取和操作速度,比Hadoop快近百倍,并且支持Java、Scala、Python、R等多种语言。除map和reduce之外,Spark还支持filter、foreach、reduceByKey、aggregate以及SQL查询、流式查询等等。

扩展库pyspark提供了SparkContext(Spark功能的主要入口,一个SparkContext表示与一个Spark集群的连接,可用来创建RDD或在该集群上广播变量)、RDD(Spark中的基本抽象,弹性分布式数据集Resilient Distributed Dataset)、Broadcast(可以跨任务重用的广播变量)、Accumulator(共享变量,任务只能为其增加值)、SparkConf(用来配置Spark)、SparkFiles(访问任务的文件)、StorageLevel(更细粒度的缓冲永久级别)等可以公开访问的类,并且提供了pyspark.sql、pyspark.streaming与pyspark.mllib等模块与包。

>>> from pyspark import SparkFiles >>> path = 'test.txt' >>> with open(path, 'w') as fp: #创建文件 fp.write('100') >>> sc.addFile(path) #提交文件 >>> def func(iterator): with open(SparkFiles.get('test.txt')) as fp: #打开文件 Val = int(fp.readline()) #读取文件内容 return [x * Val for x in iterator] >>> sc.parallelize([1, 2, 3, 4, 5]).mapPartitions(func).collect() #并行处理,collect()返回包含RDD上所有元素的列表 [100, 200, 300, 400, 500] >>> sc.parallelize([2, 3, 4]).count() #count()用来返回RDD中元素个数,parallelize()用来分布本地的Python集合,并创建RDD 3 >>> rdd = sc.parallelize([1, 2]) >>> sorted(rdd.cartesian(rdd).collect()) #collect()返回包含RDD中元素的列表,cartesian()计算两个RDD的笛卡尔积

[(1, 1), (1, 2), (2, 1), (2, 2)] >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() #只保留符合条件的元素 [2, 4] >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) #返回唯一元素 [1, 2, 3] >>> rdd = sc.parallelize(range(10)) >>> rdd.map(lambda x: str(x)).collect() #映射 >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0]) >>> rdd.max() #最大值 43.0 >>> rdd.max(key=str) 5.0 >>> rdd.min() #最小值 1.0 >>> rdd.sum() #所有元素求和 59.0 >>> from random import randint >>> lst = [randint(1,100) for _ in range(20)] >>> lst [18, 55, 48, 13, 86, 23, 85, 62, 66, 58, 73, 96, 90, 16, 49, 98, 49, 69, 3, 53] >>> sc.parallelize(lst).top(3) #最大的3个元素 [98, 96, 90] >>> sorted(lst, reverse=True)[:3] [98, 96, 90] >>> sc.parallelize(range(100)).filter(lambda x:x>90).take(3) #使用take()返回前3个元素 [91, 92, 93] >>> sc.parallelize(range(20), 3).glom().collect() #查看分片情况 [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11, 12], [13, 14, 15, 16, 17, 18, 19]] >>> sc.parallelize(range(20), 6).glom().collect() #查看分片情况 [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9], [10, 11, 12], [13, 14, 15], [16, 17, 18, 19]] >>> myRDD = sc.parallelize(range(20), 6) #6表示分片数 >>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part]) #执行任务 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361] >>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1]) #只查看第2个分片的结果 [9, 16, 25] >>> sc.runJob(myRDD, lambda part: [x ** 2 for x in part], [1,5]) #查看第2和第6个分片上的结果 [9, 16, 25, 256, 289, 324, 361] >>> sc.parallelize([1,2,3,3,3,2]).distinct().collect() #distinct()返回包含唯一元素的RDD [1, 2, 3] >>> from operator import add, mul >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) #把所有分片上的数据累加 15 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(1, mul) #把所有分片上的数据连乘 120 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) #reduce()函数的并行版本 15 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(mul) 120 >>> result = sc.parallelize(range(1, 6)).groupBy(lambda x: x%3).collect() #对所有数据进行分组 >>> for k, v in result: print(k, sorted(v))

0 [3] 1 [1, 4] 2 [2, 5] >>> rdd1 = sc.parallelize(range(10)) >>> rdd2 = sc.parallelize(range(5, 20)) >>> rdd1.intersection(rdd2).collect() #交集 [8, 9, 5, 6, 7] >>> rdd1.subtract(rdd2).collect() #差集 [0, 1, 2, 3, 4] >>> rdd1.union(rdd2).collect() #合并两个RDD上的元素 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] >>> rdd1 = sc.parallelize('abcd') >>> rdd2 = sc.parallelize(range(4)) >>> rdd1.zip(rdd2).collect() #两个RDD必须等长 [('a', 0), ('b', 1), ('c', 2), ('d', 3)] >>> rdd = sc.parallelize('abcd') >>> rdd.map(lambda x: (x, 1)).collect() #内置函数map()的并行版本 [('a', 1), ('b', 1), ('c', 1), ('d', 1)] >>> sc.parallelize([1, 2, 3, 4, 5]).stdev() #计算标准差 1.4142135623730951 >>> sc.parallelize([1, 1, 1, 1, 1]).stdev() 0.0

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-01-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python小屋 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档