如何使用"sourceArchiveDir“和"cleanSource=archive”将源CSV文件移动到归档目录?我正在运行下面的代码,但它不移动源文件,但是流处理工作正常,即它将源文件内容打印到控制台。 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val inputPath =
"/<here is an absolute path to my project dir>/data/input/spark_full_delta/2021-06-21"
spar
我正在尝试通过spark structured streaming从Kafka中读取数据。但是,在Spark 2.4.0.中,您不能为流设置组id (参见How to set group.id for consumer group in kafka data source in Structured Streaming?)。 然而,由于没有设置,spark只是生成组Id,而我停留在GroupAuthorizationException: 19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-12
我正在做一个小任务,使用一个卡夫卡主题读取access_logs文件,然后我计算状态并将状态计数发送到另一个kafka主题。但是,当我不使用输出模式或附加模式时,我仍然会收到错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
使用完全模式时:
E