前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark SQL 数据统计 Scala 开发小结

Spark SQL 数据统计 Scala 开发小结

原创
作者头像
李德鑫
修改2017-08-16 15:00:34
9.5K3
修改2017-08-16 15:00:34
举报
文章被收录于专栏:李德鑫的专栏李德鑫的专栏

导语:关于 API 使用踩过的一些坑。

1、RDD Dataset 和 DataFrame 速览

RDD 和 DataFrame 都是一个可以看成有很多行,每一行有若干列的数据集(姑且先按照记录和字段的概念来理解)

在 scala 中可以这样表示一个 RDD: RDD[Array[String]] 每条记录是字符串构成的数组 RDD[(String, Int, ….)] 每条记录是多个不同类型的数据构成的元组

RDD 是分布式的 Java 对象的集合,RDD 中每个字段的数据都是强类型的

当在程序中处理数据的时候,遍历每条记录,每个值,往往通过索引读取

代码语言:javascript
复制
val filterRdd = rdd.filter(
    x => {
        var result = true
        if((x(30)=="" && x(13)!="5" && x(13)!="")){
                result = false
        }
        result
    }
)

这种方式在 MapReduce 程序中也常常见到。

DataFrame 则是一个每列有命名的数据集,类似于关系数据库中的表,读取某一列数据的时候可以通过列名读取。所以相对于 RDD,DataFrame 提供了更详细的数据的结构信息 schema。

在 Spark 2.1 中, DataFrame 的概念已经弱化了,将它视为 DataSet 的一种实现

DataFrame is simply a type alias of Dataset[Row] @DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row"">http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package@DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

除了 Row 这种类型之外,还可以是一些其他自定义的类。

Dataset API 属于用于处理结构化数据的 Spark SQL 模块(这个模块还有 SQL API),通过比 RDD 多的数据的结构信息(Schema),Spark SQL 在计算的时候可以进行额外的优化。 Spark SQL's optimized execution engine[1]。通过列名,在处理数据的时候就可以通过列名操作。

RDD、DataFrame 和 DataSet 的区别中介绍了 DatasetAPI 的优势,MLlib 里也加大了对 DataSetAPI 的支持,并且提到 The RDD-based API is expected to be removed in Spark 3.0。所以未来推荐使用 DataSetAPI。

2、使用介绍

2.1 加载数据

目前 tdw 提供了读取 tdw 表生成 RDD 或 DataFrame 的 API。

代码语言:javascript
复制
1.6API
val sparkConf = new SparkConf()
val sparkContext = new SparkContext(sparkConf)
val rdd = new TDWProvider(sparkContext, tdwUser, tdwPasswd, dbName).table(tblName, partitions)


2.0API
val sparkSession = SparkSession.builder().getOrCreate()
val tdwDataFrame = new TDWSQLProvider(sparkSession, tdwUser, tdwPasswd, dbName).table(tblName, partitions)
这个返回的是 带有 Schema 的 数据,DataFrame 即 Dataset[Row]

val tdwRDD: RDD[Array[String]] = new TDWProvider(sparkSession.sparkContext, tdwUser, tdwPasswd, dbName).table(tblName, partitions)

或者直接读取 HDFS

代码语言:javascript
复制
val hdfsURL="hdfs://**/**/**/**/ds=20170101/*gz"
val hdfsRdd = sparkSession.sparkContext.textFile(hdfsURL)

这个生成的是 一个 RDD[Sting], 每一行是一个字符串,需要用户自己去分割读取

2.2 转换操作

1、选择指定列
代码语言:javascript
复制
//查看表的 Schema
tdwDataFrame.printSchema()
                root
                 |-- tdbank_imp_date: string (nullable = true)
                 |-- ftime: string (nullable = true)
                 |-- gid: long (nullable = true)
                 |-- aid: string (nullable = true)
                 |-- adid: long (nullable = true)

//选取指定列 方法 1
tdwDataFrame.createTempView("table1")
// 函数说明 createTempView Creates a temporary view using the given name. The lifetime of this temporary view is tied to the SparkSession that was used to create this Dataset. 在整个 SparkSession 期间创建一次就好,如果同一个创建了两次车,会报错
val selectDataFrame1 = sparkSession.sql("select ftime, gid from table1")


