大数据实战课程-SparkSQL实战

1课时
0学过
8分

课程评价 (0)

请对课程作出评价:
0/300

学员评价

暂无精选评价
120分钟

大数据实战课程-SparkSQL实战

SparkSQL开发实战

实验预计耗时:60分钟

1. 课程背景

1.1 课程目的

Spark-SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口Dataset/DataFrame为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互有几种方法,包括SQL和Dataset API。当计算一个结果时,使用相同的执行引擎,开发人员可以轻松地在不同的api之间来回切换。

本次实验通过SparkSQL的Scala开发练习,使学员可以深入了解DataFrame的开发与功能原理。

1.2 课前知识准备

学习本课程前,学员需要掌握以下前置知识:

1、能力基础

  • Linux基本操作:掌握Linux远程登录、文件与目录管理、vim 编辑器使用等。
  • Hadoop基础:理解Hadoop基本组件的功能与原理。
  • SQL语法基础:包括SQL基本语法、数据类型、常用内置函数等。
  • Scala开发基础:Scala基本语法、Maven工程基本操作。

2、相关技术

  • Spark:是基于内存计算的大数据分布式计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
    • 提供分布式计算功能,将分布式存储的数据读入,同时将任务分发到各个节点进行计算;
    • 基于内存计算,将磁盘数据读入内存,将计算的中间结果保存在内存,这样可以很好的进行迭代运算;
    • 支持高容错;
    • 提供多计算范式也叫算子;
  • Spark程序也是通常说的SparkCore程序。
  • DataFrame:是SparkSQL进行数据处理的数据结构,是一种以RDD为基础的分布式数据集,和RDD相比,DataFrame增加了对RDD中每个元素中子元素的类型和字段信息。
    • 与RDD相似,DataFrame也是数据的一个不可变分布式集合。
    • 但与RDD不同的是,数据都被组织到有名字的列中,就像关系型数据库中的表一样。
    • 有了DataFrame的定义,sparksql就可以处理针对DataFrame的SQL查询。

2. 实验环境

2.1 实验操作环境

本课程需要以下实验操作环境:

  1. 可以接入互联网的笔记本电脑或者台式机,本实验使用Windows系统
  2. 实验环境:计算机本地(具备Java开发环境)+腾讯云控制台

2.2 实验架构图

本实验将使用EMR搭建Spark三节点集群(Master节点和两个Core节点),其架构图如下:

2.3 实验的数据规划表

资源名称

数据

说明

腾讯云账号

账号:XXXXXXXX、密码:XXXXXXXX

涉及产品如下:VPC、EMR

PuTTY

版本:0.73

student.txt

实验数据

PuTTY下载

student数据集下载

3. 实验流程

本实验分为4个部分。首先实验环境准备,需要借助腾讯云弹性MapReduce服务帮助我们快速搭建一个三节点的Spark集群,注意主节点最低配置为4核8G。随后使用spark-shell和本地IDEA两种方式分别实现对SparkSQL的开发与使用。

相信通过本实验的学习,学员可以掌握SparkSQL的基本开发语法,理解DataFrame与RDD之间的相互转换,以及SparkSQL在Spark集群上从开发到部署的整个流程。

4. 实验步骤

任务1 实验环境准备

【任务目标】

通过EMR集群的搭建练习,使学员可以熟练搭建EMR集群,快速构建实验所需大数据平台环境。

【任务步骤】

1、EMR创建Hadoop集群

1.在腾讯云官网,找到弹性MapReduce首页,点击立即购买

2.可用区与软件配置如下:

配置项

配置项说明

计费模式

按量计费

地域/可用区

广州/广州四区(可根据所在地自定义选择)

产品版本

EMR-V3.0.0

必选组件

hadoop、zookeeper、knox

可选组件

spark_hadoop3.1 2.4.3

确认配置无误后,点击下一步:硬件配置

3.硬件配置如下:

配置项

配置项说明

节点高可用

不启用

Master配置1台

EMR标准型S2 / 4核8G,CBS云盘:100G高效云盘 X 1

Core配置2台

EMR标准型S2 / 4核8G,CBS云盘:100G高效云盘 X 1

集群外网

开启集群Master节点公网

集群网络

新建或选择已有的私有网络

注意:启动高可用选项可以自定义选择,默认是选择的,如果取消需要手动取消选择。由于我们这里的实验环境仅仅是一个学习的实验环境所以这里我们将此选项取消,实际生产中要根据实际环境合理选择是否需要这个配置。

确认硬件配置信息无误后,点击下一步:基础配置

注意:Master节点核心数选择应不小于4核。

4.基础配置如下:

配置项

配置项说明

集群名称

emr-test

远程登录

开启

安全组

创建新安全组

对象存储

不开启

登录密码

EMR集群云主机root用户登录的密码

确认信息无误后,点击购买,会自动跳转至集群页。图中的集群实例状态中显示集群创建中

