如何在Python Spark structured streaming中使用foreach
在输出上触发操作。
query = wordCounts\
.writeStream\
.outputMode('update')\
.foreach(func)\
.start()
def func():
ops(wordCounts)
发布于 2019-01-20 01:50:30
在Spark 2.4.0中添加了对Python中的foreach接收器的支持,文档也进行了更新:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
确保您拥有该版本,现在可以执行以下操作:
def process_row(row):
# Process row
pass
query = streamingDF.writeStream.foreach(process_row).start()
发布于 2018-01-11 21:02:14
TL;DR在火花源中不能使用foreach
方法。
引用Spark Structured Streaming的official documentation (突出我的):
操作允许在输出数据上计算任意操作。从Spark2.1开始,这只适用于Scala和。
发布于 2018-03-28 11:29:18
现在不可能通过任何简单的技巧在pyspark
中使用foreach
,此外,在pyspark
中,update
输出模式只能用于调试。
我推荐你在scala
中使用spark,这并不难学。
https://stackoverflow.com/questions/48201647
复制相似问题