pyspark

Tim在路上
Published 2020-08-05 00:25:46

pyspark version

Print the Spark version.

print("pyspark version " + str(sc.version))

map

# sc = SparkContext; parallelize creates an RDD from the passed object
x = sc.parallelize([1,2,3])
y = x.map(lambda x: (x, x**2))
# collect copies the RDD elements to a list on the driver
print(x.collect())
print(y.collect())

[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]

parallelize distributes a local collection as an RDD, map applies a function to every element, and collect gathers the results back to the driver. map always produces a new RDD in which every element of the original RDD corresponds to exactly one element. Here that is 1 -> (1, 1), 2 -> (2, 4), 3 -> (3, 9).

flatMap

x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, 100*x, x**2))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 100, 1, 2, 200, 4, 3, 300, 9]

With flatMap, each element of the original RDD can produce several elements of the new RDD. Here that is 1 -> 1, 100, 1; 2 -> 2, 200, 4; 3 -> 3, 300, 9.

mapPartitions

mapPartitions is a variant of map. The function passed to map is applied to each element of the RDD, whereas the function passed to mapPartitions is applied to each partition, so the contents of a partition are processed as a whole.

# first argument: the list to distribute; second argument: the number of partitions
x = sc.parallelize([1,2,3], 2)
def f(iterator):
    yield sum(iterator)  # sum the elements of one partition
y = x.mapPartitions(f)  # apply f to every partition of x
# glom() gathers the elements of each partition into a list
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[1], [5]]

mapPartitionsWithIndex

Works like mapPartitions, except that the function also receives the index of the partition.

x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator):
    yield (partitionIndex, sum(iterator))
y = x.mapPartitionsWithIndex(f)
# glom() gathers the elements of each partition into a list
print(x.glom().collect())
print(y.glom().collect())

[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]

getNumPartitions

Returns the number of partitions.

x = sc.parallelize([1,2,3], 2)
y = x.getNumPartitions()
print(x.glom().collect())
print(y)

[[1], [2, 3]]
2
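The difference between map, flatMap and mapPartitions can be sketched in plain Python without a cluster. This is only a model of the semantics, not Spark itself: the list of lists `partitions` and the `model_*` helpers are hypothetical names standing in for an RDD with two partitions.

```python
from itertools import chain

# Hypothetical stand-in for sc.parallelize([1,2,3], 2): one inner list per partition.
partitions = [[1], [2, 3]]

def model_map(parts, f):
    # exactly one output element per input element
    return [[f(el) for el in part] for part in parts]

def model_flat_map(parts, f):
    # f returns an iterable per element; the iterables are flattened
    return [list(chain.from_iterable(f(el) for el in part)) for part in parts]

def model_map_partitions(parts, f):
    # f receives an iterator over a whole partition and yields its results
    return [list(f(iter(part))) for part in parts]

def partition_sum(iterator):
    yield sum(iterator)

mapped = model_map(partitions, lambda v: (v, v**2))
flat = model_flat_map(partitions, lambda v: (v, 100*v, v**2))
summed = model_map_partitions(partitions, partition_sum)

print(mapped)  # [[(1, 1)], [(2, 4), (3, 9)]]
print(flat)    # [[1, 100, 1], [2, 200, 4, 3, 300, 9]]
print(summed)  # [[1], [5]]
```

The nested lists play the role of the glom() output: map and flatMap never look beyond one element, while the mapPartitions function sees a whole partition at once.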

filter

x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x%2 == 1) # keeps odd elements, i.e. filters out even ones
print(x.collect())
print(y.collect())

filter keeps only the elements for which the function returns True.

distinct

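x = sc.parallelize(['A','A','B'])
y = x.distinct()
print(x.collect())
print(y.collect())

['A', 'A', 'B']
['A', 'B']

distinct removes duplicates. Equal elements are indistinguishable, so it does not matter which copy is kept, and the order of the result is not guaranteed.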

sample

x = sc.parallelize(range(7))
# call 'sample' 5 times
ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt,y in zip(range(len(ylist)), ylist): # zip pairs up the elements of two sequences
    print('sample:' + str(cnt) + ' y = ' + str(y.collect()))

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 5, 6]
sample:1 y = [2, 6]
sample:2 y = [0, 4, 5, 6]
sample:3 y = [0, 2, 6]
sample:4 y = [0, 3, 4]

sample draws a random subset. Each element is selected with a given probability (fraction), so the size of the result varies from call to call.

takeSample

x = sc.parallelize(range(7))
# call 'takeSample' 5 times
ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)]
print('x = ' + str(x.collect()))
for cnt,y in zip(range(len(ylist)), ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y)) # no collect on y

x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 6]
sample:1 y = [6, 4, 2]
sample:2 y = [2, 0, 4]
sample:3 y = [5, 4, 1]
sample:4 y = [3, 1, 4]

takeSample randomly picks exactly three elements and returns them as a list on the driver, so no collect is needed.
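The contrast between sample (variable size) and takeSample (fixed size) can be illustrated with the standard random module. This is a rough model under the assumption that sampling without replacement keeps each element independently with probability fraction; `model_sample`, `model_take_sample` and the seed are hypothetical choices for the illustration.

```python
import random

def model_sample(data, fraction, rng):
    # Bernoulli sampling: each element is kept independently with
    # probability `fraction`, so the result size varies between calls.
    return [el for el in data if rng.random() < fraction]

def model_take_sample(data, num, rng):
    # Fixed-size sampling without replacement: always `num` elements.
    return rng.sample(data, num)

rng = random.Random(0)  # hypothetical seed, only to make runs repeatable
data = list(range(7))

bernoulli = [model_sample(data, 0.5, rng) for _ in range(5)]
fixed = [model_take_sample(data, 3, rng) for _ in range(5)]

for cnt, y in enumerate(bernoulli):
    print('sample:' + str(cnt) + ' y = ' + str(y))
for cnt, y in enumerate(fixed):
    print('takeSample:' + str(cnt) + ' y = ' + str(y))
```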

union

x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['D', 'C', 'A']
['A', 'A', 'B', 'D', 'C', 'A']

union concatenates the two RDDs and does not remove duplicates.

intersection

x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['A','C','D'])
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'A', 'B']
['A', 'C', 'D']
['A']

intersection returns the elements present in both RDDs, without duplicates.

sortByKey

x = sc.parallelize([('B',1),('A',2),('C',3)])
y = x.sortByKey()
print(x.collect())
print(y.collect())

[('B', 1), ('A', 2), ('C', 3)]
[('A', 2), ('B', 1), ('C', 3)]

sortByKey sorts the pairs by key.

glom

x = sc.parallelize(['C','B','A'], 2)
y = x.glom()
print(x.collect())
print(y.collect())

['C', 'B', 'A']
[['C'], ['B', 'A']]

glom gathers the elements of each partition into a list.

cartesian

x = sc.parallelize(['A','B'])
y = sc.parallelize(['C','D'])
z = x.cartesian(y)
print(x.collect())
print(y.collect())
print(z.collect())

['A', 'B']
['C', 'D']
[('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]

cartesian forms the Cartesian product: every element of x is paired with every element of y.

groupBy

x = sc.parallelize([1,2,3])
y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' )
print(x.collect())
# y is nested, this iterates through it
print([(j[0],[i for i in j[1]]) for j in y.collect()])

[1, 2, 3]
[('A', [1, 3]), ('B', [2])]

groupBy groups the elements by the key that the function returns for each of them.

pipe

x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
y = x.pipe('grep -i "A"') # calls out to grep, may fail under Windows
print(x.collect())
print(y.collect())

['A', 'Ba', 'C', 'AD']
['A', 'Ba', 'AD']

pipe sends the elements through an external shell command and builds the new RDD from the command's output.

foreach

from __future__ import print_function
x = sc.parallelize([1,2,3])
def f(el):
    '''side effect: append the current RDD element to a file'''
    with open("./foreachExample.txt", 'a+') as f1:
        print(el, file=f1)

# first clear the file contents
open('./foreachExample.txt', 'w').close()

y = x.foreach(f) # writes into foreachExample.txt

print(x.collect())
print(y) # foreach returns 'None'

# print the contents of foreachExample.txt
with open("./foreachExample.txt", "r") as foreachExample:
    print(foreachExample.read())

[1, 2, 3]
None
3
1
2

foreach applies the function to every element for its side effects. The calls may run concurrently, so the elements are not necessarily processed in order.

foreachPartition

from __future__ import print_function
x = sc.parallelize([1,2,3],5)
def f(partition):
    '''side effect: append the current RDD partition contents to a file'''
    with open("./foreachPartitionExample.txt", 'a+') as f1:
        print([el for el in partition], file=f1)

# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()

y = x.foreachPartition(f) # writes into foreachPartitionExample.txt

print(x.glom().collect())
print(y) # foreachPartition returns 'None'

# print the contents of foreachPartitionExample.txt
with open("./foreachPartitionExample.txt", "r") as foreachExample:
    print(foreachExample.read())

[[], [1], [], [2], [3]]
None
[]
[]
[1]
[2]
[3]

foreachPartition iterates partition by partition: the function is called once per partition, including empty ones.

reduce

x = sc.parallelize([1,2,3])
y = x.reduce(lambda accumulated, obj: accumulated + obj) # computes a cumulative sum
print(x.collect())
print(y)

[1, 2, 3]
6

reduce merges all elements into a single value with a binary function.

fold

x = sc.parallelize([1,2,3])
neutral_zero_value = 0 # 0 for sum, 1 for multiplication
y = x.fold(neutral_zero_value, lambda accumulated, obj: accumulated + obj) # computes cumulative sum
print(x.collect())
print(y)

[1, 2, 3]
6

fold works like reduce, but starts from a neutral zero value.

aggregate

x = sc.parallelize([2,3,4])
neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x
seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el))
combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1]))
y = x.aggregate(neutral_zero_value,seqOp,combOp) # computes (cumulative sum, cumulative product)
print(x.collect())
print(y)

[2, 3, 4]
(9, 24)

aggregate folds the elements of each partition with seqOp and then merges the per-partition results with combOp.
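The two-stage evaluation of aggregate can be sketched with functools.reduce. This is a plain-Python model, not Spark code: the split of [2, 3, 4] into `partitions` is a hypothetical one, and `seq_op` and `comb_op` mirror the seqOp and combOp lambdas of the example.

```python
from functools import reduce

partitions = [[2], [3, 4]]  # hypothetical partitioning of [2, 3, 4]
zero = (0, 1)               # neutral for (sum, product)

def seq_op(acc, el):
    # fold one element into the running (sum, product) of a partition
    return (acc[0] + el, acc[1] * el)

def comb_op(a, b):
    # merge the (sum, product) results of two partitions
    return (a[0] + b[0], a[1] * b[1])

# Stage 1: fold every partition separately, starting from the zero value.
per_partition = [reduce(seq_op, part, zero) for part in partitions]
# Stage 2: merge the per-partition results, again starting from the zero value.
result = reduce(comb_op, per_partition, zero)

print(per_partition)  # [(2, 2), (7, 12)]
print(result)         # (9, 24)
```

Because the zero value is used once per partition and once more in the merge, it has to be neutral for both operations. With a non-neutral value the result would depend on the number of partitions.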

histogram (example #1)

x = sc.parallelize([1,3,1,2,3])
y = x.histogram(buckets = 2)
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
([1, 2, 3], [2, 3])

histogram (example #2)

x = sc.parallelize([1,3,1,2,3])
y = x.histogram([0,0.5,1,1.5,2,2.5,3,3.5])
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5], [0, 0, 2, 0, 1, 0, 2])

The first element of the result holds the bucket boundaries, and the second holds the number of values that fall into each bucket.
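How the counts in both histogram examples come about can be reproduced with a small plain-Python sketch. It assumes that every bucket is half-open on the right except the last one, which also includes its upper boundary; `model_histogram` and `even_boundaries` are hypothetical helper names.

```python
from bisect import bisect_right

def even_boundaries(data, buckets):
    # evenly spaced boundaries between the minimum and the maximum
    lo, hi = min(data), max(data)
    width = (hi - lo) / buckets
    return [lo + i * width for i in range(buckets)] + [hi]

def model_histogram(data, boundaries):
    counts = [0] * (len(boundaries) - 1)
    for value in data:
        if value < boundaries[0] or value > boundaries[-1]:
            continue  # values outside the boundaries are not counted
        # the last bucket is closed on the right, hence the min()
        index = min(bisect_right(boundaries, value) - 1, len(counts) - 1)
        counts[index] += 1
    return counts

data = [1, 3, 1, 2, 3]

bounds_1 = even_boundaries(data, 2)
counts_1 = model_histogram(data, bounds_1)
counts_2 = model_histogram(data, [0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5])

print(counts_1)  # [2, 3]
print(counts_2)  # [0, 0, 2, 0, 1, 0, 2]
```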

variance

x = sc.parallelize([1,3,2])
y = x.variance() # divides by N
print(x.collect())
print(y)

[1, 3, 2]
0.666666666667

variance returns the population variance.

stdev

x = sc.parallelize([1,3,2])
y = x.stdev() # divides by N
print(x.collect())
print(y)

[1, 3, 2]
0.816496580928

stdev returns the population standard deviation.

sampleStdev

x = sc.parallelize([1,3,2])
y = x.sampleStdev() # divides by N-1
print(x.collect())
print(y)

[1, 3, 2]
1.0

sampleStdev returns the sample standard deviation, which divides by N-1 instead of N.
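The N versus N-1 distinction can be checked locally with Python's statistics module, whose pvariance and pstdev divide by N while stdev divides by N-1. This is a local cross-check of the three values above, not Spark code.

```python
import statistics

data = [1, 3, 2]

population_variance = statistics.pvariance(data)  # divides by N
population_stdev = statistics.pstdev(data)        # square root of the above
sample_stdev = statistics.stdev(data)             # divides by N-1

print(population_variance)
print(population_stdev)
print(sample_stdev)
```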

countByValue

x = sc.parallelize([1,3,1,2,3])
y = x.countByValue()
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
defaultdict(<type 'int'>, {1: 2, 2: 1, 3: 2})

top

x = sc.parallelize([1,3,1,2,3])
y = x.top(num = 3)
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
[3, 3, 2]

top sorts and returns the num largest elements, in descending order.

takeOrdered

x = sc.parallelize([1,3,1,2,3])
y = x.takeOrdered(num = 3)
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
[1, 1, 2]

takeOrdered returns the num smallest elements, in ascending order.

take

x = sc.parallelize([1,3,1,2,3])
y = x.take(num = 3)
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
[1, 3, 1]

take returns the first num elements without sorting.
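The three ways of taking elements above (top, takeOrdered, take) have close local counterparts in plain Python, which may help to keep them apart. The sketch below uses heapq and list slicing on an ordinary list; it models the results only, not how Spark computes them.

```python
import heapq

data = [1, 3, 1, 2, 3]

largest = heapq.nlargest(3, data)    # like top(3): descending order
smallest = heapq.nsmallest(3, data)  # like takeOrdered(3): ascending order
first_three = data[:3]               # like take(3): no sorting

print(largest)      # [3, 3, 2]
print(smallest)     # [1, 1, 2]
print(first_three)  # [1, 3, 1]
```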

first

x = sc.parallelize([1,3,1,2,3])
y = x.first()
print(x.collect())
print(y)

[1, 3, 1, 2, 3]
1

first returns the first element.

collectAsMap

x = sc.parallelize([('C',3),('A',1),('B',2)])
y = x.collectAsMap()
print(x.collect())
print(y)

[('C', 3), ('A', 1), ('B', 2)]
{'A': 1, 'C': 3, 'B': 2}

collectAsMap converts the list of key-value pairs into a dict.

keys

x = sc.parallelize([('C',3),('A',1),('B',2)])
y = x.keys()
print(x.collect())
print(y.collect())

[('C', 3), ('A', 1), ('B', 2)]
['C', 'A', 'B']

keys extracts only the keys. values likewise extracts only the values.

countByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.countByKey()
print(x.collect())
print(y)

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
defaultdict(<type 'int'>, {'A': 3, 'B': 2})

countByKey counts how many times each key occurs.

join

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
z = x.join(y)
print(x.collect())
print(y.collect())
print(z.collect())

[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]

join combines the two RDDs on matching keys (an inner join). leftOuterJoin and rightOuterJoin are the outer variants.
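The pairing rule behind join can be sketched with nested loops over plain lists. This is a model of the result only: `model_join` and `model_left_outer_join` are hypothetical helpers, and the left outer variant assumes that unmatched keys are paired with None.

```python
x = [('C', 4), ('B', 3), ('A', 2), ('A', 1)]
y = [('A', 8), ('B', 7), ('A', 6), ('D', 5)]

def model_join(left, right):
    # inner join: one output pair for every combination of matching keys
    return [(k, (v, w)) for k, v in left for k2, w in right if k == k2]

def model_left_outer_join(left, right):
    # like the inner join, but left keys without a match are kept with None
    result = []
    for k, v in left:
        matches = [w for k2, w in right if k2 == k]
        for w in (matches or [None]):
            result.append((k, (v, w)))
    return result

inner = model_join(x, y)
left_outer = model_left_outer_join(x, y)

print(sorted(inner))
print(left_outer)
```

Key 'A' occurs twice on each side, which is why it produces four pairs. Keys 'C' and 'D' occur on one side only and drop out of the inner join.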

partitionBy

x = sc.parallelize([(0,1),(1,2),(2,3)],2)
y = x.partitionBy(numPartitions = 3, partitionFunc = lambda x: x) # only key is passed to partitionFunc
print(x.glom().collect())
print(y.glom().collect())

[[(0, 1)], [(1, 2), (2, 3)]]
[[(0, 1)], [(1, 2)], [(2, 3)]]

partitionBy redistributes the pairs by key. In the glom output, each partition appears as one list.
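The assignment of keys to partitions can be sketched as follows, assuming the target partition is the value of partitionFunc for the key, taken modulo numPartitions. `model_partition_by` is a hypothetical helper that works on a plain list of pairs.

```python
def model_partition_by(pairs, num_partitions, partition_func):
    # one bucket per target partition
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # only the key decides where the pair goes
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets

pairs = [(0, 1), (1, 2), (2, 3)]
result = model_partition_by(pairs, 3, lambda k: k)

print(result)  # [[(0, 1)], [(1, 2)], [(2, 3)]]
```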

combineByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
createCombiner = (lambda el: [(el,el**2)])
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
mergeComb = (lambda agg1,agg2: agg1 + agg2 ) # append agg1 with agg2
y = x.combineByKey(createCombiner,mergeVal,mergeComb)
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]

combineByKey merges the values of each key.
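The roles of the three functions passed to combineByKey can be made visible with a plain-Python model. The split into `partitions` below is hypothetical, and `model_combine_by_key` is an illustrative helper, not part of PySpark.

```python
def model_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            if key in local:
                # key already seen in this partition: fold the value in
                local[key] = merge_value(local[key], value)
            else:
                # first value for this key in this partition
                local[key] = create_combiner(value)
        for key, combiner in local.items():
            if key in merged:
                # the same key came from another partition: merge the combiners
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged

# hypothetical partitioning of the five pairs
partitions = [[('B', 1), ('B', 2)], [('A', 3)], [('A', 4), ('A', 5)]]

result = model_combine_by_key(
    partitions,
    lambda el: [(el, el**2)],
    lambda aggregated, el: aggregated + [(el, el**2)],
    lambda agg1, agg2: agg1 + agg2,
)

print(sorted(result.items()))
# [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
```

In this model the first function runs once per key and partition, the second for every further value in the same partition, and the third only when a key is spread over several partitions.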

aggregateByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue = [] # empty list is 'zero value' for append operation
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])
mergeComb = (lambda agg1,agg2: agg1 + agg2 )
y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]

aggregateByKey aggregates the values of each key, starting from a zero value.

foldByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue = 1 # one is 'zero value' for multiplication
y = x.foldByKey(zeroValue,lambda agg,x: agg*x ) # computes cumulative product within each key
print(x.collect())
print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', 60), ('B', 2)]

foldByKey folds the values of each key.

groupByKey

x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
y = x.groupByKey()
print(x.collect())
print([(j[0],[i for i in j[1]]) for j in y.collect()])

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('A', [3, 2, 1]), ('B', [5, 4])]

flatMapValues

x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
y = x.flatMapValues(lambda x: [i**2 for i in x]) # function is applied to entire value, then result is flattened
print(x.collect())
print(y.collect())

[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]

flatMapValues applies the function to the value of each pair and flattens the result into one pair per item. mapValues also operates only on the values, but leaves everything else unchanged.

groupWith

x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])
z = sc.parallelize([('D',9),('B',(8,8))])
a = x.groupWith(y,z)
print(x.collect())
print(y.collect())
print(z.collect())
print("Result:")
for key,val in list(a.collect()):
    print(key, [list(i) for i in val])

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('B', (7, 7)), ('A', 6), ('D', (5, 5))]
[('D', 9), ('B', (8, 8))]
Result:
D [[], [(5, 5)], [9]]
C [[4], [], []]
B [[(3, 3)], [(7, 7)], [(8, 8)]]
A [[2, (1, 1)], [6], []]

This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.