我正在尝试使用Kafka to Storm来模拟数据流。我使用KafkaSpout从一个主题中读取一条消息,该消息是由一个生产者发送的,该生产者读取这些Tweet并将它们发送到一个主题。我的问题是,在拓扑使用了该主题中的所有tweet send之后,它会继续读取该主题中的消息两次。如何阻止KafkaSpout读取两次?(复制因子设置为1)
发布于 2019-05-25 05:51:50
对我来说,配置看起来不错。
也许问题出在双重攻击上。确保您在execute
中只对每个元组进行一次压缩。
正如评论中提到的,请考虑升级到更新的Kafka版本,以及切换到storm-kafka-client
。
此外,还可以让您的工作更轻松一些:考虑扩展BaseBasicBolt
而不是BaseRichBolt
。如果运行execute
没有抛出错误,BaseBasicBolt
会自动为您确认元组。如果你想使一个元组失败,你可以抛出FailedException
。只有当你想做更复杂的you时才应该使用BaseRichBolt
,例如在you之前在内存中聚合来自多个execute
调用的元组。
https://stackoverflow.com/questions/56294748
复制相似问题