前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >我是一个DataFrame,来自Spark星球

我是一个DataFrame,来自Spark星球

作者头像
double
发布2019-07-23 10:48:58
1.7K0
发布2019-07-23 10:48:58
举报
文章被收录于专栏:算法channel算法channel算法channel

本文的开头,咱们正式给该系列取个名字了,就叫数据分析EPHS系列,EPHS分别是Excel、Python、Hive和SparkSQL的简称。本篇是该系列的第二篇,我们来讲一讲SparkSQL中DataFrame创建的相关知识。

说到DataFrame,你一定会联想到Python Pandas中的DataFrame,你别说,还真有点相似。这个在后面的文章中咱们在慢慢体会,本文咱们先来学习一下如何创建一个DataFrame对象。通体来说有三种方法,分别是使用toDF方法,使用createDataFrame方法和通过读文件的直接创建DataFrame。

本文中所使用的都是scala语言,对此感兴趣的同学可以看一下网上的教程,不过挺简单的,慢慢熟悉就好:https://www.runoob.com/scala/scala-tutorial.html

DataFrame(以下简称DF)的生成方式有很多,我们一一道来,不过在生成之前,我们首先要创建一个SparkSession:

val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .enableHiveSupport()
      .getOrCreate()

1、使用toDF方法创建DataFrame对象

使用toDF方法,我们可以将本地序列(Seq), 列表或者RDD转为DataFrame。只要这些数据的内容能指定数据类型即可。

这里先讲一下什么是本地序列(Seq),Seq对应于Java中的java.util.List,可以参考:https://blog.csdn.net/bigdata_mining/article/details/81269704。

比如,我们可以将如下的Seq转换为DF:

def createDFByToDF(spark:SparkSession) = {
    import spark.implicits._

    val seqData = Seq(
      (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
      (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
    )

    val seq2df = seqData.toDF("int_column","string_column","date_column")

    print(seq2df.dtypes)
    seq2df.show()
  }

模型输出为:

这里注意两点咱们再继续讲:

1)先导入spark.implicits._

import spark.implicits._

在对 DataFrame 进行许多操作都需要这个包进行支持。这是scala中隐式语法,感兴趣的同学可以参考:https://www.cnblogs.com/xia520pi/p/8745923.html,如果比较难理解的话,那就记得每次都导入这个就好了,或者一旦发现代码中有如下的红色错误的话,首先想到导入这个包:

2)run的时候还是要指定master url,否则报错:

这里还是要指定运行的参数:

好了,继续往下讲。同样,我们可以将一个RDD转化为DF:

val rdd = spark.sparkContext.parallelize(List(1,2,3,4,5))
val df = rdd.map(x=>(x,x^2)).toDF("org","xor")
df.show()

结果如下:

最后,我们还可以将一个Scala的列表转化为DF:

val arr = List((1,3),(2,4),(3,5))
val df1 = arr.toDF("first","second")
df1.show()

输出为:

2、使用createDataFrame方法创建DataFrame对象

这一种方法比较繁琐,通过row+schema创建DataFrame:

def createDFBySchema(spark:SparkSession) = {
    import spark.implicits._

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    val schema = StructType(List(
      StructField("integer_column", IntegerType, nullable = false),
      StructField("string_column", StringType, nullable = true),
      StructField("date_column", DateType, nullable = true)
    ))

    val rdd = spark.sparkContext.parallelize(Seq(
      Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
      Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
    ))
    val df = spark.createDataFrame(rdd, schema)
    df.show()
  }

输出为:

由于比较繁琐,所以感觉实际工作中基本没有用到过,大家了解一下就好。

3、通过文件直接创建DataFrame对象

我们介绍几种常见的通过文件创建DataFrame。包括通过JSON、CSV文件、MySQl和Hive表。

3.1 通过JSON创建

假设我们的JSON文件内容如下:

通过其创建DataFrame代码如下:

