我正在尝试用以前的非空值(如果存在)填充我的Spark dataframe中缺少的值。我在Python/Pandas中做过这种事情,但是我的数据对于Pandas (在一个小集群上)来说太大了,我是Spark noob。这是Spark可以做到的吗?它可以对多个列执行此操作吗?如果是这样的话,是怎么做的?如果没有,在谁的Hadoop工具套件中有任何替代方法的建议吗?
谢谢!
发布于 2018-05-19 13:51:04
我已经找到了一种不需要额外编码就可以使用Window here的解决方案。所以Jeff是对的,有一个解决方案。完整的代码现在,我将简要地解释它的作用,更多的细节只需查看博客。
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys
# define the window
window = Window.orderBy('time')\
.rowsBetween(-sys.maxsize, 0)
# define the forward-filled column
filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)
# do the fill
spark_df_filled = df6.withColumn('temperature_filled', filled_column_temperature)
所以我们的想法是定义一个窗口在数据中滑动(更多关于滑动窗口here),它总是包含实际的行和所有以前的行:
window = Window.orderBy('time')\
.rowsBetween(-sys.maxsize, 0)
请注意,我们按时间排序,因此数据的顺序是正确的。还要注意,使用"-sys.maxsize“可以确保窗口始终包含所有以前的数据,并且在自上而下遍历数据时不断增长,但可能有更有效的解决方案。
使用" last“函数,我们总是处理该窗口中的最后一行。通过传递"ignorenulls=True“,我们定义如果当前行为null,则函数将在窗口中返回最近(最后一个)非NULL值。否则,将使用实际行的值。
好了。
https://stackoverflow.com/questions/38131982
复制相似问题