我有一个案例,Kafka生产者一天发送两次数据。这些生产者从数据库/文件中读取所有数据并发送给Kafka。所以这些信息是每天发送的,是重复的。我需要对消息进行重复数据删除,并使用Spark流在一些持久存储中写入。在这种情况下,删除重复消息的最佳方法是什么?
发送的重复消息是带有时间戳字段的json字符串,仅更新。
注意:我不能改变Kafka Producer只发送新的数据/消息,它已经安装在客户机上并由其他人编写。
发布于 2018-02-12 15:51:02
对于重复数据删除,您需要在某个位置存储有关已处理内容的信息(例如,邮件的唯一ids )。
要存储消息,您可以使用:
发布于 2018-02-12 20:58:56
您可以将主题配置更改为compact
模式。压缩后,Kafka日志中会覆盖/更新具有相同key的记录。在这里,您只能从Kafka获得密钥的最新值。
您可以阅读有关压缩here的更多信息。
发布于 2018-05-03 16:19:33
您可以尝试使用mapWithState
。查看我的answer。
https://stackoverflow.com/questions/48739605
复制相似问题