pyspark version


print("pyspark version"+str(sc.version)) map

sc = spark context, parallelize creates an RDD from the passed object

x = sc.parallelize([1,2,3]) y = x.map(lambda x: (x,x**2))

collect copies RDD elements to a list on the driver

print(x.collect()) print(y.collect()) [1, 2, 3] [(1, 1), (2, 4), (3, 9)] map进行分片,collect进行合并,和parallelize负责并行创建数组 但是rdd中的map只能生成一个指定的新的rdd.任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 这里指的是1->1,2->4,3->9 flatMap x = sc.parallelize([1,2,3]) y = x.flatMap(lambda x: (x, 100*x, x**2)) print(x.collect()) print(y.collect()) [1, 2, 3] [1, 100, 1, 2, 200, 4, 3, 300, 9] 原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 这里指的是1->1,100,1 2->2,200,4 3->3,300,9 mapPartitions mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 x = sc.parallelize([1,2,3], 2)#这句话第一个参数是要并行化的列表,第二个参数是创建时分区的个数


x = sc.parallelize([1,2,3], 2) def f(iterator): yield sum(iterator) #创建一个求和的函数 y = x.mapPartitions(f)#x调用mapPartitions传入函数f

glom() groups the elements of each partition into a list

print(x.glom().collect()) print(y.glom().collect()) [[1], [2, 3]] [[1], [5]] mapPartitionsWithIndex 这个函数同上一个函数是一致的,只是加上了index标签 x = sc.parallelize([1,2,3], 2) def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator)) y = x.mapPartitionsWithIndex(f)

glom() groups the elements of each partition into a list

print(x.glom().collect()) print(y.glom().collect()) [[1], [2, 3]] [[(0, 1)], [(1, 5)]] getNumPartitions 输出分区的个数 x = sc.parallelize([1,2,3], 2) y = x.getNumPartitions() print(x.glom().collect()) print(y) [[1], [2, 3]] 2 filter


x = sc.parallelize([1,2,3]) y = x.filter(lambda x: x%2 == 1) # filters out even elements print(x.collect()) print(y.collect()) 过滤器 distinct


x = sc.parallelize(['A','A','B']) y = x.distinct() print(x.collect()) print(y.collect())

['A', 'A', 'B'] ['A', 'B'] 去重,distinct对相同的元素只保留一个,结果的顺序不保证 sample


x = sc.parallelize(range(7))

call 'sample' 5 times

ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)] print('x = ' + str(x.collect())) for cnt,y in zip(range(len(ylist)), ylist):#zip就是将两个集合合并到一起 print('sample:' + str(cnt) + ' y = ' + str(y.collect()))

x = [0, 1, 2, 3, 4, 5, 6] sample:0 y = [0, 2, 5, 6] sample:1 y = [2, 6] sample:2 y = [0, 4, 5, 6] sample:3 y = [0, 2, 6] sample:4 y = [0, 3, 4] 抽样,每个数的取出存在一定的概率 takeSample


x = sc.parallelize(range(7))

call 'takeSample' 5 times

ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)] print('x = ' + str(x.collect())) for cnt,y in zip(range(len(ylist)), ylist): print('sample:' + str(cnt) + ' y = ' + str(y)) # no collect on y

x = [0, 1, 2, 3, 4, 5, 6] sample:0 y = [0, 2, 6] sample:1 y = [6, 4, 2] sample:2 y = [2, 0, 4] sample:3 y = [5, 4, 1] sample:4 y = [3, 1, 4] 从样例中随机取出三个数字 union


x = sc.parallelize(['A','A','B']) y = sc.parallelize(['D','C','A']) z = x.union(y) print(x.collect()) print(y.collect()) print(z.collect())

['A', 'A', 'B'] ['D', 'C', 'A'] ['A', 'A', 'B', 'D', 'C', 'A'] 合并并不去重 intersection


x = sc.parallelize(['A','A','B']) y = sc.parallelize(['A','C','D']) z = x.intersection(y) print(x.collect()) print(y.collect()) print(z.collect()) ['A', 'A', 'B'] ['A', 'C', 'D'] ['A'] 交集 sortByKey


x = sc.parallelize([('B',1),('A',2),('C',3)]) y = x.sortByKey() print(x.collect()) print(y.collect()) [('B', 1), ('A', 2), ('C', 3)] [('A', 2), ('B', 1), ('C', 3)] 排序 glom


x = sc.parallelize(['C','B','A'], 2) y = x.glom() print(x.collect()) print(y.collect())

