首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何有效地激发具有复杂字段的自定义模式的读取源数据?

如何有效地激发具有复杂字段的自定义模式的读取源数据?
EN

Stack Overflow用户
提问于 2020-08-29 02:22:59
回答 1查看 330关注 0票数 0

让我们说,我有以下用逗号,分隔的原始源数据,但是有一些X数量的字段具有非常自定义的格式。为了简单起见,我将这个示例最小化为3个字段/列。在这种情况下,自定义字段是具有特殊格式的address (键/值被大括号包围)。可能还有完全不同格式的其他字段。

代码语言:javascript
运行
复制
Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
...
Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]

个案班:

代码语言:javascript
运行
复制
case class Person(name: String, age: Int, address: Address)
case class Address(street: String, city: String, state: String, zip: Int)

将源数据(包括解析地址字段)处理为Dataset[Person]的最有效方法是什么?

目前,人们想到了两种选择:

选项1 -执行逐行手动转换:

代码语言:javascript
运行
复制
val df = df.read.csv(source)
val dataset = df.map(row => 
    Person(row.getString("_c0"), row.getInt("_c1"), getAddress(row.getString("_c3")))
).as[Person]

选项2 -使用自定义格式列的UDF (用户定义函数),并使用withColumnwithColumnRenamed

代码语言:javascript
运行
复制
val udfAddress : UserDefinedFunction = udf((address: String) => toAddressObject(address))
var df = df.read.csv(source)
df = df.withColumnRenamed("_c0", "name").withColumn("name", col("name").cast(StringType))
       .withColumnRenamed("_c1", "age").withColumn("age", col("age").cast(IntegerType))
       .withColumnRenamed("_c2",  "address").withColumn("address", udfAddress(col("address")))
val dataset = df.as[Person]

一般来说,在选项1选项2之间,什么更有效,为什么?此外,如果有另一个选项在处理/解析自定义格式化字段时更有效,那么我也可以使用其他选项。是否有更好的选择,包括手动使用StructType组合StructFields?谢谢!

EN

回答 1

Stack Overflow用户

发布于 2020-08-29 05:08:56

另一种选择可能是-

请注意,我没有进行任何性能测试

加载测试数据

代码语言:javascript
运行
复制
 val data =
      """
        |Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
        |Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
      """.stripMargin

    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\,").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString("|"))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", "|")
      .option("inferSchema", "true")
//      .option("header", "true")
      //      .option("nullValue", "null")
      .csv(stringDS)
    df.show(false)
    df.printSchema()
    /**
      * +-----+---+----------------------------------------------------+
      * |_c0  |_c1|_c2                                                 |
      * +-----+---+----------------------------------------------------+
      * |Bob  |35 |[street:75917;city:new york city;state:ny;zip:10000]|
      * |Roger|75 |[street:81659;city:los angeles;state:ca;zip:99999]  |
      * +-----+---+----------------------------------------------------+
      *
      * root
      * |-- _c0: string (nullable = true)
      * |-- _c1: integer (nullable = true)
      * |-- _c2: string (nullable = true)
      */

将行数据转换为人

代码语言:javascript
运行
复制
val person = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
    val toAddr = udf((map: Map[String, String]) => Address(map("street"), map("city"), map("state"), map("zip").toInt))
    val p = df.withColumn("_c2", translate($"_c2", "[]",""))
      .withColumn("_c2", expr("str_to_map(_c2, ';', ':')"))
      .withColumn("_c2", toAddr($"_c2"))
      .toDF(person.map(_.name): _*)
      .as[Person]


    p.show(false)
    p.printSchema()

    /**
      * +-----+---+---------------------------------+
      * |name |age|address                          |
      * +-----+---+---------------------------------+
      * |Bob  |35 |[75917, new york city, ny, 10000]|
      * |Roger|75 |[81659, los angeles, ca, 99999]  |
      * +-----+---+---------------------------------+
      *
      * root
      * |-- name: string (nullable = true)
      * |-- age: integer (nullable = true)
      * |-- address: struct (nullable = true)
      * |    |-- street: string (nullable = true)
      * |    |-- city: string (nullable = true)
      * |    |-- state: string (nullable = true)
      * |    |-- zip: integer (nullable = false)
      */
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63642833

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档