我是卡夫卡的新手。我对卡夫卡的信息格式有点困惑。我测试了一个KafkaJS消费者。
const run = async () => {
await kafkaClient.consumer.subscribe({ topic: 'mytopic', fromBeginning: true })
await kafkaClient.consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
for (let message of batch.messages) {
if (!isRunning() || isStale()) break
processMessage(message)
resolveOffset(message.offset)
}
},
})
}
我使用console.log(message)
查看消息格式,如下所示
{
magicByte: 2,
attributes: 0,
timestamp: '540669',
offset: '601953',
key: <Buffer 39 63 37 23>,
value: <Buffer 7b 65 32 65 37 38 ... 555 more bytes>,
headers: {
'myheader': <Buffer 61 6f>,
},
‧‧‧‧‧‧
}
我还在本地主机上尝试了一个用spring引导构建的消费者。由于没有制片人,我用邮递员向卡夫卡发送消息。spring使用者收到的消息如下所示
{
body: 'this is body',
clientIp: 'this is IP
}
'this is body'
是我从邮递员那里发送的内容。clientIp
的值是我的ip。
我注意到这是message.value.toString()
在KafkaJS中返回的内容。他们为什么不一样?如果消费者连接到相同的卡夫卡主题,他们会得到不同的信息吗?
如果我想构建一个java使用者来接收和使用与KafkaJS使用者相同的消息格式,我应该尝试什么?
发布于 2022-06-01 12:09:15
console.log(message)
正在向您展示从Kafka读取的全部反序列化消息,其中包括键、值和其他元数据。该对象中的数据结构是如何与语言相关的(例如,Java或Go将拥有自己的类或结构,这些类或结构将包含大部分相同的数据,但不一定以相同的方式构造)。
即使每种语言都有不同的对象来表示反序列化的Kafka消息,元数据(标头、时间戳.)、消息键和消息值总是存在的,它们的结构可能不同,但其值应该总是通过某种方法或属性可用。在序列化时,它始终具有与Kafka协议定义的相同的字节表示形式。
https://stackoverflow.com/questions/72459520
复制相似问题