RDD转为Dataset如何指定schema?

与RDD进行互操作

Spark SQL支持两种不同方法将现有RDD转换为Datasets。第一种方法使用反射来推断包含特定类型对象的RDD的schema。这种基于反射的方法会导致更简洁的代码,并且在编写Spark应用程序时已经知道schema的情况下工作良好。

第二种创建Datasets的方法是通过编程接口,允许您构建schema,然后将其应用于现有的RDD。虽然此方法更详细,但它允许你在直到运行时才知道列及其类型的情况下去构件数据集。

使用反射推断模式

Spark SQL的Scala接口支持自动将包含case classes的RDD转换为DataFrame。Case class定义表的schema。使用反射读取case class的参数名称,并将其变为列的名称。Case class也可以嵌套或包含复杂类型,如Seqs或Arrays。此RDD可以隐式转换为DataFrame,然后将其注册为表格。表可以在随后的SQL语句中使用。

// For implicit conversions from RDDs to DataFrames

import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe

val peopleDF = spark.sparkContext

.textFile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => Person(attributes(0), attributes(1).trim.toInt))

.toDF()

// Register the DataFrame as a temporary view

peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index

teenagersDF.map(teenager => "Name: " + teenager(0)).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

// or by field name

teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly

implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

// Primitive types and case classes can be also defined as

// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]

teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

// Array(Map("name" -> "Justin", "age" -> 19))

以编程方式指定模式

当case class不能提前定义时(例如,记录的结构用字符串编码,或者文本数据集将被解析并且字段对不同的用户值会不同),DataFrame可以以编程方式通过三个步骤创建 。

1, Row从原始RDD 创建元素类型为Row的RDD;

2,使用StructType创建一组schema,然后让其匹配步骤1中Rows的类型结构。

3,使用SparkSession 提供的方法createDataFrame,将schema应用于Rows 类型的RDD。

import org.apache.spark.sql.types._

// Create an RDD

val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string

val schemaString = "name age"

// Generate the schema based on the string of schema

val fields = schemaString.split(" ")

.map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

// Convert records of the RDD (people) to Rows

val rowRDD = peopleRDD

.map(_.split(","))

.map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD

val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame

peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames

val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations

// The columns of a row in the result can be accessed by field index or by field name

results.map(attributes => "Name: " + attributes(0)).show()

// +-------------+

// | value|

// +-------------+

// |Name: Michael|

// | Name: Andy|

// | Name: Justin|

// +-------------+

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-06-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏浪淘沙

spark求最受欢迎的老师的问题

1033
来自专栏Jed的技术阶梯

Spark常用Transformations算子(二)

介绍以下Transformations算子: aggregateByKey join cogroup cartesian pipe repartit...

1114
来自专栏xingoo, 一个梦想做发明家的程序员

Spark MLlib特征处理 之 StringIndexer、IndexToString使用说明以及源码剖析

StringIndexer可以把字符串的列按照出现频率进行排序,出现次数最高的对应的Index为0。比如下面的列表进行StringIndexer

2670
来自专栏PPV课数据科学社区

【学习】七天搞定SAS(二):基本操作(判断、运算、基本函数)

SAS生成新变量 SAS支持基本的加减乘除,值得一提的是它的**代表指数,而不是^。 * Modify homegarden data set with ass...

4394
来自专栏扎心了老铁

spark三种连接join

本文主要介绍spark join相关操作。 讲述spark连接相关的三个方法join,left-outer-join,right-outer-join,在这之前...

3598
来自专栏函数式编程语言及工具

SDP(3):ScalikeJDBC- JDBC-Engine:Fetching

  ScalikeJDBC在覆盖JDBC基本功能上是比较完整的,而且实现这些功能的方式比较简洁,运算效率方面自然会稍高一筹了。理论上用ScalikeJDBC作为...

3715
来自专栏跟着阿笨一起玩NET

.Net 2.0中使用扩展方法

1202
来自专栏听Allen瞎扯淡

Spark 的惰性运算

作者的意图很简单,就是将RDD中的数据转换为新的数据格式,并统计非法数据的个数。咋一看代码,似乎没有什么问题,可是,这段代码真的能得到正确的结果么?答案是否定的...

5571
来自专栏岑玉海

Spark Streaming自定义Receivers

自定义一个Receiver class SocketTextStreamReceiver(host: String, port: Int( ...

2823
来自专栏有趣的Python

数据机构探险之图(下)篇:代码实战数据机构探险之图篇(代码编写)

数据机构探险之图篇(代码编写) 图的存储(邻接矩阵) & 图的深度优先 & 广度优先 图的编码实战-最小生成树(普里姆算法) 图的编码实战-最小生成树之克鲁斯卡...

3765

扫码关注云+社区