I have the following Spark data:
df1

id  dia_date
1   2/12/17
1   4/25/16
2   12/8/17
2   6/12/11

df2

id  obs_date  obs_value
1   2/16/17   4
1   2/20/17   2
1   2/9/17    4
1   12/12/18  5
1   4/18/16   1
1   4/18/16   6
1   4/30/16   7
1   5/25/16   9
2   12/12/17  10
2   12/6/17   11
2   12/14/17  4
2   6/11/11   5
2   6/11/11   6
The output I want is as follows:
1) For each dia_date in df1, find the three closest obs_date values in df2.
2) If there are fewer than three close dates, insert null for the missing ones.
3) The closest dates must only be searched within the same id. For example, for the dia_date rows of id '1', we should only look at the obs_date rows in df2 that belong to id '1'.
Expected output:
id  dia_date  obs_date1  obs_val1  obs_date2  obs_val2  obs_date3  obs_val3
1   2/12/17   2/9/17     4         2/16/17    4         2/20/17    2
1   4/25/16   4/18/16    1         4/18/16    6         4/30/16    7
2   12/8/17   12/6/17    11        12/12/17   10        12/14/17   4
2   6/12/11   6/11/11    5         6/11/11    6         null       null
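As a reference point, the sample frames above could be constructed in Scala Spark like this (a sketch only; the dia/obs names are mine, and it assumes the dates arrive as M/d/yy strings that need parsing with the built-in to_date):

// Sketch: build the question's sample data; dia/obs are hypothetical names.
import org.apache.spark.sql.functions.to_date
import spark.implicits._

val dia = Seq(
  ("1", "2/12/17"), ("1", "4/25/16"),
  ("2", "12/8/17"), ("2", "6/12/11")
).toDF("id", "dia_date")
 .withColumn("dia_date", to_date($"dia_date", "M/d/yy"))

val obs = Seq(
  ("1", "2/16/17", 4), ("1", "2/20/17", 2), ("1", "2/9/17", 4), ("1", "12/12/18", 5),
  ("1", "4/18/16", 1), ("1", "4/18/16", 6), ("1", "4/30/16", 7), ("1", "5/25/16", 9),
  ("2", "12/12/17", 10), ("2", "12/6/17", 11), ("2", "12/14/17", 4),
  ("2", "6/11/11", 5), ("2", "6/11/11", 6)
).toDF("id", "obs_date", "obs_value")
 .withColumn("obs_date", to_date($"obs_date", "M/d/yy"))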
I want to do this in PySpark. I have tried a few approaches but found it really hard, as I have only just started working with PySpark.
Posted on 2018-11-04 18:48:26
Here is an answer in Scala, since the problem is not specific to PySpark; you can convert it. I could not reproduce your exact final output, but an alternative that gets most of the way there should suffice.
// We could probably optimize this further, but that is left aside here.
// Assumes the values being compared against are distinct; if not, further logic is needed.
// Hit what looked like a bug in rank (or was it me?); worked around it and dropped that logic.
// Pivot does not help here. Grouping by specific column names in SQL is not elegant, so a more Scala-oriented approach is used.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
import java.time._
// Convert an ISO yyyy-MM-dd date string to its epoch-day number so date distances can be compared as plain integers.
def toEpochDay(s: String) = LocalDate.parse(s).toEpochDay
val toEpochDayUdf = udf(toEpochDay(_: String))
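As a side note (my own observation, not part of the original answer): the UDF is not strictly required, since the same absolute day distance can be expressed with built-in functions. A minimal sketch, assuming dia_dt and obs_dt hold ISO yyyy-MM-dd strings as in the test data below:

// Sketch: the same day distance via built-ins instead of a UDF. This column
// expression could replace the dia_epoch/obs_epoch/abs_diff construction on the
// joined frame, assuming ISO-formatted date strings.
val absDiffCol = abs(datediff(to_date($"dia_dt"), to_date($"obs_dt")))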
// Our input.
val df0 = Seq(
("1","2018-09-05"), ("1","2018-09-14"),
("2","2018-12-23"), ("5","2015-12-20"),
("6","2018-12-23")
).toDF("id", "dia_dt")
val df1 = Seq(
("1","2018-09-06", 5), ("1","2018-09-07", 6), ("6","2023-09-07", 7),
("2","2018-12-23", 4), ("2","2018-12-24", 5), ("2","2018-10-23", 5),
("1","2017-09-06", 5), ("1","2017-09-07", 6),
("5","2015-12-20", 5), ("5","2015-12-21", 6), ("5","2015-12-19", 5), ("5","2015-12-18", 7), ("5","2015-12-22", 5),
("5","2015-12-23", 6), ("5","2015-12-17", 6), ("5","2015-12-26", 60)
).toDF("id", "obs_dt", "obs_val")
val myExpression = "abs(dia_epoch - obs_epoch)"
// Hard to know how to restrict further at this point.
val df2 = df1.withColumn("obs_epoch", toEpochDayUdf($"obs_dt"))
val df3 = df2.join(df0, Seq("id"), "inner").withColumn("dia_epoch", toEpochDayUdf($"dia_dt"))
.withColumn("abs_diff", expr(myExpression))
@transient val w1 = Window.partitionBy("id", "dia_epoch").orderBy(asc("abs_diff"))
val df4 = df3.select($"*", rank.over(w1).alias("rank")) // Rank each observation by closeness to the diagnosis date within its (id, dia_epoch) group.
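Note on the rank issue mentioned in the preamble: rank assigns equal ranks to ties on abs_diff, so more than three observations can survive the rank <= 3 filter. If at most three are strictly required, one option (my assumption, not the author's code) is row_number, which breaks ties arbitrarily:

// Alternative sketch: row_number guarantees at most three observations
// per (id, dia_epoch) group after the filter, at the cost of an arbitrary
// choice among tied distances.
val df4Strict = df3.select($"*", row_number().over(w1).alias("rn"))
  .filter($"rn" <= 3)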
// Final results as collect_list. Distinct column names not so easy due to not being able to use pivot - may be a limitation on knowledge on my side.
df4.orderBy("id", "dia_dt")
.filter($"rank" <= 3)
.groupBy($"id", $"dia_dt")
.agg(collect_list(struct($"obs_dt", $"obs_val")).as("observations"))
.show(false)
This returns:
+---+----------+---------------------------------------------------+
|id |dia_dt |observations |
+---+----------+---------------------------------------------------+
|1 |2018-09-05|[[2017-09-07, 6], [2018-09-06, 5], [2018-09-07, 6]]|
|1 |2018-09-14|[[2017-09-07, 6], [2018-09-06, 5], [2018-09-07, 6]]|
|2 |2018-12-23|[[2018-10-23, 5], [2018-12-23, 4], [2018-12-24, 5]]|
|5 |2015-12-20|[[2015-12-19, 5], [2015-12-20, 5], [2015-12-21, 6]]|
|6 |2018-12-23|[[2023-09-07, 7]] |
+---+----------+---------------------------------------------------+
You can take it one step further from here; the heavy lifting is done.
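That one step further could look like the following sketch (my own continuation, not part of the original answer): sort the collected structs by distance and index into the array, so missing observations come back as null, matching the wide layout in the question. It assumes Spark 2.4+ for element_at, which returns null for out-of-range indexes when ANSI mode is off.

// Sketch: flatten the (up to) three closest observations into fixed columns.
val wide = df4
  .filter($"rank" <= 3)
  .groupBy($"id", $"dia_dt")
  .agg(sort_array(collect_list(struct($"abs_diff", $"obs_dt", $"obs_val"))).as("obs"))
  .select(
    $"id", $"dia_dt",
    element_at($"obs", 1).getField("obs_dt").as("obs_date1"),
    element_at($"obs", 1).getField("obs_val").as("obs_val1"),
    element_at($"obs", 2).getField("obs_dt").as("obs_date2"),
    element_at($"obs", 2).getField("obs_val").as("obs_val2"),
    element_at($"obs", 3).getField("obs_dt").as("obs_date3"),
    element_at($"obs", 3).getField("obs_val").as("obs_val3")
  )
wide.show(false)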
https://stackoverflow.com/questions/53136737