I have a use case where I need to read records from a data source and write them to multiple sinks, including Kafka, along with some aggregations.
Here is my pseudocode:
Dataset<Row> dataset = spark.readStream()......
dataset.writeStream().foreachBatch((batch, batchId) -> {
    // do some processing, including aggregations
    // write it to multiple sinks
    batch.write().format("kafka").save();
}).start().awaitTermination();
I am getting the following error:
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' a
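The error is raised by the streaming *source*, not by `foreachBatch`: file-based streaming sources require an explicit schema via `.schema(...)` before loading. Below is a minimal sketch of the whole pipeline under that assumption; the schema fields, input path, Kafka bootstrap servers, topic name, and aggregation output path are all placeholders, not values from the original question.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class MultiSinkStream {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("multi-sink-stream")
                .getOrCreate();

        // File-based streaming sources need the schema declared up front;
        // these field names are hypothetical.
        StructType schema = new StructType()
                .add("id", "string")
                .add("value", "long");

        Dataset<Row> dataset = spark.readStream()
                .schema(schema)               // this is what the error is asking for
                .json("/path/to/input");      // placeholder source directory

        dataset.writeStream()
                .foreachBatch((Dataset<Row> batch, Long batchId) -> {
                    // Cache so the batch is not recomputed once per sink.
                    batch.persist();

                    // Sink 1: raw records to Kafka (Kafka expects string/bytes
                    // key and value columns).
                    batch.selectExpr(
                            "CAST(id AS STRING) AS key",
                            "CAST(value AS STRING) AS value")
                         .write()
                         .format("kafka")
                         .option("kafka.bootstrap.servers", "host:9092") // placeholder
                         .option("topic", "my-topic")                    // placeholder
                         .save();

                    // Sink 2: a per-batch aggregation written to Parquet.
                    batch.groupBy("id").count()
                         .write()
                         .mode("append")
                         .parquet("/path/to/agg");                       // placeholder

                    batch.unpersist();
                })
                .start()
                .awaitTermination();
    }
}
```

Inside `foreachBatch` the DataFrame is a static one, so the usual batch `write()` API works for each sink; persisting it first avoids re-reading the micro-batch for every sink.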