首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

遇见YI算法之初识Pyspark(二)

上周简单介绍了下pyspark及spark环境搭建,这周我们就来介绍下pyspark的简单应用。

一、RDD

RDD,Resilient DistributedDatasets,是spark中最基础、最常用的数据结构,是一个容错的、并行的数据结构。可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区,本质上是一个只读的分区记录集合。

1、RDD的创建

(1)并行化集合创建RDD: parallelize和makeRDD方法

>>>rdd1 = sc.parallelize([1,2,3,4,5],3) #定义1-5分三部分存储

>>>rdd1.collect() #查看数据集

>>>rdd1.getNumPartitions() #查看分区

还可以通过makeRdd创建

scala>val aa = Seq((1 to 10,Seq("super2","super3")),(11 to15,Seq("super2","super3")))

scala>val rdd2 = sc.makeRDD(aa)

scala>rdd2.collect()

scala>rdd2.partitions.size

(2)外部存储数据创建RDD,利用textFile分别读取本地和hdfs文件

读取本地文件

>>> rdd3 = sc.textFile("/spark/bigdata/people.txt")

>>> rdd3.collect()

读取hdfs文件

>>>rdd4 = sc.textFile("hdfs:///user/bigdata/people.txt")

>>>rdd4.collect()

2、RDD简单操作

RDD有两种操作算子:

(1)Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作

(2)Ation(执行):触发Spark作业的运行,真正触发转换算子的计算

特别注意:Spark的一个核心概念是惰性计算。当你把一个RDD转换成另一个的时候,这个转换不会立即生效执行!!!Spark会把它先记在心里,等到真的有actions需要取转换结果时,才会重新组织transformations(因为可能有一连串的变换)。这样可以避免不必要的中间结果存储和通信。

(1)基础Transformation转换操作:

#map:数据集中的每个元素经过用户自定义的函数转换形成一个新的RDD,新的RDD叫MappedRDD,例子:对每个元素乘2

>>>rdd=sc.parallelize([1,2,3,4,5])

>>>map = rdd.map(lambda x :x*2)

>>>map.collect()

# flatMap:与map类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出,可根据下面实例理解下map和flapMap的区别

>>>sentencesRDD=sc.parallelize(['Hello world','My name is spark'])

>>>mapRDD=sentencesRDD.map(lambda x: x.split(" "))

>>>mapRDD.collect()

>>>wordsRDD=sentencesRDD.flatMap(lambda x: x.split(" "))

>>>wordsRDD.collect()

#filter:筛选出来满足条件的item,例子:筛选出能整除2的数

>>>rdd=sc.parallelize([1,2,3,4,5])

>>>filteredRDD = rdd.filter(lambda x: x % 2 == 0)

>>>filteredRDD.collect()

# Sample:从RDD中的item中采样一部分出来,有放回或者无放回,例子:取1-10之间的50%的样本随机选择

>>>rdd=sc.parallelize([1,2,3,4,5,6,7,8,9,10])

>>>sample=rdd.sample(False,0.5,100)

>>>sample.collect()

#zip:将两个RDD对应元素组合为元组

>>>x = sc.parallelize(range(0,5))

>>>y = sc.parallelize(range(1000, 1005))

>>>x.zip(y).collect()

RDD的转换方法还有很多,具体可根据官网实例操作…………………

(2)基础action操作

# collect(): 计算所有的items并返回所有的结果到driver端,接着 collect()会以Python list的形式返回结果,即上面数据结果展示方法

# first(): 和上面是类似的,不过只返回第1个item

# take(n): 类似,但是返回n个item

# count(): 计算RDD中item的个数

# top(n): 返回头n个items,按照自然结果排序

# reduce(): 对RDD中的items做聚合

>>>a=sc.parallelize([1,2,3,4,5])

>>>rdd_all=a.reduce(lambda x,y:x+y)

>>>print(rdd_all)

RDD的action方法还有很多,具体可根据官网实例操作…………………

二、DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

1、DataFrame的创建

(1)通过反射推断Schema(只支持scla或者java)

scala

scala

scala Person(p(0), p(1).trim.toInt))

scala

scala

scala

scala

scala

(2)通过StructType直接指定Schema(scala\java\python)

>>>sc = spark.sparkContext

>>>from pyspark.sql import SQLContext

>>>dataRdd = sc.textFile("/spark/bigdata/people.txt").map(lambdax:x.split(","))

>>>people = dataRdd.map(lambda x: (x[0], x[1].strip()))

>>>schemaString = "name age"

>>>fields = [StructField(field_name, StringType(), True) for field_name inschemaString.split()]

>>>schema = StructType(fields)

>>>schemaPeople = spark.createDataFrame(people, schema)

>>>schemaPeople.show()

(3)通过json、parquet创建

>>>df = spark.read.json("/spark/bigdata/people.json")

>>>df = spark.read.json("/spark/bigdata/people. parquet")

>>>df.show()

(4)通过读取外部数据JDBC或ODBC创建,或者利用hiveContext连接hive读取数据

2、DataFrame操作

DataFrame操作相比RDD简单便捷,具体如下:

(1)DataFrame对象上Action操作

#show(n)展示数据,n为行数

#show(numRows: Int, truncate: Boolean) 显示完整信息truncate=False

>>>df=spark.read.json("/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")

>>>df.show()

# collect:获取所有数据到数组

# collectAsList:获取所有数据到List

# describe(cols: String*):获取指定字段的统计信息

# first, head, take, takeAsList:获取若干行记录

(2)常用操作

# select:获取指定字段值(可获取多个)

>>>df.select("name","age").show()

# where():SQL语言中where关键字后的条件

>>>df.where("age=30").show()

# filter:根据字段进行筛选

>>>df.filter("age=30").show()

# limit方法获取指定DataFrame的前n行记录

>>>df.limit(1).show()

# groupBy:根据字段进行group by操作

>>>df.groupBy("age")

# unionAll方法:对两个DataFrame进行组合

>>>df.unionAll(df.limit(1)).show()

#结果保存

df.coalesce(1).write.format().mode().save保存可设置文件类型、mode方式

>>>df.coalesce(1).write.format(‘csv’).mode(“append”).save(“people”)

以上就是RDD和DataFrame的基本操作,掌握这些基本操作下面可以开启我们的pyspark算法模型之旅啦。

喜欢

分享

or

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180128G0OQGY00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券