def createDFByJson(spark:SparkSession) = {
    val df = spark.read.json("resources/test.json")
    df.show()
  }

结果为:

3.2 通过CSV文件创建

这里,首先需要导入一个包,可以在:https://www.mvnjar.com/com.databricks/spark-csv_2.11/1.5.0/detail.html 进行下载。下载完成后导入:

随后,我们准备一份鸢尾花的数据集:

通过代码进行读入:

def createDFByCSV(spark:SparkSession) = {
    val df = spark.sqlContext.read.format("com.databricks.spark.csv")
      .option("header","true") //这里如果在csv第一行有属性的话,没有就是"false"
      .option("inferSchema",true.toString)//这是自动推断属性列的数据类型。
      .load("resources/iris.csv")

    df.show()
  }

结果如下:

3.3 通过Mysql创建

咱们先简单的创建一个数据表:

建表语句如下:

CREATE TABLE IF NOT EXISTS `runoob_tbl`(
    ->    `runoob_id` INT UNSIGNED AUTO_INCREMENT,
    ->    `runoob_title` VARCHAR(100) NOT NULL,
    ->    `runoob_author` VARCHAR(40) NOT NULL,
    ->    `submission_date` DATE,
    ->    PRIMARY KEY ( `runoob_id` )
    -> )ENGINE=InnoDB DEFAULT CHARSET=utf8;

插入语句如下:

insert into runoob_tbl(runoob_id,runoob_title,runoob_author) values(10,'hhaha','ac');

接下来通过spark进行读取:

def createDFByMysql(spark:SparkSession) = {

    val url = "jdbc:mysql://localhost:3306/test"
    val df = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "runoob_tbl")
      .option("user", "root")
      .option("password", "0845")
      .load()
    df.show()
  }

读取成功,结果如下:

3.4 通过Hive创建

这是咱们最常用的方式了,假设咱们已经把鸢尾花数据导入到hive中了:

val df = spark.sqlContext.read.format("com.databricks.spark.csv")
      .option("header","true") //这里如果在csv第一行有属性的话,没有就是"false"
      .option("inferSchema",true.toString)//这是自动推断属性列的数据类型。
      .load("resources/iris.csv")

    df.show()


    spark.sql(
      s"""
         |CREATE TABLE IF NOT EXISTS iris(
         | feature1 double,
         | feature2 double,
         | feature3 double,
         | feature4 double,
         | label string
         |)
       """.stripMargin)



    df.createOrReplaceTempView("outputdata")


    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql(
      s"""
         |insert overwrite table iris
         |select
         |  feature1,
         |  feature2,
         |  feature3,
         |  feature4,
         |  label
         |from
         |  outputdata
      """.stripMargin)

先在hive下查看数据:

接下来,在spark中同样写sql就好了:

val df = spark.sql(
      """
        |select
        | *
        |from
        | iris
        |limit
        | 10
      """.stripMargin)

    df.show()

成功!

4、总结

今天咱们总结了一下创建Spark的DataFrame的几种方式,在实际的工作中,大概最为常用的就是从Hive中读取数据,其次就可能是把RDD通过toDF的方法转换为DataFrame。

spark.sql()函数中的sql语句,大部分时候是和hive sql一致的,但在工作中也发现过一些不同的地方,比如解析json类型的字段,hive中可以解析层级的json,但是spark的话只能解析一级的json(这是我在工作中的发现,也可能不太对,大家可以自己尝试一下)。

这些我们在后面会继续讲到。后面的话,咱们先介绍一点hive的基础知识,如数据类型和常用的函数等等。期待一下吧。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员郭震zhenguo 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、使用toDF方法创建DataFrame对象
  • 2、使用createDataFrame方法创建DataFrame对象
  • 3、通过文件直接创建DataFrame对象
    • 3.1 通过JSON创建
      • 3.2 通过CSV文件创建
        • 3.3 通过Mysql创建
          • 3.4 通过Hive创建
          • 4、总结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档