我正在处理一些需求,其中我从CSV文件中获得了一个小表格,如下所示:
root
|-- ACCT_NO: string (nullable = true)
|-- SUBID: integer (nullable = true)
|-- MCODE: string (nullable = true)
|-- NewClosedDate: timestamp (nullable = true
我们还有一个非常大的Avro形式的外部配置单元表,它存储在HDFS中,如下所示:
root
-- accountlinks: array (nullable = true)
| | |-- account: struct (nullable = true)
| | | |-- acctno: string (nullable = true)
| | | |-- subid: string (nullable = true)
| | | |-- mcode: string (nullable = true)
| | | |-- openeddate: string (nullable = true)
| | | |-- closeddate: string (nullable = true)
现在,需要根据csv文件中的三列查找外部配置单元表:ACCT_NO - SUBID - MCODE
。如果匹配,则使用CSV文件中的NewClosedDate
更新accountlinks.account.closeddate
。
我已经编写了以下代码来分解所需的列并将其与小表连接,但我不太确定如何使用NewClosedDate更新closeddate字段(对于所有帐户持有人,该字段当前为null ),因为closeddate是一个嵌套列,我不能轻松地使用withColumn来填充它。除此之外,模式和列名不能更改,因为这些文件链接到一些外部配置单元表。
val df = spark.sql("select * from db.table where archive='201711'")
val ExtractedColumn = df
.coalesce(150)
.withColumn("ACCT_NO", explode($"accountlinks.account.acctno"))
.withColumn("SUBID", explode($"accountlinks.account.acctsubid"))
.withColumn("MCODE", explode($"C.mcode"))
val ReferenceData = spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("file.csv")
val FinalData = ExtractedColumn.join(ReferenceData, Seq("ACCT_NO","SUBID","MCODE") , "left")
发布于 2020-01-13 10:12:53
您需要做的就是分解accountlinks
数组,然后像这样连接两个数据帧:
val explodedDF = df.withColumn("account", explode($"accountlinks"))
val joinCondition = $"ACCT_NO" === $"account.acctno" && $"SUBID" === $"account.subid" && $"MCODE" === $"account.mcode"
val joinDF = explodedDF.join(ReferenceData, joinCondition, "left")
现在,您可以像下面这样更新account
结构列,并收集列表以获取数组结构:
val FinalData = joinDF.withColumn("account",
struct($"account.acctno", $"account.subid", $"account.mcode",
$"account.openeddate", $"NewClosedDate".alias("closeddate")
)
)
.groupBy().agg(collect_list($"account").alias("accountlinks"))
其思想是创建一个新结构,其中包含account
中除closedate
之外的所有字段,这些字段是从NewCloseDate
列获得的。
如果该结构包含许多字段,则可以使用for-comprehension来获取除关闭日期之外的所有字段,以防止键入所有字段。
https://stackoverflow.com/questions/59710866
复制相似问题