前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据随记 —— DataFrame 与 RDD 之间的相互转换

大数据随记 —— DataFrame 与 RDD 之间的相互转换

作者头像
繁依Fanyi
发布2023-05-07 19:26:44
9680
发布2023-05-07 19:26:44
举报
在这里插入图片描述
在这里插入图片描述

在 Spark SQL 中有两种方式可以在 DataFrame 和 RDD 中进行转换:

  • ① 利用反射机制,推导包含某种类型的 RDD,通过反射将其转换为指定类型的 DataFrame,适用于提前知道 RDD 的 Schema。
  • ② 通过编程借口与 RDD 进行交互获取 Schema,并动态创建 DataFrame,在运行时决定列及其类型。

DataFrame 中的数据结构信息,即为 Scheme

① 通过反射获取 RDD 内的 Scheme

(使用条件)已知类的 Schema,使用这种基于反射的方法会让代码更加简洁而且效果也更好。在 Scala 中,使用 case class 类型导入 RDD 并转换为 DataFrame,通过 case class 创建 Schema,case class 的参数名称会被利用反射机制作为列名。case class 可以嵌套组合成 Sequences 或者 Array。这种 RDD 可以高效的转换为 DataFrame 并注册为表。

其次,如果需要 RDD 与 DFS 或者 DS 之间互相操作,那么需要引入 import sqlContext.implicits._

  • 这里的 sqlContext 不是包名,而是创建的 SparkSession 对象(这里为 SQLContext 对象)的变量名称,所以必须先创建 SparkSession 对象再导入。
  • 这里 sqlContext 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

SparkSession 是 Spark 2.0 引入的概念,其封装了 SQLContext 和 HiveContext。

代码语言:javascript
复制
package sparksql  
  
import org.apache.spark.sql.SQLContext  
import org.apache.spark.{SparkConf, SparkContext}  
  
object DataFrametoRDDofReflection {  
  def main(args: Array[String]): Unit = {  
  
  }  

  def method1():Unit = {  
  
    val sparkConf = new SparkConf().setAppName("DataFrametoRDDofReflection").setMaster("local[2]")  
    val sc = new SparkContext(sparkConf)  
    val sqlContext = new SQLContext(sc)  
    
    // 引入 sqlContext.implicits._
    import sqlContext.implicits._  
  
    // 将 RDD 转成 DataFrame    
	/*val people = sc.textFile("people.txt").toDF()*/    
	val people = sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()  
  
    people.show()  
  
    people.registerTempTable("people")  
    val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")  
    teenagers.show()  
  
    // DataFrame 转成 RDD 进行操作:根据索引号取值  
    teenagers.map(t=>"Name:" + t(0)).collect().foreach(println)  
  
    // DataFrame 转成 RDD 进行操作:根据字段名称取值  
    teenagers.map(t=>"Name:" + t.getAs[String]("name")).collect().foreach(println)  
  
    // DataFrame 转成 RDD 进行操作:一次返回多列的值  
    teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println)  
  
    sc.stop()  
  
  }

  
  /**  
   * 定义 Person 类  
   * @param name 姓名  
   * @param age 年龄  
   */  
  case class Person(name:String,age:Int)  
  
}

② 通过编程接口执行 Scheme

通过 Spark SQL 的接口创建 RDD 的 Schema,这种方式会让代码比较冗长。这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成 Schema。可以通过以下三步创建 DataFrame:

  • 第一步将 RDD 转为包含 row 对象的 RDD
  • 第二步基于 structType 类型创建 Schema,与第一步创建的 RDD 想匹配
  • 第三步通过 SQLContext 的 createDataFrame 方法对第一步的 RDD 应用 Schema
代码语言:javascript
复制
package sparksql  
  
import org.apache.spark.sql.SQLContext  
import org.apache.spark.{SparkConf, SparkContext}  
  
object DataFrametoRDDofInterface {  
  
  def main(args: Array[String]): Unit = {  
    method2()  
  }  
  
  def method2(): Unit = {  
    val sparkConf = new SparkConf().setAppName("DataFrametoRDDofInterface").setMaster("local[2]")  
    val sc = new SparkContext(sparkConf)  
    val sqlContext = new SQLContext(sc)  
  
    import sqlContext.implicits._  
    
    val people = sc.textFile("people.txt")  
  
    // 以字符串的方式定义 DataFrame 的 Schema 信息  
    val schemaString = "name age"  
  
    // 导入所需要的类  
    import org.apache.spark.sql.Row  
    import org.apache.spark.sql.types.{StructType,StructField,StringType}  
  
    // 根据自定义的字符串 schema 信息产生 DataFrame 的 Schema    
    val schema = StructType(  
      schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))  
    
    // 将 RDD 转换成 Row    
    val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))  
  
    // 将 Schema 作用到 RDD 上  
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)  
  
    // 将 DataFrame 注册成临时表  
    peopleDataFrame.registerTempTable("people")  
  
    // 获取 name 字段的值  
    val results = sqlContext.sql("SELECT name FROM people")  
    results.map(t => "Name" + t(0)).collect().foreach(println) 
     
    sc.stop()  
    
  }  
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-11-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ① 通过反射获取 RDD 内的 Scheme
  • ② 通过编程接口执行 Scheme
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档