首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何在使用Spark Streaming对kafka进行流媒体时进行去重?

如何在使用Spark Streaming对kafka进行流媒体时进行去重?
EN

Stack Overflow用户
提问于 2018-02-12 12:35:26
回答 6查看 5.3K关注 0票数 2

我有一个案例,Kafka生产者一天发送两次数据。这些生产者从数据库/文件中读取所有数据并发送给Kafka。所以这些信息是每天发送的,是重复的。我需要对消息进行重复数据删除,并使用Spark流在一些持久存储中写入。在这种情况下,删除重复消息的最佳方法是什么?

发送的重复消息是带有时间戳字段的json字符串,仅更新。

注意:我不能改变Kafka Producer只发送新的数据/消息,它已经安装在客户机上并由其他人编写。

EN

回答 6

Stack Overflow用户

发布于 2018-02-12 15:51:02

对于重复数据删除,您需要在某个位置存储有关已处理内容的信息(例如,邮件的唯一ids )。

要存储消息,您可以使用:

  1. spark检查点。优点:开箱即用。缺点:如果你更新app的源代码,你需要清理检查点。因此,您将丢失信息。如果对重复数据删除的要求不是很严格,则可以使用此解决方案。
  2. 任何数据库。例如,如果您在hadoop env上运行,则可以使用Hbase。对于你收到的每一封邮件(检查它是否以前没有发送过),当它真的被发送时,在数据库中标记为已发送。
票数 1
EN

Stack Overflow用户

发布于 2018-02-12 20:58:56

您可以将主题配置更改为compact模式。压缩后,Kafka日志中会覆盖/更新具有相同key的记录。在这里,您只能从Kafka获得密钥的最新值。

您可以阅读有关压缩here的更多信息。

票数 1
EN

Stack Overflow用户

发布于 2018-05-03 16:19:33

您可以尝试使用mapWithState。查看我的answer

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48739605

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档