导语:关于 API 使用踩过的一些坑。
RDD 和 DataFrame 都是一个可以看成有很多行,每一行有若干列的数据集(姑且先按照记录和字段的概念来理解)
在 scala 中可以这样表示一个 RDD: RDD[Array[String]] 每条记录是字符串构成的数组 RDD[(String, Int, ….)] 每条记录是多个不同类型的数据构成的元组
RDD 是分布式的 Java 对象的集合,RDD 中每个字段的数据都是强类型的
当在程序中处理数据的时候,遍历每条记录,每个值,往往通过索引读取
val filterRdd = rdd.filter(
x => {
var result = true
if((x(30)=="" && x(13)!="5" && x(13)!="")){
result = false
}
result
}
)
这种方式在 MapReduce 程序中也常常见到。
DataFrame 则是一个每列有命名的数据集,类似于关系数据库中的表,读取某一列数据的时候可以通过列名读取。所以相对于 RDD,DataFrame 提供了更详细的数据的结构信息 schema。
在 Spark 2.1 中, DataFrame 的概念已经弱化了,将它视为 DataSet 的一种实现
DataFrame is simply a type alias of Dataset[Row] @DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row"">http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package@DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
除了 Row 这种类型之外,还可以是一些其他自定义的类。
Dataset API 属于用于处理结构化数据的 Spark SQL 模块(这个模块还有 SQL API),通过比 RDD 多的数据的结构信息(Schema),Spark SQL 在计算的时候可以进行额外的优化。 Spark SQL's optimized execution engine[1]。通过列名,在处理数据的时候就可以通过列名操作。
RDD、DataFrame 和 DataSet 的区别中介绍了 DatasetAPI 的优势,MLlib 里也加大了对 DataSetAPI 的支持,并且提到 The RDD-based API is expected to be removed in Spark 3.0。所以未来推荐使用 DataSetAPI。
目前 tdw 提供了读取 tdw 表生成 RDD 或 DataFrame 的 API。
1.6API
val sparkConf = new SparkConf()
val sparkContext = new SparkContext(sparkConf)
val rdd = new TDWProvider(sparkContext, tdwUser, tdwPasswd, dbName).table(tblName, partitions)
2.0API
val sparkSession = SparkSession.builder().getOrCreate()
val tdwDataFrame = new TDWSQLProvider(sparkSession, tdwUser, tdwPasswd, dbName).table(tblName, partitions)
这个返回的是 带有 Schema 的 数据,DataFrame 即 Dataset[Row]
val tdwRDD: RDD[Array[String]] = new TDWProvider(sparkSession.sparkContext, tdwUser, tdwPasswd, dbName).table(tblName, partitions)
或者直接读取 HDFS
val hdfsURL="hdfs://**/**/**/**/ds=20170101/*gz"
val hdfsRdd = sparkSession.sparkContext.textFile(hdfsURL)
这个生成的是 一个 RDD[Sting], 每一行是一个字符串,需要用户自己去分割读取
//查看表的 Schema
tdwDataFrame.printSchema()
root
|-- tdbank_imp_date: string (nullable = true)
|-- ftime: string (nullable = true)
|-- gid: long (nullable = true)
|-- aid: string (nullable = true)
|-- adid: long (nullable = true)
//选取指定列 方法 1
tdwDataFrame.createTempView("table1")
// 函数说明 createTempView Creates a temporary view using the given name. The lifetime of this temporary view is tied to the SparkSession that was used to create this Dataset. 在整个 SparkSession 期间创建一次就好,如果同一个创建了两次车,会报错
val selectDataFrame1 = sparkSession.sql("select ftime, gid from table1")
//选取指定列 方法 2
val columnNames: List[Column] = List(col("ftime"), col("gid") as userId)
//说明: def col(colName: String): Column, 用列名构成 column 类型,并且可以用 as 指定别名。
val selectDataFrame2: DataFrame = originalDataset.select(columnNames:_*)
//如果是 RDD
val rddToRdd: RDD[Row] = hdfsRdd.map(
dataStr => {
val dataList: Array[String] = dataStr.split(sep, -1)
Row.fromSeq(dataList)
}
)
val fieldIds: List[StructField] = List(
StructField("tdbank_imp_date", StringType, true),
StructField("ftime", StringType, true),
StructField("gid", StringType, true),
StructField("adid", StringType, true)
)
val schema: StructType = StructType(fieldIds)
val rddToDataFrame: DataFrame = sparkSession.createDataFrame(rdd, schema)
val filterData = tdwDataFrame.filter(
element => {
var retFlag = true
if(element.isNullAt(element.fieldIndex("gid")) || element.getAs("gid").toString()){
retFlag = false
}
retFlag
}
)
//
这里 有两个地方需要说明 isNullAt 首先要判断要选取的列的值是否为空, 如果是空,直接读取数据会抛异常。getAs 本来是要指定具体的类型的,如 getAs[String],但因为 tdwDataFrame 的 schema 已知,包括各个字段的类型,如 gid 是 long, 这样如果按 getAs[String] 有可能会发生类型转换异常。
转换加工某些字段,即将原来的 DataFrame map 操作转换成另外一个 DataFrame。最开始的想法是用 scala 的 一些列表类型封装数据,当每个列的类型相同的时候,用数组 如 Array[String],但一般情况下是不同的,就用元组("a", 1, …),但这个方法有个局限,我们以 RDD 的操作为例,但在 DataFrame 中也是一样的
val mRdd2 = filterRdd.map(
x => (
x(1), x(2), ... , x(23)
)
)
//语法错误:too many elements for tuple:23, allowed:22
//编译报错:object Tuple23 is not a member of package scala。
//当生成的 RDD 是一个超过 22 个字段的记录时,如果用 元组 tuple 就会报错, tuple 是 case class
不使用 数组和元组,而使用 Row
implicit val rowEncoder = Encoders.kryo[Row]
val mapDataset:Dataset[Row] = filterDataset.map(
element => {
val tempList:ListBuffer[Any] = new ListBuffer[Any]
tempList = element.getAs("gid")
tempList = element.getAs("aid")
...
Row.fromSeq(tempList)
}
)
val schema2 = StructType(...)
val mapDataFrame = SparkSession.createDataFrame(mapDataset.rdd, schema2)
//指定列,分组统计
val aggDagaset = mapDataFrame.groupBy(...).agg(...)
//api 说明,上面的操作有两步,
def groupBy(cols: Column*): RelationalGroupedDataset
def agg(expr: Column, exprs: Column*): DataFrame
//注意 import
import org.apache.spark.sql.functions._
val aggDagaset = mapDataFrame.groupBy(col("gid")).agg(count("gid") as cnt)
最后返回的是分组字段,和计算字段
即:gid, cnt
//分组字段,需要特别提一下的是,可以不指定,即分组字段为空
//计算字段,可以用 sql 写法,跟 sql 很类似
count("***") as taskField
sum("***") as taskField
countDistinct("***") as taskField
round(sum("***")/countDistinct("***"), 4) as taskField
//由于 agg 这个函数,必须要传两个参数,所以自己写了一个函数来封装原始的
def aggDataset(groupDataset: RelationalGroupedDataset, calculateColumns: List[Column]): Dataset[Row] = {
calculateColumns.size match {
case 1 => groupDataset.agg(calculateColumns(0))
case _ => groupDataset.agg(calculateColumns(0), calculateColumns.tail: _*)
}
}
// 所以在标准化编程的时候,可以把维度字段,计算的字段封装成数组,然后计算。
//cube
val aggDagaset = mapDataFrame.cube(...).agg(...)
val unionDataFrame = aggDagaset1.union(aggDagaset2)
//处理空值,将空值替换为 0.0
unionData.na.fill(0.0)
NaN,如果数据中存在 NaN(不是 null ),那么一些统计函数算出来的数据就会变成 NaN,如 avg。所以要对数据进行过滤或者转换。
import java.lang.Double.isNaN
if (isNaN(x.getAs("field"))){
0
}
或者直接过滤掉
不支持的函数: url_decode
不支持的写法
not rlike
支持 rlike,所以在写正则的时候可以取反
如 not rlike '^\d $'
要求不能数字开头,数字结尾,全是数字
就可以写成 rlike '\d[*^0-9] \d[*^0-9]*'
里面至少有一个不是数字的字符
val tdwUtil = new TDWUtil(Constant.tdwUser, Constant.tdwPasswd, "db")
//表是否存在
tdwUtil.tableExist(table)
//分区是否存在
tdwUtil.partitionExist(table, x)
def createTableDesc(tblName: String, cols: Seq[Array[String]]) = {
new TableDesc()
.setTblName(tblName)
.setCols(cols)
.setComment(tblName)
.setPartType("LIST")
.setPartField("imp_date")
.setCompress(true)
}
//创建表,会自动创建默认分区,不用再单独创建
tdwUtil.createTable(tblDesc)
//创建分区
tdwUtil.createListPartition(tblName, partName, datetime)
//清空分区,其实可以在写数据的时候,指定是否覆盖写
tdwUtil.truncatePartition(tblName, partName)
val outputDataset: Dataset[Row]
val sqlProvider = new TDWSQLProvider(sparkSession, Constant.tdwUser, Constant.tdwPasswd, dataBase)
sqlProvider.saveToTable(outputDataset, tblName, partName)
//判断 HDFS 路径是否有数据
val hadoopFileSystem = {
val conf = sparkSession.sparkContext.hadoopConfiguration
conf.set("fs.defaultFS", "hdfs://ss-xx-x-v2")
conf.set("fs.default.name", "hdfs://ss-xx-x-v2")
FileSystem.get(conf)
}
val fileStatusArr = hadoopFileSystem.globStatus(new Path("hdfs://host/xx/xx/*.gz"))
if(fileStatusArr.size == 0){
"没有数据"
}
//判断表的分区类型 小时还是天分区
def getTdwTablePartitionType(dataBase: String, table: String) = {
if(checkTdwTableExist(dataBase, table)){
val partitions = tdwUtil.getTableInfo(table).partitions.filter(
x => {
x.name != "default"
}
)
if(partitions.length>0){
val pn = partitions(0).name
if(pn.length()>10){
"h"
}else{
"d"
}
}else{
null
}
}else{
null
}
}
由于 Spark 程序,需要由客户端提交给集群执行,但在程序调试阶段,想快速验证代码逻辑,通过每次提交集群执行程序太费力了,可以在本地测试一下。但其实没必要用 JUnit,但有工具何乐而不用。
import org.junit.Before
import org.junit.Test
class TestFunction {
var testData: String = null
//准备 进行测试需要的数据、环境
@Before
def setUp() {
//scala 里这个字符串 表示方法跟 python 一样
testData = """
2017-02-17 13:27:03,115.224.31.*,2017-02-17 13:27:03,1990,xx.xx.com,/a/20170207/003084.htm,,xx.qq.com
2017-02-17 13:27:03,218.88.230.*,2017-02-17 13:27:03,1990,xx1.xx.com,/public/1/piclib_disp.shtml,cqpid=24861,data.xx.qq.com
2017-02-17 13:27:03,111.196.71.*,2017-02-17 13:27:03,1990,xx2.xx.com,/nba/,ptag=baidu.ald.sc,www.xx.com,/link
2017-02-17 13:27:03,116.117.150.*,2017-02-17 13:27:03,1990,xx3.xx.com,/storage/emulated/0/Android/data/com/files/.webapp/dirs/15/index.html
2017-02-17 13:27:03,10.50.148.*,2017-02-17 13:27:03,1990,xx1.xx.com,/x/cover/j6cgzhtkuonf6te/c0022g5ch18.html,,x.com
2017-02-17 13:26:57,222.212.239.*,2017-02-17 13:26:57,1990,xx2.xx.com,/car_sal/1139/,comefrom=newsApp
"""
}
//执行测试逻辑
@Test
def tester {
val dataTable = other(data)
然后就可以用上面模拟的数据对象,调试代码逻辑了
}
//可以封装一些其他的函数,被 testter 调用, 不是必须的
def other(data: Sting): Array[Row] {
val fieldNames = new Array(ftime, ip, ftime1, year, host, page, host2, ...)
val schema = new StructType(fieldNames.map { x => StructField(x.name, StringType, true) })
val table = data.trim.split("n").map(
x => {
val column: Array[Any] = x.trim().split(",").map { x => x.asInstanceOf[Any] }
//val kv = fieldNames.map { x => x.name }.zip(column)
//println(kv.toList)
val r = new GenericRowWithSchema(column, schema)
//println(r)
r.asInstanceOf[Row]
}
)
table
}
DataFrame shuffle size 设置值
sparkSession.conf.set("spark.sql.shuffle.partitions", "200") DataFrame groupBy cube 统计的时候,需要 shuffle,目前 tdw 的 shuffle 默认 partiton 的个数是 200, 如果数据量过大,或者 cube 的时候数据膨胀,就要适时调大这个参数。因为一个 partition 对应一个任务,增加 partition 个数,会增加并行的任务数,提高运行效率。
set("spark.default.parallelism", "10")
这个参数好有同样的效果,不过好像应用的场景是 RDD 的 reduce 操作。
val url = "jdbc:mysql://host:port/database?useUnicode=yes&characterEncoding=UTF-8"
val connectionProperties = new Properties()
connectionProperties.put("user", user)
connectionProperties.put("password", password)
connectionProperties.put("characterEncoding", "UTF-8")
connectionProperties.put("useUnicode", "yes")
unionDataFrame.write.mode(SaveMode.Append).jdbc(url, tblName, connectionProperties)
Scala JDBC 连接 Mysql,操作 mysql
Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager.getConnection(url)
//查询指定表是否存在
val statement = conn.createStatement()
var sql = s"""
select TABLE_NAME from INFORMATION_SCHEMA.TABLES where TABLE_NAME='$table'
"""
println(sql)
var rs = statement.executeQuery(sql)
if(rs.next){ //有相应的表
}
val sql = s"""
delete from $table where ds = $datetime
"""
val rs = statement.executeUpdate(sql)
println(sql "n 删除的数据记录数: " rs.toString())
import org.json4s
import org.json4s.jackson.JsonMethods._
val url = "http://*****"
val ret = fromURL(url, "utf-8").mkString
//将结果 json 解析成 map
val retMap = parse(ret).values.asInstanceOf[Map[String, Any]]
【1】Spark SQL, DataFrames and Datasets Guide 【2】RDD、DataFrame 和 DataSet 的区别 【3】TDW API 【4】Spark Programming Guide—-介绍 RDD 【5】RDD 介绍 【6】Spark Scala API
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。