Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >使用嵌套列连接两个spark Dataframe并更新其中一个列

使用嵌套列连接两个spark Dataframe并更新其中一个列
EN

Stack Overflow用户
提问于 2020-01-13 05:11:18
回答 1查看 251关注 0票数 0

我正在处理一些需求,其中我从CSV文件中获得了一个小表格,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
root
 |-- ACCT_NO: string (nullable = true)
 |-- SUBID: integer (nullable = true)
 |-- MCODE: string (nullable = true)
 |-- NewClosedDate: timestamp (nullable = true

我们还有一个非常大的Avro形式的外部配置单元表,它存储在HDFS中,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
root
-- accountlinks: array (nullable = true)
 |    |    |-- account: struct (nullable = true)
 |    |    |    |-- acctno: string (nullable = true)
 |    |    |    |-- subid: string (nullable = true)
 |    |    |    |-- mcode: string (nullable = true)
 |    |    |    |-- openeddate: string (nullable = true)
 |    |    |    |-- closeddate: string (nullable = true)

现在,需要根据csv文件中的三列查找外部配置单元表:ACCT_NO - SUBID - MCODE。如果匹配,则使用CSV文件中的NewClosedDate更新accountlinks.account.closeddate

我已经编写了以下代码来分解所需的列并将其与小表连接,但我不太确定如何使用NewClosedDate更新closeddate字段(对于所有帐户持有人,该字段当前为null ),因为closeddate是一个嵌套列,我不能轻松地使用withColumn来填充它。除此之外,模式和列名不能更改,因为这些文件链接到一些外部配置单元表。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val df = spark.sql("select * from db.table where archive='201711'")

val ExtractedColumn = df 
.coalesce(150)
.withColumn("ACCT_NO", explode($"accountlinks.account.acctno"))
.withColumn("SUBID", explode($"accountlinks.account.acctsubid"))
.withColumn("MCODE", explode($"C.mcode"))

val ReferenceData = spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("file.csv")

val FinalData = ExtractedColumn.join(ReferenceData, Seq("ACCT_NO","SUBID","MCODE") , "left")
EN

回答 1

Stack Overflow用户

发布于 2020-01-13 10:12:53

您需要做的就是分解accountlinks数组,然后像这样连接两个数据帧:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val explodedDF = df.withColumn("account", explode($"accountlinks"))
val joinCondition = $"ACCT_NO" === $"account.acctno" && $"SUBID" === $"account.subid" && $"MCODE" === $"account.mcode"
val joinDF  = explodedDF.join(ReferenceData, joinCondition, "left")

现在,您可以像下面这样更新account结构列,并收集列表以获取数组结构:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val FinalData = joinDF.withColumn("account", 
                                  struct($"account.acctno", $"account.subid", $"account.mcode", 
                                         $"account.openeddate", $"NewClosedDate".alias("closeddate")
                                        )
                                 )
                        .groupBy().agg(collect_list($"account").alias("accountlinks"))

其思想是创建一个新结构,其中包含account中除closedate之外的所有字段,这些字段是从NewCloseDate列获得的。

如果该结构包含许多字段,则可以使用for-comprehension来获取除关闭日期之外的所有字段,以防止键入所有字段。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59710866

复制
相关文章
bootstrap笔记(六)——列偏移与嵌套列
嵌套列 为了使用内置的栅格系统将内容再次嵌套,可以通过添加一个新的 .row 元素和一系列 .col-sm-* 元素到已经存在的 .col-sm-* 元素内。被嵌套的行(row)所包含的列(column)的个数不能超过12(其实,没有要求你必须占满12列)。
兮动人
2021/06/11
9910
bootstrap笔记(六)——列偏移与嵌套列
dataframe行变换为列
使用 import org.apache.spark.sql.functions 里面的函数,具体的方式可以看 functions :
机器学习和大数据挖掘
2019/07/01
1.1K0
从DataFrame中删除列
在操作数据的时候,DataFrame对象中删除一个或多个列是常见的操作,并且实现方法较多,然而这中间有很多细节值得关注。
老齐
2021/03/29
7K0
[1016]DataFrame一列拆成多列以及一行拆成多行
在处理数据过程中,会需要将一条数据拆分为多条,比如:a|b|c拆分为a、b、c,并结合其他数据显示为三条数据。
周小董
2021/07/14
7.5K0
[1016]DataFrame一列拆成多列以及一行拆成多行
python用符号拼接DataFrame两列
碰到Null值时,会报错,因为none不可与str运算 解决如下,加入if判断即可
诡途
2022/01/07
1.7K0
pandas dataframe 新增单列和多列
dataframe assign方法,返回一个新对象(副本),不影响旧dataframe对象
lovelife110
2021/01/14
4.3K0
Pandas DataFrame显示行和列的数据不全
pd.set_option('display.max_columns', None)
用户7886150
2020/12/26
6.7K0
如何使用python连接MySQL表的列值?
MySQL 是一个开源关系数据库管理系统,广泛用于存储、管理和组织数据。使用 MySQL 表时,通常需要将多个列值组合成一个字符串以进行报告和分析。Python是一种高级编程语言,提供了多个库,可以连接到MySQL数据库和执行SQL查询。
很酷的站长
2023/08/11
2600
如何使用python连接MySQL表的列值?
【说站】Python DataFrame如何根据列值选择行
以上就是Python DataFrame根据列值选择行的方法,希望对大家有所帮助。
很酷的站长
2022/11/24
5.3K0
【说站】Python DataFrame如何根据列值选择行
GridView添加新列并绑定控件
4、创建控件事件(不能是click事件,关联字段触发的事件要创建Command事件)
用针戳左手中指指头
2021/01/29
1.2K0
GridView添加新列并绑定控件
pandas按行按列遍历Dataframe的几种方式
iterrows(): 按行遍历,将DataFrame的每一行迭代为(index, Series)对,可以通过row[name]对元素进行访问。 itertuples(): 按行遍历,将DataFrame的每一行迭代为元祖,可以通过row[name]对元素进行访问,比iterrows()效率高。 iteritems():按列遍历,将DataFrame的每一列迭代为(列名, Series)对,可以通过row[index]对元素进行访问。 示例数据
kirin
2021/04/30
7.1K0
用SQL语句实现:当A列大于B列时选择A列否则选择B列,当B列大于C列时选择B列否则选择C列。
数据库中有A B C三列,用SQL语句实现:当A列大于B列时选择A列否则选择B列,当B列大于C列时选择B列否则选择C列。
全栈程序员站长
2022/07/09
1.7K0
使用spark对hive表中的多列数据判重
本文处理的场景如下,hive表中的数据,对其中的多列进行判重deduplicate。 1、先解决依赖,spark相关的所有包,pom.xml spark-hive是我们进行hive表spark处理的关键。 <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <ver
用户1225216
2018/03/05
5.2K0
Pandas 修改单列,多列,Dataframe 数据类型方法汇总
文章目录 1.修改单列的数据类型 2.修改指定多列的数据类型 3.创建dataframe时,修改数据类型 4.读取时,修改数据类型 5.自动 1.修改单列的数据类型 import pandas as pd import numpy as np df = pd.read_csv('test.csv') df['column_name'] = df['column_name'].astype(np.str) print(df.dtypes) 2.修改指定多列的数据类型 import pandas as
白墨石
2021/01/13
6.7K0
PBI-基础入门:添加列与新建列(计算列)
大海:在Power BI里增加列有2种方法,一种是咱们在学Power Query里的“添加列”方法,还有一种是在PowerPivot里的新建“计算列”方法。具体操作方法如下:
大海Power
2021/08/30
7.6K0
spark踩坑——dataframe写入hbase连接异常
最近测试环境基于shc[https://github.com/hortonworks-spark/shc]的hbase-connector总是异常连接不到zookeeper,看下报错日志: 18/06/20 10:45:02 INFO RecoverableZooKeeper: Process identifier=hconnection-0x5175ab05 connecting to ZooKeeper ensemble=localhost:2181 18/06/20 10:45:02 INFO Rec
用户1154259
2018/06/21
2.3K0
怎样能自动按列01 列02 最大为列99,来设置列标题?
前几天在Python最强王者交流群有个粉丝咨询了这个问题:获取到数据表的列数比较简单,一般不超过99列,怎样能自动按列01 列02 最大为列99,来设置列标题?一劳永逸,以后这类场景都这样套用。
前端皮皮
2022/12/19
1.1K0
怎样能自动按列01 列02  最大为列99,来设置列标题?
Spark DataFrame
DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。SchemaRDD作为Apache Spark 1.0版本中的实验性工作,它在Apache Spark 1.3版本中被命名为DataFrame。对于熟悉Python pandas DataFrame或者R DataFrame的读者,Spark DataFrame是一个近似的概念,即允许用户轻松地使用结构化数据(如数据表)。
week
2018/12/07
9180
PQ基础-数据转换4:删列、移列、添加索引列
本文通过一个例子,综合体现常用的删列、移列、添加索引列操作方法。数据样式及要求如下:
大海Power
2021/08/31
1.7K0
Spark中SQL列和并为一行
但是在 spark 中没有 GROUP_CONCAT 命令,查找后发现命令 concat_ws :
机器学习和大数据挖掘
2019/07/02
1.7K0

相似问题

如何在Spark dataframe中使用嵌套列进行连接

111

如何遍历spark dataframe列并逐个访问其中的值?

236

如何比较两个Dataframe并更新其中一个Dataframe中的特定列?

20

更新spark中的dataframe列

512

使用空值连接两列spark dataframe

216
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文