首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在成功使用KafkaJS时提交单个消息

如何在成功使用KafkaJS时提交单个消息
EN

Stack Overflow用户
提问于 2022-04-14 01:08:33
回答 2查看 1.1K关注 0票数 1

我正试图找到一种类似于node-rdkafka在成功时提交单个消息所做的解决方案。在node-rdkafka中,我能够在消息处理成功后调用consumer.commit(message);。在KafkaJS中,什么是等价的?

到目前为止,我一直试图在consumer.commitOffsets(...)处理程序中调用eachMessage,但似乎没有提交。

我有这样的代码:

代码语言:javascript
运行
复制
import { Kafka, logLevel } from 'kafkajs';

const kafka = new Kafka({
    clientId: 'qa-topic',
    brokers: [process.env.KAFKA_BOOTSTRAP_SERVER],
    ssl: true,
    logLevel: logLevel.INFO,
    sasl: {
        mechanism: 'plain', 
        username: process.env.KAFKA_CONSUMER_SASL_USERNAME,
        password: process.env.KAFKA_CONSUMER_SASL_PASSWORD
    }
});

const consumer = kafka.consumer({
    groupId: process.env.KAFKA_CONSUMER_GROUP_ID
});

const run = async () => {

    // Consuming
    await consumer.connect()
    await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

    await consumer.run({
        autoCommit: false,
        eachMessage: async ({ topic, partition, message }) => {
            try {
                await processMyMessage(message);
                
                // HOW DO I COMMIT THIS MESSAGE?
                // The below doesn't seem to commit 
                // await consumer.commitOffsets([{ topic: 'my-topic', partition, offset:message.offset }]);

            } catch (e) {
                // log error, but do not commit message
            }
        },
    })
}
EN

回答 2

Stack Overflow用户

发布于 2022-04-14 23:58:38

我想出了怎么做。不能使用eachMessage处理程序,而是使用eachBatch,它允许更灵活地控制消息的提交方式。

代码语言:javascript
运行
复制
const run = async () => {
    await consumer.connect();
    await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

    await consumer.run({
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch, resolveOffset, isRunning, isStale }) => {
            const promises = [];
            logger.log(`Starting to process ${batch.messages?.length || 0} messages`);
            for (const message of batch.messages) {
                if (!isRunning() || isStale()) break;
                promises.push(handleMessage(batch.topic, batch.partition, message, resolveOffset));
            }
            await Promise.all(promises);
        },
    });
};

然后在handleMessage内部只提交那些成功的消息

代码语言:javascript
运行
复制
const handleMessage = async (topic, partition, message, resolveOffset) => {
    try {
      ....
      
      //Commit message if successful 
      resolveOffset(message.offset);
    } catch(e) {
       ...
       // Do not commit 

}
票数 1
EN

Stack Overflow用户

发布于 2022-04-14 07:18:22

正如文档所述:您只能在consumer.run之后调用consumer.commitOffsets。您正在尝试从run方法中调用它,这就是它不适合您的原因。

请记住,在每条消息之后提交会增加网络流量。如果这是您愿意支付的价格,您可以将自动提交配置为通过将autoCommitThreshold设置为1来处理。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71865355

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档