//选取指定列 方法 2
val columnNames: List[Column] = List(col("ftime"), col("gid") as userId) 
//说明: def col(colName: String): Column, 用列名构成 column 类型,并且可以用 as 指定别名。

val selectDataFrame2: DataFrame = originalDataset.select(columnNames:_*)

//如果是 RDD
val rddToRdd: RDD[Row] = hdfsRdd.map(
        dataStr => {
                val dataList: Array[String] = dataStr.split(sep, -1)
                Row.fromSeq(dataList)
        }
)


val fieldIds: List[StructField] = List(
        StructField("tdbank_imp_date", StringType, true),
        StructField("ftime", StringType, true),
        StructField("gid", StringType, true),
        StructField("adid", StringType, true)
)
val schema: StructType = StructType(fieldIds)
val rddToDataFrame: DataFrame = sparkSession.createDataFrame(rdd, schema)
2、map filter
代码语言:javascript
复制
val filterData = tdwDataFrame.filter(
        element => {
                var retFlag = true
                if(element.isNullAt(element.fieldIndex("gid")) || element.getAs("gid").toString()){
                        retFlag = false
                }
                retFlag
        }
)

//

这里 有两个地方需要说明 isNullAt 首先要判断要选取的列的值是否为空, 如果是空,直接读取数据会抛异常。getAs 本来是要指定具体的类型的,如 getAs[String],但因为 tdwDataFrame 的 schema 已知,包括各个字段的类型,如 gid 是 long, 这样如果按 getAs[String] 有可能会发生类型转换异常。

转换加工某些字段,即将原来的 DataFrame map 操作转换成另外一个 DataFrame。最开始的想法是用 scala 的 一些列表类型封装数据,当每个列的类型相同的时候,用数组 如 Array[String],但一般情况下是不同的,就用元组("a", 1, …),但这个方法有个局限,我们以 RDD 的操作为例,但在 DataFrame 中也是一样的

代码语言:javascript
复制
val mRdd2 = filterRdd.map( 
        x => (
                x(1), x(2), ... , x(23)            
        ) 

)

//语法错误:too many elements for tuple:23, allowed:22  
//编译报错:object Tuple23 is not a member of package scala。  
//当生成的 RDD 是一个超过 22 个字段的记录时,如果用 元组 tuple 就会报错, tuple 是 case class

不使用 数组和元组,而使用 Row

代码语言:javascript
复制
implicit val rowEncoder = Encoders.kryo[Row]

val mapDataset:Dataset[Row] = filterDataset.map(
        element => {
                val tempList:ListBuffer[Any] = new ListBuffer[Any]
                tempList  = element.getAs("gid")
                tempList  = element.getAs("aid")
                ...
                Row.fromSeq(tempList)
        }
)
val schema2 = StructType(...)
val mapDataFrame = SparkSession.createDataFrame(mapDataset.rdd, schema2)
3、聚合分组统计
代码语言:javascript
复制
//指定列,分组统计
val aggDagaset = mapDataFrame.groupBy(...).agg(...)

//api 说明,上面的操作有两步,
def groupBy(cols: Column*): RelationalGroupedDataset
def agg(expr: Column, exprs: Column*): DataFrame

//注意 import
import org.apache.spark.sql.functions._

val aggDagaset = mapDataFrame.groupBy(col("gid")).agg(count("gid") as cnt)
最后返回的是分组字段,和计算字段
即:gid, cnt
//分组字段,需要特别提一下的是,可以不指定,即分组字段为空
//计算字段,可以用 sql 写法,跟 sql 很类似
count("***") as taskField
sum("***") as taskField
countDistinct("***") as taskField
round(sum("***")/countDistinct("***"), 4) as taskField



//由于 agg 这个函数,必须要传两个参数,所以自己写了一个函数来封装原始的
def aggDataset(groupDataset: RelationalGroupedDataset, calculateColumns: List[Column]): Dataset[Row] = {
        calculateColumns.size match {
            case 1 => groupDataset.agg(calculateColumns(0))
            case _ => groupDataset.agg(calculateColumns(0), calculateColumns.tail: _*)
        }
    }

// 所以在标准化编程的时候,可以把维度字段,计算的字段封装成数组,然后计算。


//cube
val aggDagaset = mapDataFrame.cube(...).agg(...)
4、union
代码语言:javascript
复制
val unionDataFrame = aggDagaset1.union(aggDagaset2)
//处理空值,将空值替换为 0.0
unionData.na.fill(0.0)
5、NaN 数据中存在数据丢失

NaN,如果数据中存在 NaN(不是 null ),那么一些统计函数算出来的数据就会变成 NaN,如 avg。所以要对数据进行过滤或者转换。

代码语言:javascript
复制
import java.lang.Double.isNaN

if (isNaN(x.getAs("field"))){
        0
}
或者直接过滤掉
6、Sql 语句里一些不支持的函数或写法

不支持的函数: url_decode

不支持的写法 not rlike 支持 rlike,所以在写正则的时候可以取反 如 not rlike '^\d $' 要求不能数字开头,数字结尾,全是数字 就可以写成 rlike '\d[*^0-9] \d[*^0-9]*' 里面至少有一个不是数字的字符

2.3 TDW 一些基础功能

代码语言:javascript
复制
val tdwUtil = new TDWUtil(Constant.tdwUser, Constant.tdwPasswd, "db")
//表是否存在
tdwUtil.tableExist(table)
//分区是否存在
tdwUtil.partitionExist(table, x)

def createTableDesc(tblName: String, cols: Seq[Array[String]]) = {
        new TableDesc()
            .setTblName(tblName)
            .setCols(cols)
            .setComment(tblName)
            .setPartType("LIST")
            .setPartField("imp_date")
            .setCompress(true)
    }


//创建表,会自动创建默认分区,不用再单独创建  
tdwUtil.createTable(tblDesc)

//创建分区
tdwUtil.createListPartition(tblName, partName, datetime)

//清空分区,其实可以在写数据的时候,指定是否覆盖写
tdwUtil.truncatePartition(tblName, partName)

val outputDataset: Dataset[Row]
val sqlProvider = new TDWSQLProvider(sparkSession, Constant.tdwUser, Constant.tdwPasswd, dataBase)
sqlProvider.saveToTable(outputDataset, tblName, partName)


//判断 HDFS 路径是否有数据
val hadoopFileSystem = {
        val conf = sparkSession.sparkContext.hadoopConfiguration
        conf.set("fs.defaultFS", "hdfs://ss-xx-x-v2")
        conf.set("fs.default.name", "hdfs://ss-xx-x-v2")
        FileSystem.get(conf)
}

val fileStatusArr = hadoopFileSystem.globStatus(new Path("hdfs://host/xx/xx/*.gz"))
if(fileStatusArr.size == 0){
        "没有数据"
}

//判断表的分区类型 小时还是天分区
def getTdwTablePartitionType(dataBase: String, table: String) = {
        if(checkTdwTableExist(dataBase, table)){
            val partitions = tdwUtil.getTableInfo(table).partitions.filter(
                x => {
                    x.name != "default"
                }
            )
            if(partitions.length>0){
                val pn = partitions(0).name
                if(pn.length()>10){
                    "h"
                }else{
                    "d"
                }
            }else{
                null
            }
        }else{
            null
        }
    }

2.4 本地测试 JUnit Test

由于 Spark 程序,需要由客户端提交给集群执行,但在程序调试阶段,想快速验证代码逻辑,通过每次提交集群执行程序太费力了,可以在本地测试一下。但其实没必要用 JUnit,但有工具何乐而不用。

代码语言:javascript
复制
import org.junit.Before
import org.junit.Test
class TestFunction {

        var testData: String = null
        //准备 进行测试需要的数据、环境
        @Before
        def setUp() {
                //scala 里这个字符串 表示方法跟 python 一样
                testData = """
2017-02-17 13:27:03,115.224.31.*,2017-02-17 13:27:03,1990,xx.xx.com,/a/20170207/003084.htm,,xx.qq.com
2017-02-17 13:27:03,218.88.230.*,2017-02-17 13:27:03,1990,xx1.xx.com,/public/1/piclib_disp.shtml,cqpid=24861,data.xx.qq.com
2017-02-17 13:27:03,111.196.71.*,2017-02-17 13:27:03,1990,xx2.xx.com,/nba/,ptag=baidu.ald.sc,www.xx.com,/link
2017-02-17 13:27:03,116.117.150.*,2017-02-17 13:27:03,1990,xx3.xx.com,/storage/emulated/0/Android/data/com/files/.webapp/dirs/15/index.html
2017-02-17 13:27:03,10.50.148.*,2017-02-17 13:27:03,1990,xx1.xx.com,/x/cover/j6cgzhtkuonf6te/c0022g5ch18.html,,x.com
2017-02-17 13:26:57,222.212.239.*,2017-02-17 13:26:57,1990,xx2.xx.com,/car_sal/1139/,comefrom=newsApp
                """
        }

