首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >从writeStream读取到json文件,但在HDFS json文件中只找到一条消息。

从writeStream读取到json文件,但在HDFS json文件中只找到一条消息。
EN

Stack Overflow用户
提问于 2022-08-27 11:57:49
回答 1查看 17关注 0票数 0

只需设置一个hadoop/kafka/星火,一个节点演示环境。在pyspark中,我尝试读取(.readStream) Kafka消息,并在hadoop中将它写入(.writeStream)到json文件。奇怪的是,在hadoop "output/test“目录下,我可以找到一个创建的json文件,但只有一个消息。卡夫卡的所有新消息都不会更新json文件。但是我想把卡夫卡的所有信息都存储在一个json文件中。

我尝试过控制台(writeStream.format(“控制台”)或kafak(writeStream.format("kafka"))的接收器类型,它正常工作。有什么建议或意见吗?下面是示例代码。

代码语言:javascript
代码运行次数:0
运行
复制
schema = StructType([StructField("stock_name",StringType(),True),
                     StructField("stock_value", DoubleType(), True),
                     StructField("timestamp", LongType(), True)])

line = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.1.1:9092") \
  .option("subscribe", "fakestock") \
  .option("startingOffsets","earliest")\
  .load()\
  .selectExpr("CAST(value AS STRING)")
   
df=line.select(functions.from_json(functions.col("value")\
  .cast("string"),schema).alias("parse_value"))\
  .select("parse_value.stock_name","parse_value.stock_value","parse_value.timestamp")
query=df.writeStream\
  .format("json")\
  .option("checkpointLocation", "output/checkpoint")\
  .option("path","output/test")\
  .start()
EN

回答 1

Stack Overflow用户

发布于 2022-08-28 12:30:51

不可能将所有记录存储在一个文件中。Spark会定期以Kafka用户的身份对批量数据进行投票,然后将这些批数据写入唯一的文件。

在不知道主题中有多少记录的情况下,很难确定输出路径中应该有多少条记录,但是您的代码看起来还行。然而,与JSON相比,Parquet更多地推荐输出格式。

还值得一提的是,Kafka有一个HDFS插件,它只需要编写一个配置文件,不需要火花解析代码。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73510798

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档