RDD转为Dataset如何指定schema?

与RDD进行互操作

Spark SQL支持两种不同方法将现有RDD转换为Datasets。第一种方法使用反射来推断包含特定类型对象的RDD的schema。这种基于反射的方法会导致更简洁的代码,并且在编写Spark应用程序时已经知道schema的情况下工作良好。

第二种创建Datasets的方法是通过编程接口,允许您构建schema,然后将其应用于现有的RDD。虽然此方法更详细,但它允许你在直到运行时才知道列及其类型的情况下去构件数据集。

使用反射推断模式

Spark SQL的Scala接口支持自动将包含case classes的RDD转换为DataFrame。Case class定义表的schema。使用反射读取case class的参数名称,并将其变为列的名称。Case class也可以嵌套或包含复杂类型,如Seqs或Arrays。此RDD可以隐式转换为DataFrame,然后将其注册为表格。表可以在随后的SQL语句中使用。

// For implicit conversions from RDDs to DataFrames

import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe

val peopleDF = spark.sparkContext

.textFile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => Person(attributes(0), attributes(1).trim.toInt))

.toDF()

// Register the DataFrame as a temporary view

peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index

teenagersDF.map(teenager => "Name: " + teenager(0)).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

// or by field name

teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly

implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

// Primitive types and case classes can be also defined as

// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]

teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

// Array(Map("name" -> "Justin", "age" -> 19))

以编程方式指定模式

当case class不能提前定义时(例如,记录的结构用字符串编码,或者文本数据集将被解析并且字段对不同的用户值会不同),DataFrame可以以编程方式通过三个步骤创建 。

1, Row从原始RDD 创建元素类型为Row的RDD;

2,使用StructType创建一组schema,然后让其匹配步骤1中Rows的类型结构。

3,使用SparkSession 提供的方法createDataFrame,将schema应用于Rows 类型的RDD。

import org.apache.spark.sql.types._

// Create an RDD

val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string

val schemaString = "name age"

// Generate the schema based on the string of schema

val fields = schemaString.split(" ")

.map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

// Convert records of the RDD (people) to Rows

val rowRDD = peopleRDD

.map(_.split(","))

.map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD

val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame

peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames

val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations

// The columns of a row in the result can be accessed by field index or by field name

results.map(attributes => "Name: " + attributes(0)).show()

// +-------------+

// | value|

// +-------------+

// |Name: Michael|

// | Name: Andy|

// | Name: Justin|

// +-------------+

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-06-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Pulsar-V

Save Camera Document

#pragma once #include "HCCamera.h" #include <time.h> #include <cstdio> #incl...

2888
来自专栏一个会写诗的程序员的博客

查看域名服务器命令: $ dig -t NS xvideos.com

; <<>> DiG 9.8.3-P1 <<>> -t NS xvideos.com ;; global options: +cmd ;; Got answ...

2.8K2
来自专栏MelonTeam专栏

Bitmap 源码阅读笔记

导语: Android 系统上的图片的处理,跟Bitmap 这个类脱不了关系,我们有必要去深入阅读里面的源码,以便在工作中能更好的处理Bitmap相关的问题...

2598
来自专栏前端儿

Web 前端颜色值--字体--使用,整理整理

颜色值 CSS 颜色使用组合了红绿蓝颜色值 (RGB) 的十六进制 (hex) 表示法进行定义。对光源进行设置的最低值可以是 0(十六进制 00)。最高值是 2...

2702
来自专栏WOLFRAM

向日葵中的数学之美

1923
来自专栏搞前端的李蚊子

Html5模拟通讯录人员排序(sen.js)

// JavaScript Document  var PY_Json_Str = ""; var PY_Str_1 = ""; var PY_Str_...

6326
来自专栏码匠的流水账

聊聊HystrixThreadPool

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixThreadPool.java

921
来自专栏Golang语言社区

Knapsack problem algorithms for my real-life carry-on knapsack

I'm a nomad and live out of one carry-on bag. This means that the total weight o...

1182
来自专栏增长技术

App Guide相关

##TourGuide https://github.com/worker8/TourGuide

812
来自专栏xingoo, 一个梦想做发明家的程序员

使用计时器-方法1

Windows将WM_TIMER消息发送到应用程序的窗口过程。相应SetTimer的调用方法: SetTimer(hwnd,1,uiMsecInterval,N...

1858

扫码关注云+社区