我有一个Streaming设置,它消耗了一个Kafka主题,我需要使用一些使用的App,但是当我试图转换它时,我得到了以下内容
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$anal
我收到了错误信息
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' a
当我通过spark-submit启动一个流任务时,我收到关于Kafka属性无效的警告消息:
VerifiableProperties: Property auto.offset.reset is overridden to largest
VerifiableProperties: Property enable.auto.commit is not valid.
VerifiableProperties: Property sasl.kerberos.service.name is not valid
VerifiableProperties: Property key.deserializ
我想对我从一个卡夫卡集群中获得的消息流执行tweet情绪分析,该集群反过来从Twitter v2中获取这些消息。
当我尝试应用预先训练过的情感分析管道时,我会收到一条错误消息,上面写着:Exception: target must be either a spark DataFrame, a list of strings or a string,我想知道是否有办法解决这个问题。
我已经检查了文档,在流数据上找不到任何东西。
这是我使用的代码:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functi
Spark SQL ( DSL而不是API)是否支持结构化流中的窗口功能?Flink中类似的内容如下所示: SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime,
INTERVAL '1' DAY), user 我在官方结构化流媒体网站上找到的唯一SQL示例是下面的,其中没有窗口功能的示例: df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") // returns
我在运行无法连接到Kinesis数据源的胶水流式作业时遇到以下错误:
错误:
WARNING:root:StreamingQueryException caught. Retry number 10 ERROR:root:Exceeded maximuim number of retries in streaming interval,
exception thrown Parse yarn logs get error message:
StreamingQueryException: 'Error while Describe Streams\n=== Streaming Q
我正在尝试将xml文件(小于100 kb)发送到Azure事件中心,然后在发送它们之后,读取Databricks中的事件。
现在,我已经使用Python以字节发送XML的内容(这个步骤是工作)。但是我想要完成的下一步是从事件的“主体”中读取该XML内容,并使用PYSPARK创建一个Spark。
要做到这一点,我有两个疑问:
XML1-我在选项中指定事件的“主体”的内容是XML的选项吗? 2-是否有其他方法可以直接将该内容转储到Spark ? 3-在将spark.readStream作为事件发送时缺少一些配置吗?
我试着像下面的例子一样:
Python事件生成器
# this is the p
当我从Kafka主题创建一个流并打印它的内容时
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka impo
--我尝试从Socket获取数据以附加到dataframe --我接收了数据并将其执行到Seq()中,但是当我使用forEach将它们附加到dataframe时,有一个问题--这是我的代码:
object CustomReceiver {
def main(args: Array[String]): Unit = {
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val spark: SparkSession = SparkS