['C', 'B', 'A'] [['C'], ['B', 'A']] 分区合并 cartesian


x = sc.parallelize(['A','B']) y = sc.parallelize(['C','D']) z = x.cartesian(y) print(x.collect()) print(y.collect()) print(z.collect())

['A', 'B'] ['C', 'D'] [('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')] 即笛卡尔积,将两个RDD的元素两两组合 groupBy


x = sc.parallelize([1,2,3]) y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' ) print(x.collect())

y is nested, this iterates through it

print([(j[0],[i for i in j[1]]) for j in y.collect()])

[1, 2, 3] [('A', [1, 3]), ('B', [2])] 分组 pipe


x = sc.parallelize(['A', 'Ba', 'C', 'AD']) y = x.pipe('grep -i "A"') # calls out to grep, may fail under Windows print(x.collect()) print(y.collect())

['A', 'Ba', 'C', 'AD'] ['A', 'Ba', 'AD'] 管道可以输入命令来进行再次操作 foreach from __future__ import print_function x = sc.parallelize([1,2,3]) def f(el): '''side effect: append the current RDD elements to a file''' f1=open("./foreachExample.txt", 'a+') print(el,file=f1)

first clear the file contents

open('./foreachExample.txt', 'w').close()

y = x.foreach(f) # writes into foreachExample.txt

print(x.collect()) print(y) # foreach returns 'None'

print the contents of foreachExample.txt

with open("./foreachExample.txt", "r") as foreachExample: print (foreachExample.read())

[1, 2, 3] None 3 1 2 循环操作,但是操作的过程可能是并发的并不是按顺序 foreachPartition


from __future__ import print_function x = sc.parallelize([1,2,3],5) def f(partition): '''side effect: append the current RDD partition contents to a file''' f1=open("./foreachPartitionExample.txt", 'a+') print([el for el in partition],file=f1)

first clear the file contents

open('./foreachPartitionExample.txt', 'w').close()

y = x.foreachPartition(f) # writes into foreachPartitionExample.txt

print(x.glom().collect()) print(y) # foreachPartition returns 'None'

print the contents of foreachPartitionExample.txt

with open("./foreachPartitionExample.txt", "r") as foreachExample: print (foreachExample.read())

[[], [1], [], [2], [3]] None [] [] [1] [2] [3] 按分布进行循环遍历 reduce


x = sc.parallelize([1,2,3]) y = x.reduce(lambda obj, accumulated: obj + accumulated) # computes a cumulative sum print(x.collect()) print(y)

[1, 2, 3] 6 合并 fold


x = sc.parallelize([1,2,3]) neutral_zero_value = 0 # 0 for sum, 1 for multiplication y = x.fold(neutral_zero_value,lambda obj, accumulated: accumulated + obj) # computes cumulative sum print(x.collect()) print(y)

[1, 2, 3] 6 折叠 aggregate


x = sc.parallelize([2,3,4]) neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el)) combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1])) y = x.aggregate(neutral_zero_value,seqOp,combOp) # computes (cumulative sum, cumulative product) print(x.collect()) print(y) [2, 3, 4] (9, 24) 聚集 histogram

