前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DataFrame与RDD的互操作

DataFrame与RDD的互操作

作者头像
sparkle123
发布2018-04-26 14:42:48
8720
发布2018-04-26 14:42:48
举报
文章被收录于专栏:大数据-Hadoop、Spark
DataFrame Interoperating with RDDs

参考官网 http://spark.apache.org/docs/2.2.0/sql-programming-guide.html#interoperating-with-rdds

DataFrameRDD互操作的两种方式比较: 1)反射推导式:case class 前提:事先需要知道字段、字段类型 2)编程式:Row 如果第一种情况不能满足要求(事先不知道列等schema信息)

  1. 选型:优先考虑第一种,使用简单

下面的代码演示了

  • Inferring the Schema Using Reflection
  • Programmatically Specifying the Schema
代码语言:javascript
复制
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType,IntegerType}

object DataFrameRDDApp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate();
    // Create an RDD of Person objects from a text file
    val testRDD = spark.sparkContext.textFile("C:\\Users\\Administrator\\IdeaProjects\\SparkSQLProject\\spark-warehouse\\test.txt")
    //inferReflection(spark,testRDD)

    program(spark,testRDD)

    spark.stop();

  }

  def inferReflection(spark: SparkSession,testRDD: RDD[String]): Unit = {

    // RDD ==> DataFrame

    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._
    val infoDF = testRDD.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF();

    infoDF.show();

    infoDF.filter(infoDF.col("age") > 30).show

    // Register the DataFrame as a temporary view
    infoDF.createOrReplaceTempView("infos")

    // SQL statements can be run by using the sql methods provided by Spark
    spark.sql("select * from infos where age > 30").show();
  }

  def program(spark:SparkSession,testRDD: RDD[String]): Unit = {

    // The schema is encoded in a string
    val schemaString = "id name age"
    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))

    val schema = StructType(fields)


    val structType = StructType(Array(StructField("id",IntegerType,true),
      StructField("name",StringType,true),
      StructField("age",IntegerType,true)))

    // Convert records of the RDD (people) to Rows
    val rowRDD = testRDD.map(_.split(","))
      .map(attributes => Row(attributes(0),attributes(1).trim,attributes(2)))

    val infoDF = spark.createDataFrame(rowRDD,schema)

    infoDF.printSchema()
    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()

  }

  case class Info(id: Int, name: String, age: Int)

}

查看源码,发现里面的注释写的挺好。

SparkSession源码

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.03.05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DataFrame Interoperating with RDDs
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档