我正在实现一个程序,它将整个数据帧作为参数。我知道这可能不是Spark的支持,但我想知道有没有好的方法来解决我的问题。
我有一个Spark数据框,如下所示:
Item_sale_table
item_id date Sale Amount
aaa 3-11 20
aaa 3-12 21
aaa 3-13 28
... ... ...
bbb 3-11 17
bbb 3-12 12
... ... ...
ccc 3-11 9
... ... ...
Item_List
item_id description
aaa xxxx
bbb xxxyx
ccc zxsa
...
我想做的是,从item_list
表中获取每个项目,并从item_sale
表中收集该项目的历史数据,然后对其应用一个函数(这里是一个简单的计数函数)。
因此,item process函数如下所示
def ItemProcess (item_id: String, Dataset: DataFrame) = {
val item_count = Dataset.filter(Dataset("item_id").contains(item_id)).count()
println(item_id,item_count)
}
调用此函数的主函数是
val item_count_collection = item_list.select("item_id").foreach(x => ItemProcess(x(0).toString, item_sale_table))
然后我得到了
ERROR Executor: Exception in task 4.0 in stage 11.0 (TID 504)
java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)
因此,我在foreach函数中传递了整个数据帧。我想这里的problem.But是怎么改正的呢?
========更新=======
我发现,即使像这样嵌入item process函数,我也会得到NullPointerException
val item_count_collection = item_list.select("item_id").foreach(x => Item_sale_table.filter(Item_sale_table("item_id").contains(x(0).toString)).count())
发布于 2016-12-11 04:21:14
聚合和(可选)连接:
val item_counts = item_sale_table.groupBy("item_id").count()
可选的加入:
item_list.join(item_counts, Seq("item_id"))
或者使用contains
(效率较低):
item_list.join(
item_counts,
item_counts("item_id").contains(item_list("item_id"))),
"left"
)
https://stackoverflow.com/questions/41067360
复制相似问题