我正在使用NodeJS消费来自Kafka的消息,收到消息后,我会带着它在Elasticsearch中创建索引。这是我的代码片段:
kafkaConsumer.on('message', function (message) {
elasticClient.index({
index: 'test',
type: 'sample',
body: message
}, function (error, response) {
if (error) {
// Stop consuming message here
console.log(error);
}
console.log(response);
});
});
我希望确保在继续使用下一条消息之前必须成功创建索引,因为我不希望丢失任何消息。
发布于 2016-05-31 12:51:47
试试consumer.pause()/resume()
吧。
pause()暂停使用者
resume()恢复消费者
发布于 2016-05-31 12:55:00
您可以暂停您的消费者,这样它就不会订阅和获取新消息。一旦你得到elasticClient的响应,你就可以恢复你的kafka消费者了。
kafkaConsumer.pause();
kafkaConsumer.resume();
根据您的设置使用它。
https://stackoverflow.com/questions/37545494
复制相似问题