RDD转为Dataset如何指定schema?

与RDD进行互操作

Spark SQL支持两种不同方法将现有RDD转换为Datasets。第一种方法使用反射来推断包含特定类型对象的RDD的schema。这种基于反射的方法会导致更简洁的代码,并且在编写Spark应用程序时已经知道schema的情况下工作良好。

第二种创建Datasets的方法是通过编程接口,允许您构建schema,然后将其应用于现有的RDD。虽然此方法更详细,但它允许你在直到运行时才知道列及其类型的情况下去构件数据集。

使用反射推断模式

Spark SQL的Scala接口支持自动将包含case classes的RDD转换为DataFrame。Case class定义表的schema。使用反射读取case class的参数名称,并将其变为列的名称。Case class也可以嵌套或包含复杂类型,如Seqs或Arrays。此RDD可以隐式转换为DataFrame,然后将其注册为表格。表可以在随后的SQL语句中使用。

// For implicit conversions from RDDs to DataFrames

import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe

val peopleDF = spark.sparkContext

.textFile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => Person(attributes(0), attributes(1).trim.toInt))

.toDF()

// Register the DataFrame as a temporary view

peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index

teenagersDF.map(teenager => "Name: " + teenager(0)).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

// or by field name

teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly

implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

// Primitive types and case classes can be also defined as

// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]

teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

// Array(Map("name" -> "Justin", "age" -> 19))

以编程方式指定模式

当case class不能提前定义时(例如,记录的结构用字符串编码,或者文本数据集将被解析并且字段对不同的用户值会不同),DataFrame可以以编程方式通过三个步骤创建 。

1, Row从原始RDD 创建元素类型为Row的RDD;

2,使用StructType创建一组schema,然后让其匹配步骤1中Rows的类型结构。

3,使用SparkSession 提供的方法createDataFrame,将schema应用于Rows 类型的RDD。

import org.apache.spark.sql.types._

// Create an RDD

val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string

val schemaString = "name age"

// Generate the schema based on the string of schema

val fields = schemaString.split(" ")

.map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

// Convert records of the RDD (people) to Rows

val rowRDD = peopleRDD

.map(_.split(","))

.map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD

val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame

peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames

val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations

// The columns of a row in the result can be accessed by field index or by field name

results.map(attributes => "Name: " + attributes(0)).show()

// +-------------+

// | value|

// +-------------+

// |Name: Michael|

// | Name: Andy|

// | Name: Justin|

// +-------------+

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-06-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊eureka client的serviceUrl

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java

911
来自专栏函数式编程语言及工具

FunDA(16)- 示范:整合并行运算 - total parallelism solution

   在对上两篇讨论中我们介绍了并行运算的两种体现方式:并行构建数据源及并行运算用户自定义函数。我们分别对这两部分进行了示范。本篇我准备示范把这两种情况集成一体...

18110
来自专栏绿巨人专栏

[Java] Design Pattern:Code Shape - manage your code shape

2885
来自专栏码匠的流水账

聊聊eureka的preferSameZoneEureka参数

spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cl...

671
来自专栏斑斓

框架 | Spark中的combineByKey

在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组、聚合或者将两个包含Pair数据的RDD根据key进行jo...

3185
来自专栏Albert陈凯

Spart DataSet数据集

]Spark引入DataFrame, 它可以提供high-level functions让Spark更好的处理结构数据的计算。 这让Catalyst opt...

2556
来自专栏码匠的流水账

聊聊curator recipes的LeaderLatch

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/L...

1043
来自专栏丑胖侠

《Drools7.0.0.Final规则引擎教程》番外实例篇——activation-group的多FACT对象

场景 当我们使用activation-group时,默认会执行优先级最高的一个规则,然后其他规则不再执行,这也是此属性的基本特性。那么,大家是否考虑过这样一个问...

2148
来自专栏函数式编程语言及工具

FunDA(15)- 示范:任务并行运算 - user task parallel execution

    FunDA的并行运算施用就是对用户自定义函数的并行运算。原理上就是把一个输入流截分成多个输入流并行地输入到一个自定义函数的多个运行实例。这些函数运行实例...

1759
来自专栏猿天地

spring-data-mongodb之自增ID实现

来源:猿天地 链接:http://cxytiandi.com/blog/detail/1897 用了mongodb之后要是问我mongo和mysql的区别在哪里...

39012

扫码关注云+社区