histogram (example #1)

x = sc.parallelize([1,3,1,2,3]) y = x.histogram(buckets = 2) print(x.collect()) print(y)

[1, 3, 1, 2, 3] ([1, 2, 3], [2, 3])

histogram (example #2)

x = sc.parallelize([1,3,1,2,3]) y = x.histogram([0,0.5,1,1.5,2,2.5,3,3.5]) print(x.collect()) print(y)

[1, 3, 1, 2, 3] ([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5], [0, 0, 2, 0, 1, 0, 2]) 输出的第一参数是桶的范围,第二个参数为每一个桶中数据的频数 variance


x = sc.parallelize([1,3,2]) y = x.variance() # divides by N print(x.collect()) print(y) [1, 3, 2] 0.666666666667 方差 stdev


x = sc.parallelize([1,3,2]) y = x.stdev() # divides by N print(x.collect()) print(y)

[1, 3, 2] 0.816496580928 标准差 sampleStdev


x = sc.parallelize([1,3,2]) y = x.sampleStdev() # divides by N-1 print(x.collect()) print(y) [1, 3, 2] 1.0 抽样标准差除数为N-1 countByValue


x = sc.parallelize([1,3,1,2,3]) y = x.countByValue() print(x.collect()) print(y)

[1, 3, 1, 2, 3] defaultdict(<type 'int'>, {1: 2, 2: 1, 3: 2}) top


x = sc.parallelize([1,3,1,2,3]) y = x.top(num = 3) print(x.collect()) print(y)

[1, 3, 1, 2, 3] [3, 3, 2] 排序取前几个,从大到小 takeOrdered


x = sc.parallelize([1,3,1,2,3]) y = x.takeOrdered(num = 3) print(x.collect()) print(y)

[1, 3, 1, 2, 3] [1, 1, 2] 从小到大排序取值 take


x = sc.parallelize([1,3,1,2,3]) y = x.take(num = 3) print(x.collect()) print(y)

[1, 3, 1, 2, 3] [1, 3, 1] 不排序直接取 first


x = sc.parallelize([1,3,1,2,3]) y = x.first() print(x.collect()) print(y)

[1, 3, 1, 2, 3] 1 取第一个 collectAsMap


x = sc.parallelize([('C',3),('A',1),('B',2)]) y = x.collectAsMap() print(x.collect()) print(y)

[('C', 3), ('A', 1), ('B', 2)] {'A': 1, 'C': 3, 'B': 2} 将列表转化为map keys


x = sc.parallelize([('C',3),('A',1),('B',2)]) y = x.keys() print(x.collect()) print(y.collect())

[('C', 3), ('A', 1), ('B', 2)] ['C', 'A', 'B'] 只取出key值 values 只取出value countByKey


x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) y = x.countByKey() print(x.collect()) print(y)

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] defaultdict(<type 'int'>, {'A': 3, 'B': 2}) 统计key的次数 join


x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)]) y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)]) z = x.join(y) print(x.collect()) print(y.collect()) print(z.collect())

[('C', 4), ('B', 3), ('A', 2), ('A', 1)] [('A', 8), ('B', 7), ('A', 6), ('D', 5)] [('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))] 合并 leftOuterJoin rightOuterJoin partitionBy


x = sc.parallelize([(0,1),(1,2),(2,3)],2) y = x.partitionBy(numPartitions = 3, partitionFunc = lambda x: x) # only key is passed to partitionFunc print(x.glom().collect()) print(y.glom().collect())

[[(0, 1)], [(1, 2), (2, 3)]] [[(0, 1)], [(1, 2)], [(2, 3)]] 分区,每一个分区是一个列表 combineByKey


x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) createCombiner = (lambda el: [(el,el**2)]) mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated mergeComb = (lambda agg1,agg2: agg1 + agg2 ) # append agg1 with agg2 y = x.combineByKey(createCombiner,mergeVal,mergeComb) print(x.collect()) print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])] 通过key进行数据合并 aggregateByKey


x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) zeroValue = [] # empty list is 'zero value' for append operation mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) mergeComb = (lambda agg1,agg2: agg1 + agg2 ) y = x.aggregateByKey(zeroValue,mergeVal,mergeComb) print(x.collect()) print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])] 通过key进行聚集 foldByKey


x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) zeroValue = 1 # one is 'zero value' for multiplication y = x.foldByKey(zeroValue,lambda agg,x: agg*x ) # computes cumulative product within each key print(x.collect()) print(y.collect())

[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] [('A', 60), ('B', 2)] 按key值折叠 groupByKey


x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)]) y = x.groupByKey() print(x.collect()) print([(j[0],[i for i in j[1]]) for j in y.collect()])

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)] [('A', [3, 2, 1]), ('B', [5, 4])] flatMapValues


x = sc.parallelize([('A',(1,2,3)),('B',(4,5))]) y = x.flatMapValues(lambda x: [i**2 for i in x]) # function is applied to entire value, then result is flattened print(x.collect()) print(y.collect())

[('A', (1, 2, 3)), ('B', (4, 5))] [('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)] 对map的值进行操作,并拆分为单维map mapValues 仅仅对map值操作,其他不改变 groupWith


x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))]) y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))]) z = sc.parallelize([('D',9),('B',(8,8))]) a = x.groupWith(y,z) print(x.collect()) print(y.collect()) print(z.collect()) print("Result:") for key,val in list(a.collect()): print(key, [list(i) for i in val])

[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))] [('B', (7, 7)), ('A', 6), ('D', (5, 5))] [('D', 9), ('B', (8, 8))] Result: D [[], [(5, 5)], [9]] C [[4], [], []] B [[(3, 3)], [(7, 7)], [(8, 8)]] A [[2, (1, 1)], [6], []]

