我正试图找到一种类似于node-rdkafka
在成功时提交单个消息所做的解决方案。在node-rdkafka
中,我能够在消息处理成功后调用consumer.commit(message);
。在KafkaJS中,什么是等价的?
到目前为止,我一直试图在consumer.commitOffsets(...)
处理程序中调用eachMessage
,但似乎没有提交。
我有这样的代码:
import { Kafka, logLevel } from 'kafkajs';
const kafka = new Kafka({
clientId: 'qa-topic',
brokers: [process.env.KAFKA_BOOTSTRAP_SERVER],
ssl: true,
logLevel: logLevel.INFO,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_CONSUMER_SASL_USERNAME,
password: process.env.KAFKA_CONSUMER_SASL_PASSWORD
}
});
const consumer = kafka.consumer({
groupId: process.env.KAFKA_CONSUMER_GROUP_ID
});
const run = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
try {
await processMyMessage(message);
// HOW DO I COMMIT THIS MESSAGE?
// The below doesn't seem to commit
// await consumer.commitOffsets([{ topic: 'my-topic', partition, offset:message.offset }]);
} catch (e) {
// log error, but do not commit message
}
},
})
}
发布于 2022-04-14 23:58:38
我想出了怎么做。不能使用eachMessage
处理程序,而是使用eachBatch
,它允许更灵活地控制消息的提交方式。
const run = async () => {
await consumer.connect();
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
await consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, isRunning, isStale }) => {
const promises = [];
logger.log(`Starting to process ${batch.messages?.length || 0} messages`);
for (const message of batch.messages) {
if (!isRunning() || isStale()) break;
promises.push(handleMessage(batch.topic, batch.partition, message, resolveOffset));
}
await Promise.all(promises);
},
});
};
然后在handleMessage
内部只提交那些成功的消息
const handleMessage = async (topic, partition, message, resolveOffset) => {
try {
....
//Commit message if successful
resolveOffset(message.offset);
} catch(e) {
...
// Do not commit
}
发布于 2022-04-14 07:18:22
正如文档所述:您只能在consumer.run
之后调用consumer.commitOffsets
。您正在尝试从run方法中调用它,这就是它不适合您的原因。
请记住,在每条消息之后提交会增加网络流量。如果这是您愿意支付的价格,您可以将自动提交配置为通过将autoCommitThreshold
设置为1来处理。
https://stackoverflow.com/questions/71865355
复制相似问题