只需设置一个hadoop/kafka/星火,一个节点演示环境。在pyspark中,我尝试读取(.readStream) Kafka消息,并在hadoop中将它写入(.writeStream)到json文件。奇怪的是,在hadoop "output/test“目录下,我可以找到一个创建的json文件,但只有一个消息。卡夫卡的所有新消息都不会更新json文件。但是我想把卡夫卡的所有信息都存储在一个json文件中。
我尝试过控制台(writeStream.format(“控制台”)或kafak(writeStream.format("kafka"))的接收器类型,它正常工作。有什么建议或意见吗?下面是示例代码。
schema = StructType([StructField("stock_name",StringType(),True),
StructField("stock_value", DoubleType(), True),
StructField("timestamp", LongType(), True)])
line = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "127.0.1.1:9092") \
.option("subscribe", "fakestock") \
.option("startingOffsets","earliest")\
.load()\
.selectExpr("CAST(value AS STRING)")
df=line.select(functions.from_json(functions.col("value")\
.cast("string"),schema).alias("parse_value"))\
.select("parse_value.stock_name","parse_value.stock_value","parse_value.timestamp")
query=df.writeStream\
.format("json")\
.option("checkpointLocation", "output/checkpoint")\
.option("path","output/test")\
.start()
发布于 2022-08-28 04:30:51
不可能将所有记录存储在一个文件中。Spark会定期以Kafka用户的身份对批量数据进行投票,然后将这些批数据写入唯一的文件。
在不知道主题中有多少记录的情况下,很难确定输出路径中应该有多少条记录,但是您的代码看起来还行。然而,与JSON相比,Parquet更多地推荐输出格式。
还值得一提的是,Kafka有一个HDFS插件,它只需要编写一个配置文件,不需要火花解析代码。
https://stackoverflow.com/questions/73510798
复制