等待5min左右,集群构建成功,截图如下:

2、第三方工具连接EMR集群

1.复制集群页的主节点外网IP,打开PuTTY创建连接,将复制的外网IP粘贴至Host Name,端口默认22,如图:

2.点击Open,第一次连接会弹出安全警告,点击是(Y)

3.接下在login as:后填写用户名为root,密码为构建EMR的时候设置的密码:

备注:这里只能使用root用户进行连接。

任务2 spark-shell执行SparkSQL代码

【任务目标】

通过spark-shell执行SparkSQL代码,掌握SparkSQL命令行运行方式,了解SaprkSQL的基本语法。

【任务步骤】

1、上传数据至HDFS

1.使用mkdir命令在Master节点创建一个/test目录。

创建文件夹test;

mkdir /test

切换到test路径下;

cd /test

2.获取数据文件student.txt,student文件内数据每行有五列,分别是id、name、sex、age、department且以空格分隔。获取成功后上传到HDFS上。

wget https://course-public-resources-1252758970.cos.ap-chengdu.myqcloud.com/%E5%AE%9E%E6%88%98%E8%AF%BE/202001bigdata/8-sparksql/student.txt

切换至hadoop用户;

su hadoop

创建一个/test文件夹;

hdfs dfs -mkdir /test

上传文件至/test下;

hdfs dfs -put student.txt /test/student.txt

验证:查看/test目录下文件详情;

hdfs dfs -ls /test/

2、数据输出DataFrame

1.进入spark-shell中;

spark-shell

2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分隔;

注意:此处需要修改ip地址为Master节点的内网ip,可以云服务器控制台实例列表查看,云服务器控制台链接。

val linesRDD = sc.textFile("hdfs://xxx.xxx.xxx.xxx:4007/test/student.txt").map(_.split(","))

下图为通过该命令在hdfs上读取文件的效果图;

2.定义Student class(相当于表的schema),定义语句如下:

case class Student(id:Int, name:String, sex:String, age:Int, department:String)

下图为创建一个Student class的效果图:

3.将RDD和Student class关联:

val studentRDD = linesRDD.map(x => Student(x(0).toInt, x(1), x(2), x(3).toInt, x(4)))

下图为生成studentRDD的效果图:

4.将RDD转换成DataFrame,命令为 studentDF:

val studentDF = spark.createDataFrame(studentRDD)

下图为将RDD转换为DataFrame的效果图:

3、执行数据查询

1.对DataFrame的Schema进行打印,命令如下;

studentDF.printSchema

2.对DataFrame的内容进行打印,命令如下;

studentDF.show()

3.查询所有的id,name和age,并将age+1,语法如下;

studentDF.select(col("id"), col("name"), col("age") + 1).show()

下图为查询所有的name和age的结果图;

退出spark-shell;

:quit

任务3 编程方式实现sparkSQL

【任务目标】

通过SparkSQL实战,使学员可以加强代码能力和DataFrame的使用,以及对idea开发工具的熟悉和使用。

【任务步骤】

1、创建Scala支持的Maven项目

1.在IDEA中创建Maven项目

参数名

输入内容

GroupId

com.test

ArtifactId

spark-sql-study

Project name

spark-sql-study

2.项目中增加Scala支持;

右键点击项目名称,选择Add Framework Support...

Add Framework Support的左侧列表中选择Scala。

3.在main目录下创建目录scala,修改scala目录为source目录;

右键点击创建的scala目录,选择Mark Directory as中的Sources Root

2、代码编写与打包

1.修改pom.xml文件,加入以下内容。

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
</dependencies>

2.编scala目录下创建包com.test.bigdata,在包内编写Scala object类StudentSparkSQL:

StudentSparkSQL内代码编写如下,其中Schema借助Student类来进行定义:

package com.test.bigdata.sparksql

import org.apache.spark.sql.SparkSession

object StudentSparkSQL {
    def main(args: Array[String]): Unit = {
        if(args == null || args.length <2) {
            println(
                """Parameter Errors! Usage: <inputpath> <outputPath>
                """.stripMargin)
        }
        var Array(inputpath, outputpath) = args
        val spark = SparkSession.builder()
                .appName(s"${StudentSparkSQL.getClass.getSimpleName}")
                .getOrCreate()
        val sqlContext = spark.sqlContext
        val linesRDD = spark.sparkContext.textFile(inputpath).map(_.split(","))
        val studentRDD = linesRDD.map(x => Student(x(0).toInt, x(1), x(2), x(3).toInt, x(4)))

        import sqlContext.implicits._
        val studentDF = studentRDD.toDF

        //注册表
        studentDF.createOrReplaceTempView("t_student")
		// sql语句:对大于6人的部门进行统计和倒叙查询。
        val df = sqlContext.sql(
            """
              |SELECT
              |    department,
              |    count(*) as total
              |FROM t_student
              |GROUP BY department
              |HAVING total > 6
              |ORDER BY total DESC
            """.stripMargin)

        //将结果以JSON的方式存储到指定位置
        df.write.json(outputpath)

        spark.stop()
    }
}
// 编写case class类Student
case class Student(id:Int, name:String, sex:String, age:Int, department:String)

