前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spart DataSet数据集

Spart DataSet数据集

作者头像
Albert陈凯
发布2018-04-04 15:22:24
8760
发布2018-04-04 15:22:24
举报
文章被收录于专栏:Albert陈凯Albert陈凯

]Spark引入DataFrame, 它可以提供high-level functions让Spark更好的处理结构数据的计算。 这让Catalyst optimizer 和Tungsten(钨丝) execution engine自动加速大数据分析。 发布DataFrame之后开发者收到了很多反馈, 其中一个主要的是大家反映缺乏编译时类型安全。 为了解决这个问题,Spark采用新的Dataset API (DataFrame API的类型扩展)。 Dataset API扩展DataFrame API支持静态类型和运行已经存在的Scala或Java语言的用户自定义函数。 对比传统的RDD API,Dataset API提供更好的内存管理,特别是在长任务中有更好的性能提升

SparkDatasets.png

#创建DataSet
case class Data(a: Int, b: String)
val ds = Seq(Data(1, "one"), Data(2, "two")).toDS()
ds.collect()
ds.show()

#创建DataSet
case class Person(name: String, zip: Long)
val df = sqlContext.read.json(sc.parallelize("""{"zip": 94709, "name": "Michael"}""" :: Nil))
df.as[Person].collect()
df.as[Person].show()

#DataSet的WordCount
import org.apache.spark.sql.functions._
val ds = sqlContext.read.text("hdfs://node-1.sxt.cn:9000/wc").as[String]
val result = ds.flatMap(_.split(" ")).filter(_ != "").toDF().groupBy($"value").agg(count("*") as "numOccurances").orderBy($"numOccurances" desc)


val wordCount = ds.flatMap(_.split(" ")).filter(_ != "").groupBy(_.toLowerCase()).count()


#创建DataSet
val lines = sqlContext.read.text("hdfs://node-1.sxt.cn:9000/wc").as[String]

#对DataSet进行操作
val words = lines.flatMap(_.split(" ")).filter(_ != "")

#查看DataSet中的内容
words.collect
words.show

#分组求和
val counts = words.groupBy(_.toLowerCase).count()

--------------------------------------------------------------------------------------------------------------
{"name": "UC Berkeley", "yearFounded": 1868, "numStudents": 37581}
{"name": "MIT", "yearFounded": 1860, "numStudents": 11318}
#向hdfs中上传数据:/usr/local/hadoop-2.6.4/bin/hdfs dfs -put schools.json /

#定义case class
case class University(name: String, numStudents: Long, yearFounded: Long)

#创建DataSet
val schools = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/schools.json").as[University]
#操作DataSet
schools.map(sc => s"${sc.name} is ${2015 - sc.yearFounded} years old").show


#JSON -> DataFrame
val df = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/person.json")

df.where($"age" >= 20).show
df.where(col("age") >= 20).show
df.printSchema

#DataFrame -> Dataset
case class Person(age: Long, name: String)

val ds = df.as[Person]
ds.filter(_.age >= 20).show


// Dataset -> DataFrame
val df2 = ds.toDF

import org.apache.spark.sql.types._


df.where($"age" > 0).groupBy((($"age" / 10) cast IntegerType) * 10 as "decade").agg(count("*")).orderBy($"decade").show 
  
ds.filter(_.age > 0).groupBy(p => (p.age / 10) * 10).agg(count("name")).toDF().withColumnRenamed("value", "decade").orderBy("decade") .show

  

val df = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/student.json")
case class Student(name: String, age: Long, major: String)
val studentDS = df.as[Student]
studentDS.select($"name".as[String], $"age".as[Long]).filter(_._2 > 19).collect()

studentDS.groupBy(_.major).count().collect()

import org.apache.spark.sql.functions._

studentDS.groupBy(_.major).agg(avg($"age").as[Double]).collect()


case class Major(shortName: String, fullName: String)
val majors = Seq(Major("CS", "Computer Science"), Major("Math", "Mathematics")).toDS()

val joined = studentDS.joinWith(majors, $"major" === $"shortName")

joined.map(s => (s._1.name, s._2.fullName)).show()

joined.explain()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.06.21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档