上周简单介绍了下pyspark及spark环境搭建,这周我们就来介绍下pyspark的简单应用。
一、RDD
RDD,Resilient DistributedDatasets,是spark中最基础、最常用的数据结构,是一个容错的、并行的数据结构。可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区,本质上是一个只读的分区记录集合。
1、RDD的创建
(1)并行化集合创建RDD: parallelize和makeRDD方法
>>>rdd1 = sc.parallelize([1,2,3,4,5],3) #定义1-5分三部分存储
>>>rdd1.collect() #查看数据集
>>>rdd1.getNumPartitions() #查看分区
还可以通过makeRdd创建
scala>val aa = Seq((1 to 10,Seq("super2","super3")),(11 to15,Seq("super2","super3")))
scala>val rdd2 = sc.makeRDD(aa)
scala>rdd2.collect()
scala>rdd2.partitions.size
(2)外部存储数据创建RDD,利用textFile分别读取本地和hdfs文件
读取本地文件
>>> rdd3 = sc.textFile("/spark/bigdata/people.txt")
>>> rdd3.collect()
读取hdfs文件
>>>rdd4 = sc.textFile("hdfs:///user/bigdata/people.txt")
>>>rdd4.collect()
2、RDD简单操作
RDD有两种操作算子:
(1)Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作
(2)Ation(执行):触发Spark作业的运行,真正触发转换算子的计算
特别注意:Spark的一个核心概念是惰性计算。当你把一个RDD转换成另一个的时候,这个转换不会立即生效执行!!!Spark会把它先记在心里,等到真的有actions需要取转换结果时,才会重新组织transformations(因为可能有一连串的变换)。这样可以避免不必要的中间结果存储和通信。
(1)基础Transformation转换操作:
#map:数据集中的每个元素经过用户自定义的函数转换形成一个新的RDD,新的RDD叫MappedRDD,例子:对每个元素乘2
>>>rdd=sc.parallelize([1,2,3,4,5])
>>>map = rdd.map(lambda x :x*2)
>>>map.collect()
# flatMap:与map类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出,可根据下面实例理解下map和flapMap的区别
>>>sentencesRDD=sc.parallelize(['Hello world','My name is spark'])
>>>mapRDD=sentencesRDD.map(lambda x: x.split(" "))
>>>mapRDD.collect()
>>>wordsRDD=sentencesRDD.flatMap(lambda x: x.split(" "))
>>>wordsRDD.collect()
#filter:筛选出来满足条件的item,例子:筛选出能整除2的数
>>>rdd=sc.parallelize([1,2,3,4,5])
>>>filteredRDD = rdd.filter(lambda x: x % 2 == 0)
>>>filteredRDD.collect()
# Sample:从RDD中的item中采样一部分出来,有放回或者无放回,例子:取1-10之间的50%的样本随机选择
>>>rdd=sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>>sample=rdd.sample(False,0.5,100)
>>>sample.collect()
#zip:将两个RDD对应元素组合为元组
>>>x = sc.parallelize(range(0,5))
>>>y = sc.parallelize(range(1000, 1005))
>>>x.zip(y).collect()
RDD的转换方法还有很多,具体可根据官网实例操作…………………
(2)基础action操作
# collect(): 计算所有的items并返回所有的结果到driver端,接着 collect()会以Python list的形式返回结果,即上面数据结果展示方法
# first(): 和上面是类似的,不过只返回第1个item
# take(n): 类似,但是返回n个item
# count(): 计算RDD中item的个数
# top(n): 返回头n个items,按照自然结果排序
# reduce(): 对RDD中的items做聚合
>>>a=sc.parallelize([1,2,3,4,5])
>>>rdd_all=a.reduce(lambda x,y:x+y)
>>>print(rdd_all)
RDD的action方法还有很多,具体可根据官网实例操作…………………
二、DataFrame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
1、DataFrame的创建
(1)通过反射推断Schema(只支持scla或者java)
scala
scala
scala Person(p(0), p(1).trim.toInt))
scala
scala
scala
scala
scala
(2)通过StructType直接指定Schema(scala\java\python)
>>>sc = spark.sparkContext
>>>from pyspark.sql import SQLContext
>>>dataRdd = sc.textFile("/spark/bigdata/people.txt").map(lambdax:x.split(","))
>>>people = dataRdd.map(lambda x: (x[0], x[1].strip()))
>>>schemaString = "name age"
>>>fields = [StructField(field_name, StringType(), True) for field_name inschemaString.split()]
>>>schema = StructType(fields)
>>>schemaPeople = spark.createDataFrame(people, schema)
>>>schemaPeople.show()
(3)通过json、parquet创建
>>>df = spark.read.json("/spark/bigdata/people.json")
>>>df = spark.read.json("/spark/bigdata/people. parquet")
>>>df.show()
(4)通过读取外部数据JDBC或ODBC创建,或者利用hiveContext连接hive读取数据
2、DataFrame操作
DataFrame操作相比RDD简单便捷,具体如下:
(1)DataFrame对象上Action操作
#show(n)展示数据,n为行数
#show(numRows: Int, truncate: Boolean) 显示完整信息truncate=False
>>>df=spark.read.json("/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
>>>df.show()
# collect:获取所有数据到数组
# collectAsList:获取所有数据到List
# describe(cols: String*):获取指定字段的统计信息
# first, head, take, takeAsList:获取若干行记录
(2)常用操作
# select:获取指定字段值(可获取多个)
>>>df.select("name","age").show()
# where():SQL语言中where关键字后的条件
>>>df.where("age=30").show()
# filter:根据字段进行筛选
>>>df.filter("age=30").show()
# limit方法获取指定DataFrame的前n行记录
>>>df.limit(1).show()
# groupBy:根据字段进行group by操作
>>>df.groupBy("age")
# unionAll方法:对两个DataFrame进行组合
>>>df.unionAll(df.limit(1)).show()
#结果保存
df.coalesce(1).write.format().mode().save保存可设置文件类型、mode方式
>>>df.coalesce(1).write.format(‘csv’).mode(“append”).save(“people”)
以上就是RDD和DataFrame的基本操作,掌握这些基本操作下面可以开启我们的pyspark算法模型之旅啦。
喜欢
分享
or
领取专属 10元无门槛券
私享最新 技术干货