双值类型DoubleValueType
Key-Value值类型
单value类型代码
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf,SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")#一般在工作中不这么写,直接复制log4j文件
# 2-map操作
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd__map = rdd1.map(lambda x: x * 2)
print(rdd__map.glom().collect())#[2, 4, 6, 8, 10, 12],#[[2, 4, 6], [8, 10, 12]]
# 3-filter操作
print(rdd1.glom().collect())
print(rdd1.filter(lambda x: x > 3).glom().collect())
# 4-flatMap
rdd2 = sc.parallelize([" hello you", "hello me "])
print(rdd2.flatMap(lambda word: re.split("\s+", word.strip())).collect())
# 5-groupBY
x = sc.parallelize([1, 2, 3])
y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B')
print(y.mapValues(list).collect())#[('A', [1, 3]), ('B', [2])]
# 6-mapValue
x1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
print(x1.mapValues(f).collect())
双value类型的代码
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不这么写,直接复制log4j文件
# 2-对两个RDD求并集
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
Union_RDD = rdd1.union(rdd2)
print(Union_RDD.collect())
print(rdd1.intersection(rdd2).collect())
print(rdd2.subtract(rdd1).collect())
# Return a new RDD containing the distinct elements in this RDD.
print(Union_RDD.distinct().collect())
print(Union_RDD.distinct().glom().collect())
key-Value算子
from pyspark import SparkConf, SparkContext import re
‘’’ 分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行 分区间:有一些操作分区间做一些累加 ‘’’ if name == ‘main’:
conf = SparkConf().setAppName(“mini2”).setMaster(“local[*]”) sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel(“WARN”) # 一般在工作中不这么写,直接复制log4j文件
rdd1 = sc.parallelize([(“a”, 1), (“b”, 2)]) rdd2 = sc.parallelize([(“c”, 1), (“b”, 3)]) rdd3 = rdd1.union(rdd2) key1 = rdd3.groupByKey() print(“groupByKey:”,key1.collect()) #groupByKey:
print(key1.mapValues(list).collect())#需要通过mapValue获取groupByKey的值 print(key1.mapValues(tuple).collect())
key2 = rdd3.reduceByKey(lambda x, y: x + y) print(key2.collect())
print(key2.map(lambda x: (x[1], x[0])).sortByKey(False).collect())#[(5, ‘b’), (1, ‘c’), (1, ‘a’)]
print(rdd3.countByValue())#defaultdict(<class ‘int’>, {(‘a’, 1): 1, (‘b’, 2): 1, (‘c’, 1): 1, (‘b’, 3): 1})
*
部分操作
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不这么写,直接复制log4j文件
# 2-key和value类型算子
# groupByKey
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
print(rdd1.first())
print(rdd1.take(2))
print(rdd1.top(2))
print(rdd1.collect())
rdd3 = sc.parallelize([1, 2, 3, 4, 5])
from operator import add
from operator import mul
print(rdd3.reduce(add))
print(rdd3.reduce(mul))
rdd4 = sc.parallelize(range(0, 10))
# 能否保证每次抽样结果是一致的,使用seed随机数种子
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 34))
其他补充算子
from pyspark import SparkConf, SparkContext import re
‘’’ 分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行 分区间:有一些操作分区间做一些累加 ‘’’
def f(iterator): # 【1,2,3】 【4,5】 for x in iterator: # for x in 【1,2,3】 x=1,2,3 print 1.2.3 print(x)
def f1(iterator): # 【1,2,3】 【4,5】 sum(1+2+3) sum(4+5) yield sum(iterator)
if name == ‘main’:
conf = SparkConf().setAppName(“mini2”).setMaster(“local[*]”) sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel(“WARN”) # 一般在工作中不这么写,直接复制log4j文件
rdd1 = sc.parallelize([(“a”, 1), (“b”, 2)]) print(rdd1.glom().collect())
rdd1.foreach(lambda x: print(x))
rdd1.foreachPartition(f)
rdd2 = sc.parallelize([1, 2, 3, 4]) print(rdd2.map(lambda x: x * 2).collect())
print(rdd2.mapPartitions(f1).collect()) # [3, 7]
*
基本函数
分区操作函数
重分区函数
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
alt+6 可以调出来所有TODO,
TODO是Python提供了预留功能的地方
'''
if __name__ == '__main__':
#TODO: 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不这么写,直接复制log4j文件
#TODO: 2-执行重分区函数--repartition
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print("partitions num:",rdd1.getNumPartitions())
print(rdd1.glom().collect())#[[1, 2], [3, 4], [5, 6]]
print("repartition result:")
#TODO: repartition可以增加分区也可以减少分区,但是都会产生shuflle,如果减少分区的化建议使用coalesc避免发生shuffle
rdd__repartition1 = rdd1.repartition(5)
print("increase partition",rdd__repartition1.glom().collect())#[[], [1, 2], [5, 6], [3, 4], []]
rdd__repartition2 = rdd1.repartition(2)
print("decrease partition",rdd__repartition2.glom().collect())#decrease partition [[1, 2, 5, 6], [3, 4]]
#TODO: 3-减少分区--coalese
print(rdd1.coalesce(2).glom().collect())#[[1, 2], [3, 4, 5, 6]]
print(rdd1.coalesce(5).glom().collect())#[[1, 2], [3, 4], [5, 6]]
print(rdd1.coalesce(5,True).glom().collect())#[[], [1, 2], [5, 6], [3, 4], []]
# 结论:repartition默认调用的是coalese的shuffle为True的方法
# TODO: 4-PartitonBy,可以调整分区,还可以调整分区器(一种hash分区器(一般打散数据),一种range分区器(排序拍好的))
# 此类专门针对RDD中数据类型为KeyValue对提供函数
# rdd五大特性中有第四个特点key-value分区器,默认是hashpartitioner分区器
rdd__map = rdd1.map(lambda x: (x, x))
print("partitions length:",rdd__map.getNumPartitions())#partitions length: 3
print(rdd__map.partitionBy(2).glom().collect())
聚合函数
代码:
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
alt+6 可以调出来所有TODO,
TODO是Python提供了预留功能的地方
'''
def addNum(x,y):
return x+y
if __name__ == '__main__':
# TODO: 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN") # 一般在工作中不这么写,直接复制log4j文件
# TODO: 2-使用reduce进行聚合计算
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
from operator import add
# 直接得到返回值-21
print(rdd1.reduce(add))
# TODO: 3-使用fold进行聚合计算
# 第一个参数zeroValue是初始值,会参与分区的计算
# 第二个参数是执行运算的operation
print(rdd1.fold(0, add)) # 21
print(rdd1.getNumPartitions()) # 3
print(rdd1.glom().collect())
print("fold result:", rdd1.fold(10, add))
# TODO: 3-使用aggreate进行聚合计算
# seqOp分区内的操作, combOp分区间的操作
print(rdd1.aggregate(0, add, add)) # 21
print(rdd1.glom().collect())
print("aggregate result:", rdd1.aggregate(1, add, add)) # aggregate result: 25
# 结论:fold是aggregate的简化版本,fold分区内和分区间的函数是一致的
print("aggregate result:", rdd1.aggregate(1, addNum, addNum)) # aggregate result: 25
byKey类的聚合函数
groupByKey----如何获取value的数据?------答案:result.mapValue(list).collect
reduceByKey
foldBykey
aggregateByKey
CombineByKey:这是一个更为底层实现的bykey 聚合算子,可以实现更多复杂功能
案例1:
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
alt+6 可以调出来所有TODO,
TODO是Python提供了预留功能的地方
'''
'''
对初始值进行操作
'''
def createCombiner(value): #('a',[1])
return [value]
# 这里的x=createCombiner得到的[value]结果
def mergeValue(x,y): #这里相同a的value=y=1
x.append(y)#('a', [1, 1]),('b', [1])
return x
def mergeCombiners(a,b):
a.extend(b)
return a
if name == ‘main’:
conf = SparkConf().setAppName(“mini2”).setMaster(“local[*]”) sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel(“WARN”) # 一般在工作中不这么写,直接复制log4j文件
from operator import add
rdd = sc.parallelize([(“a”, 1), (“b”, 1), (“a”, 1)])
print(sorted(rdd.groupByKey().mapValues(list).collect()))
createCombiner
, which turns a V into a C (e.g., creates a one-element list)mergeValue
, to merge a V into a C (e.g., adds it to the end ofa list)mergeCombiners
, to combine two C’s into a single one (e.g., merges the lists)by_key_result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners) print(sorted(by_key_result.collect()))#[(‘a’, [1, 1]), (‘b’, [1])]
* 案例2
* ```python
from pyspark import SparkConf, SparkContext import re
‘’’ 分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行 分区间:有一些操作分区间做一些累加 alt+6 可以调出来所有TODO, TODO是Python提供了预留功能的地方 ‘’’
‘’’ 对初始值进行操作 [value,1],value指的是当前学生成绩,1代表的是未来算一下一个学生考了几次考试 (“Fred”, 88)---------->[88,1] ‘’’
def createCombiner(value): # return [value, 1]
‘’’ x代表的是 [value,1]值,x=[88,1] y代表的相同key的value,比如(“Fred”, 95)的95,执行分区内的累加 ‘’’
def mergeValue(x, y): return [x[0] + y, x[1] + 1]
‘’’ a = a[0] value,a[1] 几次考试 ‘’’
def mergeCombiners(a, b): return [a[0] + b[0], a[1] + b[1]]
if name == ‘main’:
conf = SparkConf().setAppName(“mini2”).setMaster(“local[*]”) sc = SparkContext.getOrCreate(conf=conf) sc.setLogLevel(“WARN”) # 一般在工作中不这么写,直接复制log4j文件
from operator import add
x = sc.parallelize([(“Fred”, 88), (“Fred”, 95), (“Fred”, 91), (“Wilma”, 93), (“Wilma”, 95), (“Wilma”, 98)], 3) print(x.glom().collect())
reduce_by_key_rdd = x.reduceByKey(lambda x, y: x + y) print(“reduceBykey:”, reduce_by_key_rdd.collect()) # [(‘Fred’, 274), (‘Wilma’, 286)]
createCombiner
, which turns a V into a C (e.g., creates a one-element list)mergeValue
, to merge a V into a C (e.g., adds it to the end ofa list)mergeCombiners
, to combine two C’s into a single one (e.g., merges the lists)combine_by_key_rdd = x.combineByKey(createCombiner, mergeValue, mergeCombiners) print(combine_by_key_rdd.collect()) # [(‘Fred’, [274, 3]), (‘Wilma’, [286, 3])]
print(combine_by_key_rdd.map(lambda x: (x[0], int(x[1][0] / x[1][1]))).collect())
*
* 面试题: