Spark SQL 数据统计 Scala 开发小结

导语:关于 API 使用踩过的一些坑。

1、RDD Dataset 和 DataFrame 速览

RDD 和 DataFrame 都是一个可以看成有很多行,每一行有若干列的数据集(姑且先按照记录和字段的概念来理解)

在 scala 中可以这样表示一个 RDD: RDD[Array[String]] 每条记录是字符串构成的数组 RDD[(String, Int, ….)] 每条记录是多个不同类型的数据构成的元组

RDD 是分布式的 Java 对象的集合,RDD 中每个字段的数据都是强类型的

当在程序中处理数据的时候,遍历每条记录,每个值,往往通过索引读取

val filterRdd = rdd.filter(
    x => {
        var result = true
        if((x(30)=="" && x(13)!="5" && x(13)!="")){
                result = false
        }
        result
    }
)

这种方式在 MapReduce 程序中也常常见到。

DataFrame 则是一个每列有命名的数据集,类似于关系数据库中的表,读取某一列数据的时候可以通过列名读取。所以相对于 RDD,DataFrame 提供了更详细的数据的结构信息 schema。

在 Spark 2.1 中, DataFrame 的概念已经弱化了,将它视为 DataSet 的一种实现

DataFrame is simply a type alias of Dataset[Row] @DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row"">http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package@DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

除了 Row 这种类型之外,还可以是一些其他自定义的类。

Dataset API 属于用于处理结构化数据的 Spark SQL 模块(这个模块还有 SQL API),通过比 RDD 多的数据的结构信息(Schema),Spark SQL 在计算的时候可以进行额外的优化。 Spark SQL's optimized execution engine[1]。通过列名,在处理数据的时候就可以通过列名操作。

RDD、DataFrame 和 DataSet 的区别中介绍了 DatasetAPI 的优势,MLlib 里也加大了对 DataSetAPI 的支持,并且提到 The RDD-based API is expected to be removed in Spark 3.0。所以未来推荐使用 DataSetAPI。

2、使用介绍

2.1 加载数据

目前 tdw 提供了读取 tdw 表生成 RDD 或 DataFrame 的 API。

1.6API
val sparkConf = new SparkConf()
val sparkContext = new SparkContext(sparkConf)
val rdd = new TDWProvider(sparkContext, tdwUser, tdwPasswd, dbName).table(tblName, partitions)


2.0API
val sparkSession = SparkSession.builder().getOrCreate()
val tdwDataFrame = new TDWSQLProvider(sparkSession, tdwUser, tdwPasswd, dbName).table(tblName, partitions)
这个返回的是 带有 Schema 的 数据,DataFrame 即 Dataset[Row]

val tdwRDD: RDD[Array[String]] = new TDWProvider(sparkSession.sparkContext, tdwUser, tdwPasswd, dbName).table(tblName, partitions)

或者直接读取 HDFS

val hdfsURL="hdfs://**/**/**/**/ds=20170101/*gz"
val hdfsRdd = sparkSession.sparkContext.textFile(hdfsURL)

这个生成的是 一个 RDD[Sting], 每一行是一个字符串,需要用户自己去分割读取

2.2 转换操作

1、选择指定列

//查看表的 Schema
tdwDataFrame.printSchema()
                root
                 |-- tdbank_imp_date: string (nullable = true)
                 |-- ftime: string (nullable = true)
                 |-- gid: long (nullable = true)
                 |-- aid: string (nullable = true)
                 |-- adid: long (nullable = true)

//选取指定列 方法 1
tdwDataFrame.createTempView("table1")
// 函数说明 createTempView Creates a temporary view using the given name. The lifetime of this temporary view is tied to the SparkSession that was used to create this Dataset. 在整个 SparkSession 期间创建一次就好,如果同一个创建了两次车,会报错
val selectDataFrame1 = sparkSession.sql("select ftime, gid from table1")


//选取指定列 方法 2
val columnNames: List[Column] = List(col("ftime"), col("gid") as userId) 
//说明: def col(colName: String): Column, 用列名构成 column 类型,并且可以用 as 指定别名。

val selectDataFrame2: DataFrame = originalDataset.select(columnNames:_*)

//如果是 RDD
val rddToRdd: RDD[Row] = hdfsRdd.map(
        dataStr => {
                val dataList: Array[String] = dataStr.split(sep, -1)
                Row.fromSeq(dataList)
        }
)


val fieldIds: List[StructField] = List(
        StructField("tdbank_imp_date", StringType, true),
        StructField("ftime", StringType, true),
        StructField("gid", StringType, true),
        StructField("adid", StringType, true)
)
val schema: StructType = StructType(fieldIds)
val rddToDataFrame: DataFrame = sparkSession.createDataFrame(rdd, schema)

2、map filter

val filterData = tdwDataFrame.filter(
        element => {
                var retFlag = true
                if(element.isNullAt(element.fieldIndex("gid")) || element.getAs("gid").toString()){
                        retFlag = false
                }
                retFlag
        }
)

//

这里 有两个地方需要说明 isNullAt 首先要判断要选取的列的值是否为空, 如果是空,直接读取数据会抛异常。getAs 本来是要指定具体的类型的,如 getAs[String],但因为 tdwDataFrame 的 schema 已知,包括各个字段的类型,如 gid 是 long, 这样如果按 getAs[String] 有可能会发生类型转换异常。

转换加工某些字段,即将原来的 DataFrame map 操作转换成另外一个 DataFrame。最开始的想法是用 scala 的 一些列表类型封装数据,当每个列的类型相同的时候,用数组 如 Array[String],但一般情况下是不同的,就用元组("a", 1, …),但这个方法有个局限,我们以 RDD 的操作为例,但在 DataFrame 中也是一样的

val mRdd2 = filterRdd.map( 
        x => (
                x(1), x(2), ... , x(23)            
        ) 

)

//语法错误:too many elements for tuple:23, allowed:22  
//编译报错:object Tuple23 is not a member of package scala。  
//当生成的 RDD 是一个超过 22 个字段的记录时,如果用 元组 tuple 就会报错, tuple 是 case class

不使用 数组和元组,而使用 Row

implicit val rowEncoder = Encoders.kryo[Row]

val mapDataset:Dataset[Row] = filterDataset.map(
        element => {
                val tempList:ListBuffer[Any] = new ListBuffer[Any]
                tempList  = element.getAs("gid")
                tempList  = element.getAs("aid")
                ...
                Row.fromSeq(tempList)
        }
)
val schema2 = StructType(...)
val mapDataFrame = SparkSession.createDataFrame(mapDataset.rdd, schema2)

3、聚合分组统计

//指定列,分组统计
val aggDagaset = mapDataFrame.groupBy(...).agg(...)

//api 说明,上面的操作有两步,
def groupBy(cols: Column*): RelationalGroupedDataset
def agg(expr: Column, exprs: Column*): DataFrame

//注意 import
import org.apache.spark.sql.functions._

val aggDagaset = mapDataFrame.groupBy(col("gid")).agg(count("gid") as cnt)
最后返回的是分组字段,和计算字段
即:gid, cnt
//分组字段,需要特别提一下的是,可以不指定,即分组字段为空
//计算字段,可以用 sql 写法,跟 sql 很类似
count("***") as taskField
sum("***") as taskField
countDistinct("***") as taskField
round(sum("***")/countDistinct("***"), 4) as taskField



//由于 agg 这个函数,必须要传两个参数,所以自己写了一个函数来封装原始的
def aggDataset(groupDataset: RelationalGroupedDataset, calculateColumns: List[Column]): Dataset[Row] = {
        calculateColumns.size match {
            case 1 => groupDataset.agg(calculateColumns(0))
            case _ => groupDataset.agg(calculateColumns(0), calculateColumns.tail: _*)
        }
    }

// 所以在标准化编程的时候,可以把维度字段,计算的字段封装成数组,然后计算。


//cube
val aggDagaset = mapDataFrame.cube(...).agg(...)

4、union

val unionDataFrame = aggDagaset1.union(aggDagaset2)
//处理空值,将空值替换为 0.0
unionData.na.fill(0.0)

5、NaN 数据中存在数据丢失

NaN,如果数据中存在 NaN(不是 null ),那么一些统计函数算出来的数据就会变成 NaN,如 avg。所以要对数据进行过滤或者转换。

import java.lang.Double.isNaN

if (isNaN(x.getAs("field"))){
        0
}
或者直接过滤掉

6、Sql 语句里一些不支持的函数或写法

不支持的函数: url_decode

不支持的写法 not rlike 支持 rlike,所以在写正则的时候可以取反 如 not rlike '^\d $' 要求不能数字开头,数字结尾,全是数字 就可以写成 rlike '\d[*^0-9] \d[*^0-9]*' 里面至少有一个不是数字的字符

2.3 TDW 一些基础功能

val tdwUtil = new TDWUtil(Constant.tdwUser, Constant.tdwPasswd, "db")
//表是否存在
tdwUtil.tableExist(table)
//分区是否存在
tdwUtil.partitionExist(table, x)

def createTableDesc(tblName: String, cols: Seq[Array[String]]) = {
        new TableDesc()
            .setTblName(tblName)
            .setCols(cols)
            .setComment(tblName)
            .setPartType("LIST")
            .setPartField("imp_date")
            .setCompress(true)
    }


//创建表,会自动创建默认分区,不用再单独创建  
tdwUtil.createTable(tblDesc)

//创建分区
tdwUtil.createListPartition(tblName, partName, datetime)

//清空分区,其实可以在写数据的时候,指定是否覆盖写
tdwUtil.truncatePartition(tblName, partName)

val outputDataset: Dataset[Row]
val sqlProvider = new TDWSQLProvider(sparkSession, Constant.tdwUser, Constant.tdwPasswd, dataBase)
sqlProvider.saveToTable(outputDataset, tblName, partName)


//判断 HDFS 路径是否有数据
val hadoopFileSystem = {
        val conf = sparkSession.sparkContext.hadoopConfiguration
        conf.set("fs.defaultFS", "hdfs://ss-xx-x-v2")
        conf.set("fs.default.name", "hdfs://ss-xx-x-v2")
        FileSystem.get(conf)
}

val fileStatusArr = hadoopFileSystem.globStatus(new Path("hdfs://host/xx/xx/*.gz"))
if(fileStatusArr.size == 0){
        "没有数据"
}

//判断表的分区类型 小时还是天分区
def getTdwTablePartitionType(dataBase: String, table: String) = {
        if(checkTdwTableExist(dataBase, table)){
            val partitions = tdwUtil.getTableInfo(table).partitions.filter(
                x => {
                    x.name != "default"
                }
            )
            if(partitions.length>0){
                val pn = partitions(0).name
                if(pn.length()>10){
                    "h"
                }else{
                    "d"
                }
            }else{
                null
            }
        }else{
            null
        }
    }

2.4 本地测试 JUnit Test

由于 Spark 程序,需要由客户端提交给集群执行,但在程序调试阶段,想快速验证代码逻辑,通过每次提交集群执行程序太费力了,可以在本地测试一下。但其实没必要用 JUnit,但有工具何乐而不用。

import org.junit.Before
import org.junit.Test
class TestFunction {

        var testData: String = null
        //准备 进行测试需要的数据、环境
        @Before
        def setUp() {
                //scala 里这个字符串 表示方法跟 python 一样
                testData = """
2017-02-17 13:27:03,115.224.31.*,2017-02-17 13:27:03,1990,xx.xx.com,/a/20170207/003084.htm,,xx.qq.com
2017-02-17 13:27:03,218.88.230.*,2017-02-17 13:27:03,1990,xx1.xx.com,/public/1/piclib_disp.shtml,cqpid=24861,data.xx.qq.com
2017-02-17 13:27:03,111.196.71.*,2017-02-17 13:27:03,1990,xx2.xx.com,/nba/,ptag=baidu.ald.sc,www.xx.com,/link
2017-02-17 13:27:03,116.117.150.*,2017-02-17 13:27:03,1990,xx3.xx.com,/storage/emulated/0/Android/data/com/files/.webapp/dirs/15/index.html
2017-02-17 13:27:03,10.50.148.*,2017-02-17 13:27:03,1990,xx1.xx.com,/x/cover/j6cgzhtkuonf6te/c0022g5ch18.html,,x.com
2017-02-17 13:26:57,222.212.239.*,2017-02-17 13:26:57,1990,xx2.xx.com,/car_sal/1139/,comefrom=newsApp
                """
        }

        //执行测试逻辑
        @Test
        def tester {
                val dataTable = other(data)

                然后就可以用上面模拟的数据对象,调试代码逻辑了
        }

        //可以封装一些其他的函数,被 testter 调用, 不是必须的
        def other(data: Sting): Array[Row] {
                val fieldNames = new Array(ftime, ip, ftime1, year, host, page, host2, ...)
                val schema = new StructType(fieldNames.map { x => StructField(x.name, StringType, true) })    
                val table = data.trim.split("n").map(
                        x => {
                                val column: Array[Any] = x.trim().split(",").map { x => x.asInstanceOf[Any] }
                                //val kv = fieldNames.map { x => x.name }.zip(column)
                                //println(kv.toList)
                                val r = new GenericRowWithSchema(column, schema)
                                //println(r)
                                r.asInstanceOf[Row]            
                        }
                )
                table
        }

2.5 Spark environment 参数

DataFrame shuffle size 设置值

sparkSession.conf.set("spark.sql.shuffle.partitions", "200") DataFrame groupBy cube 统计的时候,需要 shuffle,目前 tdw 的 shuffle 默认 partiton 的个数是 200, 如果数据量过大,或者 cube 的时候数据膨胀,就要适时调大这个参数。因为一个 partition 对应一个任务,增加 partition 个数,会增加并行的任务数,提高运行效率。

set("spark.default.parallelism", "10")

这个参数好有同样的效果,不过好像应用的场景是 RDD 的 reduce 操作。

2.6 写 mysql

val url = "jdbc:mysql://host:port/database?useUnicode=yes&characterEncoding=UTF-8"
val connectionProperties = new Properties()
connectionProperties.put("user", user)
connectionProperties.put("password", password)
connectionProperties.put("characterEncoding", "UTF-8")
connectionProperties.put("useUnicode", "yes")

unionDataFrame.write.mode(SaveMode.Append).jdbc(url, tblName, connectionProperties)

Scala JDBC 连接 Mysql,操作 mysql

Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager.getConnection(url)

//查询指定表是否存在
val statement = conn.createStatement()
var sql = s"""
select TABLE_NAME from INFORMATION_SCHEMA.TABLES where TABLE_NAME='$table'
"""
println(sql)
var rs = statement.executeQuery(sql)
if(rs.next){ //有相应的表

}

val sql = s"""
    delete from $table where ds = $datetime      
"""
val rs = statement.executeUpdate(sql)
println(sql   "n 删除的数据记录数: "   rs.toString())

发送 http 请求

import org.json4s
import org.json4s.jackson.JsonMethods._

val url = "http://*****"
val ret = fromURL(url, "utf-8").mkString
//将结果 json 解析成 map 
val retMap = parse(ret).values.asInstanceOf[Map[String, Any]]

参考

【1】Spark SQL, DataFrames and Datasets Guide 【2】RDD、DataFrame 和 DataSet 的区别 【3】TDW API 【4】Spark Programming Guide—-介绍 RDD 【5】RDD 介绍 【6】Spark Scala API

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏程序生活

斯坦福tensorflow教程(二) tensorflow相关运算1.认识下TensorBoard2.常量op3. 数学运算数据类型

2286
来自专栏Jed的技术阶梯

spark读写HBase之使用hortonworks的开源框架shc(二):入门案例

shc测试环境的搭建参考: spark读写HBase之使用hortonworks的开源框架shc(一):源码编译以及测试工程创建

1243
来自专栏Ceph对象存储方案

使用udev增强对ceph储存设备的管理

需求描述 默认情况下磁盘可以使用by-id/by-partlabel/by-parttypeuuid/by-partuuid/by-path/by-uuid等多...

1915
来自专栏机器学习算法全栈工程师

快来操纵你的GPU| CUDA编程入门极简教程

2006年,NVIDIA公司发布了CUDA(http://docs.nvidia.com/cuda/),CUDA是建立在NVIDIA的CPUs上的一个通用并行计...

3864
来自专栏漫漫深度学习路

tensorflow学习笔记(四十五):sess.run(tf.global_variables_initializer()) 做了什么?

当我们训练自己的神经网络的时候,无一例外的就是都会加上一句 sess.run(tf.global_variables_initializer()) ,这行代码的...

2166
来自专栏大内老A

谈谈你最熟悉的System.DateTime[上篇]

最近一直在负责公司内部框架的升级工作,今天对一个小问题进行了重新思考——时间的处理。具体来说,是如何有效地进行时间的处理以提供对跨时区的支持。对于一个分布式的应...

1809
来自专栏Hongten

python开发_python日期操作

883
来自专栏闻道于事

Java常用工具类之RegexpUtils,正则表达式工具类

package com.test.core.util; import org.apache.log4j.Logger; import org.apache.o...

3577
来自专栏技术翻译

了解Spark SQL,DataFrame和数据集

对于数据集和DataFrameAPI存在很多混淆,因此在本文中,我们将带领大家了解SparkSQL、DataFrames和DataSet。

1422
来自专栏大数据和云计算技术

spark 2.0主要特性预览

Spark 2.0相比老版本变化很大,已经发布了预览版本。原始的英文版databricks的博客:https://databricks.com/blog/201...

4109

扫码关注云+社区