首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何处理kafka主题中的旧数据?

如何处理kafka主题中的旧数据?
EN

Stack Overflow用户
提问于 2018-11-26 22:40:55
回答 1查看 211关注 0票数 2

我开始使用spark structured。

我通过waterMark从kafka topic (startOffset: latest)获取readStream,按事件时间和窗口时长分组,并写入kafka topic。

我的问题是,在spark结构化流媒体作业之前,我如何处理写入kafka主题的数据?

一开始我试着用‘`startOffset: with’来运行。但是kafka topic中的数据量太大,因此没有启动spark streaming流程,因为纱线超时。(即使我增加了超时值)

如果我简单地创建一个批处理作业并按特定的数据范围进行过滤,则为

  1. 。结果没有反映在火花流的当前状态中,结果的一致性和准确性似乎存在问题。

  1. I尝试重置检查点目录,但不起作用。

如何处理旧数据和大数据?帮帮我。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-11-27 12:31:30

您可以尝试使用Kafka + Structured Streaming的参数maxOffsetsPerTrigger来接收来自Kafka的旧数据。设置此参数的值为您希望一次从Kafka接收的记录数。

使用:

代码语言:javascript
复制
sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-name")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 1)
      .option("group.id", "2")
      .option("auto.offset.reset", "earliest")
      .load()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53483483

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档