我有一个数据集df_1,如下所示:
my_id scope feat_1 feat_2 value_1 value_2 value_3 date
23784 some_code Three A 30 60 60 2022-01-01
23794 some_code Seven B 60 40 20 2022-01-01
23774 some_cod1 Three A 90 40 60 2022-01-02
22784 some_cod1 Three C 30 10 60 2022-01-01
23564 some_cod2 Four A 20 40 20 2022-01-05
20784 some_cod3 Five A 10 70 40 2022-02-08我需要对它执行一个简单的计算,但是由于它经常更新,所以我想确保所有的数据都在那里。为此,我有以下指南df_2。version总是在增加,并且告诉我什么时候发生了最新的更新,而我只关心某个范围和日期的最大和日期。
my_id scope feat_1 feat_2 date version
23784 some_code Three A 2022-01-01 600
23794 some_code Seven B 2022-01-01 600
23774 some_cod1 Three A 2022-01-02 600
22784 some_cod1 Three C 2022-01-01 650
23564 some_cod2 Four A 2022-01-05 650
20784 some_cod3 Five A 2022-02-08 700
20744 some_cod2 Five A 2022-01-05 700
20745 some_cod2 Four C 2022-01-05 700我如何通过** scope 和 date 来查看 df_1组,并得到最大的 version**,,然后查看该版本的**df_1中是否存在所有** my_id**s?
我所做的是执行一个左反连接,以查看哪些my_id在df_2中存在,而在整个数据集中的df_1中不存在。但是,我只关心scope和date中最高的date。我该怎么做?我不能只做df_2.groupBy("scope", "date").max("version")然后做左反连接,对吗?
发布于 2022-08-12 13:44:54
您可以使用pyspark.sql.Window来完成以下操作:
from pyspark.sql import SparkSession, functions as F, Window
windowSpec = Window.partitionBy("scope", "date")
# Only retain rows with the max version after grouping by scope and date.
df2_my_id = (
df2.withColumn("max_version", F.max("version").over(windowSpec))
.filter(F.col("max_version")==F.col("version"))
.select("my_id")
)
df2_my_id.show()
+-----+
|my_id|
+-----+
|22784|
|23774|
|20744|
|20745|
|20784|
|23784|
|23794|
+-----+然后使用pyspark.sql.DataFrame.subtract查找df2中的所有in,而不是df1中的in。
df2_my_id.subtract(df1.select("my_id")).show()
+-----+
|my_id|
+-----+
|20744|
|20745|
+-----+如果此计数大于0,则意味着df2中有一些in在df1中不存在。
df2_my_id.subtract(df1.select("my_id")).count() > 0 # Returns True in this casehttps://stackoverflow.com/questions/73329661
复制相似问题