Let's say I have the following raw source data, delimited by commas, but some number of fields have a very custom format. For simplicity I have minimized this example to 3 fields/columns. Here the custom field is the address, which has a special format (key/value pairs enclosed in brackets). There could be other fields with completely different formats.
Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
...
Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
Case classes:
case class Person(name: String, age: Int, address: Address)
case class Address(street: String, city: String, state: String, zip: Int)
What is the most efficient way to process the source data, including parsing the address field, into a Dataset[Person]?
Two options come to mind so far:
Option 1 - perform a manual row-by-row conversion:
val df = spark.read.csv(source)
val dataset = df.map(row =>
  Person(row.getAs[String]("_c0"), row.getAs[String]("_c1").toInt, getAddress(row.getAs[String]("_c2")))
).as[Person]
Option 2 - use a UDF (user-defined function) for the custom-format column, together with withColumn and withColumnRenamed:
val udfAddress : UserDefinedFunction = udf((address: String) => toAddressObject(address))
var df = spark.read.csv(source)
df = df.withColumnRenamed("_c0", "name").withColumn("name", col("name").cast(StringType))
.withColumnRenamed("_c1", "age").withColumn("age", col("age").cast(IntegerType))
.withColumnRenamed("_c2", "address").withColumn("address", udfAddress(col("address")))
val dataset = df.as[Person]
In general, which of Option 1 and Option 2 is more efficient, and why? Also, if there is another option that is more efficient at handling/parsing custom-formatted fields, I am open to that as well. Is there a better option, such as manually composing StructFields with a StructType? Thanks!
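For reference, both options rely on an address parser (getAddress / toAddressObject) that is not shown here. A minimal sketch of such a helper, with the exact parsing rules and error handling assumed rather than taken from the question, might look like this:
// Hypothetical parser for the bracketed key/value address format;
// the split/trim rules and lack of validation are assumptions.
def toAddressObject(s: String): Address = {
  val fields = s
    .stripPrefix("[").stripSuffix("]")          // drop the surrounding brackets
    .split(";")                                 // "street:75917", "city:new york city", ...
    .map { kv =>
      val Array(key, value) = kv.split(":", 2)  // split on the first ':' only
      key.trim -> value.trim
    }
    .toMap
  Address(fields("street"), fields("city"), fields("state"), fields("zip").toInt)
}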
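As for the StructType/StructField option mentioned above, a sketch of an explicit read schema is below (column names and nullability are assumptions). Note that the CSV reader cannot parse the nested address itself, so that column still arrives as a plain string and still needs a parser such as the sketch above or a str_to_map-style transformation:
// Hypothetical explicit schema for the three raw CSV columns; this removes the
// withColumnRenamed/cast steps, but the address remains an unparsed string.
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rawSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("address", StringType, nullable = true)
))
val raw = spark.read.schema(rawSchema).csv(source)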
Posted on 2020-08-29 05:08:56
Another option could be the following.
Note that I haven't done any performance testing.
Load the test data
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import spark.implicits._ // spark: the active SparkSession

val data =
"""
|Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
|Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\,").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString("|"))
.toSeq.toDS()
val df = spark.read
.option("sep", "|")
.option("inferSchema", "true")
// .option("header", "true")
// .option("nullValue", "null")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +-----+---+----------------------------------------------------+
* |_c0 |_c1|_c2 |
* +-----+---+----------------------------------------------------+
* |Bob |35 |[street:75917;city:new york city;state:ny;zip:10000]|
* |Roger|75 |[street:81659;city:los angeles;state:ca;zip:99999] |
* +-----+---+----------------------------------------------------+
*
* root
* |-- _c0: string (nullable = true)
* |-- _c1: integer (nullable = true)
* |-- _c2: string (nullable = true)
*/
Convert the row data to Person
val person = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
val toAddr = udf((map: Map[String, String]) => Address(map("street"), map("city"), map("state"), map("zip").toInt))
val p = df.withColumn("_c2", translate($"_c2", "[]",""))
.withColumn("_c2", expr("str_to_map(_c2, ';', ':')"))
.withColumn("_c2", toAddr($"_c2"))
.toDF(person.map(_.name): _*)
.as[Person]
p.show(false)
p.printSchema()
/**
* +-----+---+---------------------------------+
* |name |age|address |
* +-----+---+---------------------------------+
* |Bob |35 |[75917, new york city, ny, 10000]|
* |Roger|75 |[81659, los angeles, ca, 99999] |
* +-----+---+---------------------------------+
*
* root
* |-- name: string (nullable = true)
* |-- age: integer (nullable = true)
* |-- address: struct (nullable = true)
* | |-- street: string (nullable = true)
* | |-- city: string (nullable = true)
* | |-- state: string (nullable = true)
* | |-- zip: integer (nullable = false)
*/
https://stackoverflow.com/questions/63642833
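To see what the intermediate column produced by translate and str_to_map looks like before the UDF builds the Address, it can be inspected directly. This is a small sketch reusing the df, imports, and column names from the answer above; it is not part of the original answer:
// Peek at the Map[String, String] column produced by translate + str_to_map,
// pulling out individual keys with getItem before toAddr converts it to a struct.
df.withColumn("_c2", translate($"_c2", "[]", ""))
  .withColumn("_c2", expr("str_to_map(_c2, ';', ':')"))
  .select($"_c0".as("name"), $"_c2".getItem("city").as("city"), $"_c2".getItem("zip").as("zip"))
  .show(false)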