首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >消费者应否处理讯息,然后将讯息发回卡夫卡

消费者应否处理讯息,然后将讯息发回卡夫卡
EN

Stack Overflow用户
提问于 2019-07-18 04:41:31
回答 1查看 352关注 0票数 0

我想用topic1处理消费者的数据,然后把信息发送回卡夫卡到topic2

Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka

我的尝试:

代码语言:javascript
复制
 consumer.on('message', (message) => {
     let processedMsg = processMessage(message);

     payloads = [
        { topic: 'topic2', messages: processedMsg }
     ];

     producer.on('ready', function () {
         producer.send(payloads, function (err, data) {
             console.log(data);
         });
     });

     producer.on('error', function (err) {})
});

但是,制片人不能向卡夫卡发送经过处理的信息。我所犯的错误

代码语言:javascript
复制
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit

我使用节点模块Kafka-node

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-18 04:45:36

您需要切换生产者就绪侦听器和使用者消息侦听器的顺序。

否则,您将为每条已消耗的消息设置就绪侦听器

例如

代码语言:javascript
复制
 producer.on('ready', function () {
    consumer.on('message', (message) => {
      let processedMsg = processMessage(message);

      payloads = [
       { topic: 'topic2', messages: processedMsg }
      ];

       producer.send(payloads, function (err, data) {
         console.log(data);
       });
 });

不过,我建议查看这个库,如果主要是处理和转发到新主题https://github.com/nodefluent/kafka-streams/

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57087236

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档