我正在尝试将Spark Streaming设置为读取MQTT源,但当我收到第二条消息时,它会启动一个异常。我有以下代码:import org.apache.commons.io.FileUtils// Read text from socket
val lines = spark.readStream.format("org.apache.bahir.sql.streaming
我正在尝试将一个ApacheSpark结构化流连接到一个MQTT主题(在这个例子中,IBM平台在IBM上)。我创建结构化流如下所示: .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider所以这个问题已经解决了,但是如果我开始使用以下行从流中读取:
val = df.writeStream。JSON表示的字符串对象流</em