        //执行测试逻辑
        @Test
        def tester {
                val dataTable = other(data)

                然后就可以用上面模拟的数据对象,调试代码逻辑了
        }

        //可以封装一些其他的函数,被 testter 调用, 不是必须的
        def other(data: Sting): Array[Row] {
                val fieldNames = new Array(ftime, ip, ftime1, year, host, page, host2, ...)
                val schema = new StructType(fieldNames.map { x => StructField(x.name, StringType, true) })    
                val table = data.trim.split("n").map(
                        x => {
                                val column: Array[Any] = x.trim().split(",").map { x => x.asInstanceOf[Any] }
                                //val kv = fieldNames.map { x => x.name }.zip(column)
                                //println(kv.toList)
                                val r = new GenericRowWithSchema(column, schema)
                                //println(r)
                                r.asInstanceOf[Row]            
                        }
                )
                table
        }

2.5 Spark environment 参数

DataFrame shuffle size 设置值

sparkSession.conf.set("spark.sql.shuffle.partitions", "200") DataFrame groupBy cube 统计的时候,需要 shuffle,目前 tdw 的 shuffle 默认 partiton 的个数是 200, 如果数据量过大,或者 cube 的时候数据膨胀,就要适时调大这个参数。因为一个 partition 对应一个任务,增加 partition 个数,会增加并行的任务数,提高运行效率。

set("spark.default.parallelism", "10")

这个参数好有同样的效果,不过好像应用的场景是 RDD 的 reduce 操作。

2.6 写 mysql

代码语言:javascript
复制
val url = "jdbc:mysql://host:port/database?useUnicode=yes&characterEncoding=UTF-8"
val connectionProperties = new Properties()
connectionProperties.put("user", user)
connectionProperties.put("password", password)
connectionProperties.put("characterEncoding", "UTF-8")
connectionProperties.put("useUnicode", "yes")

unionDataFrame.write.mode(SaveMode.Append).jdbc(url, tblName, connectionProperties)

Scala JDBC 连接 Mysql,操作 mysql

代码语言:javascript
复制
Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager.getConnection(url)

//查询指定表是否存在
val statement = conn.createStatement()
var sql = s"""
select TABLE_NAME from INFORMATION_SCHEMA.TABLES where TABLE_NAME='$table'
"""
println(sql)
var rs = statement.executeQuery(sql)
if(rs.next){ //有相应的表

}

val sql = s"""
    delete from $table where ds = $datetime      
"""
val rs = statement.executeUpdate(sql)
println(sql   "n 删除的数据记录数: "   rs.toString())

发送 http 请求

代码语言:javascript
复制
import org.json4s
import org.json4s.jackson.JsonMethods._

val url = "http://*****"
val ret = fromURL(url, "utf-8").mkString
//将结果 json 解析成 map 
val retMap = parse(ret).values.asInstanceOf[Map[String, Any]]

参考

【1】Spark SQL, DataFrames and Datasets Guide 【2】RDD、DataFrame 和 DataSet 的区别 【3】TDW API 【4】Spark Programming Guide—-介绍 RDD 【5】RDD 介绍 【6】Spark Scala API

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、RDD Dataset 和 DataFrame 速览
  • 2、使用介绍
    • 2.1 加载数据
      • 2.2 转换操作
        • 1、选择指定列
        • 2、map filter
        • 3、聚合分组统计
        • 4、union
        • 5、NaN 数据中存在数据丢失
        • 6、Sql 语句里一些不支持的函数或写法
      • 2.3 TDW 一些基础功能
        • 2.4 本地测试 JUnit Test
          • 2.5 Spark environment 参数
            • 2.6 写 mysql
              • 发送 http 请求
              • 参考
              相关产品与服务
              数据库
              云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档