我想用topic1处理消费者的数据,然后把信息发送回卡夫卡到topic2
Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka
我的尝试:
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
});但是,制片人不能向卡夫卡发送经过处理的信息。我所犯的错误
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit我使用节点模块Kafka-node
发布于 2019-07-18 04:45:36
您需要切换生产者就绪侦听器和使用者消息侦听器的顺序。
否则,您将为每条已消耗的消息设置就绪侦听器
例如
producer.on('ready', function () {
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.send(payloads, function (err, data) {
console.log(data);
});
});不过,我建议查看这个库,如果主要是处理和转发到新主题https://github.com/nodefluent/kafka-streams/
https://stackoverflow.com/questions/57087236
复制相似问题