带有火花流的Kafka抛出了一个错误:
from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka
我已经建立了一个卡夫卡经纪人和一个工作火花环境与一个主人和一个工人。
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7
我用的是火花2.4。
我正在将星火流应用程序迁移到结构化流。
我正在为每个批处理进行生成度量,并且我希望控制每个微批的统计数据。我对每个processingDelay、schedulingDelay和totalDelay指标以及在结构化流中找到它们的位置很感兴趣。
我尝试了以下方法,但它不生成任何统计数据。
val recentBatchInfos = new StatsReportListener(60).batchInfos
val numberOfRecords = recentBatchInfos.map(_.numRecords).sum
有人能告诉我们如何使用,拥有对统计数据的控制
我有一个要求,把从火花放电脚本创建的日志推到kafka。我正在做POC,所以在windows机器上使用Kafka二进制文件。我的版本是- kafka - 2.4.0,火花- 3.0和python-3.8.1。我用的是吡喃编辑器。
import sys
import logging
from datetime import datetime
try:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka im
我正在开发一个火花流作业(使用结构化流,而不是使用DStreams)。我从kafka收到一条消息,其中将包含许多带有逗号分隔值的字段,其中第一列将是一个文件名。现在,基于该文件名,我将不得不从HDFS读取文件,并创建一个数据文件并在该文件上进一步操作。这似乎很简单,但是seems不允许我在调用start之前运行任何操作。火花文档也引用了同样的话。
此外,还有一些Dataset方法无法在流数据集上工作。它们是将立即运行查询和返回结果的操作,这在流数据集中没有意义。
下面是我尝试过的。
object StructuredStreamingExample {
case class fil
根据
queryName("myTableName")用于在输出接收器为format("memory")时定义内存中的表名。
aggDF
.writeStream
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").sho