3.将程序打包,并将生成的jar包移动至D盘等待上传。

3、程序上传与运行

1.找到PuTTY的安装目录,在上方地址栏输入cmd并执行。

2.打开psftp传输文件,使用命令如下:

在弹出的黑窗口首先输入psftp,打开psftp工具用来传输文件;

psftp

接下来连接服务器,open后输入集群的公网IP,回车后需要输入用户名和密码;

open xxx.xxx.xxx.xxx 

用于切换远程Linux 服务器上的目录;

cd /test/

lcd命令用于切换本地的路径;

lcd D:\

上传文件;

put spark-sql-study-1.0-SNAPSHOT.jar

命令使用可以参考下图:

3.Spark客户端运行程序,以client模式提交。

注意:需要先切换至hadoop用户执行

spark-submit \
--class com.test.bigdata.sparksql.StudentSparkSQL \
--master yarn \
--deploy-mode client \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 600m \
--driver-memory 600m \
/test/spark-sql-study-1.0-SNAPSHOT.jar \
/test/student.txt /test/output_sparksql

4.查看Spark任务的结果:

hdfs dfs -cat /test/output_sparksql/p*

下图为查询spark任务结果,可知“MA”和“CS”两个部门的人数分别为8人和7人。

至此你已实现了使用case class方式开发SparkSQL代码。

任务4 通过StructType直接指定Schema

【任务目标】

通过SparkSQL实战,使学员可以加强代码能力和算子的使用,以及对idea开发工具的熟悉和使用,熟悉StructType方式去编写sparkSQL代码。

【任务步骤】

1、代码编写与打包

1.在spark-sql-study工程下创建com.test.bigdata.sparksql包,包下创建一个Scala对象StudentSparkSQL2,代码编写如下:

package com.test.bigdata.sparksql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object StudentSparkSQL2 {
    def main(args: Array[String]): Unit = {
        if(args == null || args.length <2) {
            println(
                """Parameter Errors! Usage: <inputpath> <outputPath>
                """.stripMargin)
        }
        var Array(inputpath, outputpath) = args
        val spark = SparkSession.builder()
            .appName(s"${StudentSparkSQL2.getClass.getSimpleName}")
            .getOrCreate()
        val sqlContext = spark.sqlContext
        val linesRDD = spark.sparkContext.textFile(inputpath).map(_.split(","))
        val rowRDD = linesRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt, p(4).trim))
        val schema = StructType(
            List(
                StructField("id", IntegerType, true),
                StructField("name", StringType, true),
                StructField("sex", StringType, true),
                StructField("age", IntegerType, true),
                StructField("department", StringType, true)
            )
        )
        val studentDF = sqlContext.createDataFrame(rowRDD, schema)
        //注册表
        studentDF.createOrReplaceTempView("t_student")
        // sql语句:对大于6人的部门进行统计和倒叙查询。
        val df = sqlContext.sql(
            """
              |SELECT
              |    department,
              |    count(*) as total
              |FROM t_student
              |GROUP BY department
              |HAVING total > 6
              |ORDER BY total DESC
            """.stripMargin)

        //将结果以JSON的方式存储到指定位置
        df.write.json(outputpath)
        spark.stop()
    }
}

2.对程序再洗进行打包,并生成的jar包(target目录下)移动至D盘,覆盖同名jar包等待上传。删除原/test/目录下的jar包。

2、程序上传与运行

1.Jar包上传至EMR集群/test目录下,并运行;

su hadoop

使用spark-submit对任务进行提交;

spark-submit \
--class com.test.bigdata.sparksql.StudentSparkSQL2 \
--master yarn \
--deploy-mode client \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 600m \
--driver-memory 600m \
/test/spark-sql-study-1.0-SNAPSHOT.jar \
/test/student.txt /test/output_sparksql2

2.查看Spark任务的结果。

查看/test目录下文件;

hdfs dfs -ls /test

对/test/output_sparksql2目录下的内容进行显示;

hdfs dfs -cat /test/output_sparksql2/p*

下图为查询Spark任务结果:

至此你已实现StructType指定Schema的方式开发SparkSQL代码。

5. 注意事项

如实验资源无需保留,请在实验结束后及时销毁,以免产生额外费用。

6. FAQ

【问题】1、在任务3中,Maven下载依赖失败。

【解决】学员如果下载依赖失败,可以使用下面接连下载依赖。

SparkSQL项目所需依赖下载

SparkSQL实验jar包参考

下载依赖包解压后,复制到Maven的本地仓库下(粘贴选择合并文件夹,同名文件选择覆盖原有文件)。