首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >kafka消费者.net 'Protocol message end-group标记与预期的标签不匹配。

kafka消费者.net 'Protocol message end-group标记与预期的标签不匹配。
EN

Stack Overflow用户
提问于 2022-02-26 12:00:20
回答 1查看 619关注 0票数 1

我正试着阅读卡夫卡的数据,如你所见:

代码语言:javascript
运行
复制
var config = new ConsumerConfig
{
    BootstrapServers = ""*******,
    GroupId = Guid.NewGuid().ToString(),
    AutoOffsetReset = AutoOffsetReset.Earliest
};
MessageParser<AdminIpoChange> parser = new(() => new AdminIpoChange());
using (var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build())
{

    consumer.Subscribe("AdminIpoChange");

    while (true)
    {
        AdminIpoChange item = new AdminIpoChange();
            var cr = consumer.Consume();
    
            item = parser.ParseFrom(new ReadOnlySpan<byte>(cr.Message.Value).ToArray());
    }

    consumer.Close();
}

我使用google protobuf发送和接收数据,.This代码在解析器行中返回此错误:

代码语言:javascript
运行
复制
 KafkaConsumer.ConsumeAsync: Protocol message end-group tag did not match expected tag.
Google.Protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
   at Google.Protobuf.ParsingPrimitivesMessages.CheckLastTagWas(ParserInternalState& state, UInt32 expectedTag)
   at Google.Protobuf.ParsingPrimitivesMessages.ReadGroup(ParseContext& ctx, Int32 fieldNumber, UnknownFieldSet set)
   at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(ParseContext& ctx)
   at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(UnknownFieldSet unknownFields, ParseContext& ctx)
   at AdminIpoChange.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 213
   at Google.Protobuf.ParsingPrimitivesMessages.ReadRawMessage(ParseContext& ctx, IMessage message)
   at Google.Protobuf.CodedInputStream.ReadRawMessage(IMessage message)
   at AdminIpoChange.MergeFrom(CodedInputStream input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 188
   at Google.Protobuf.MessageExtensions.MergeFrom(IMessage message, Byte[] data, Boolean discardUnknownFields, ExtensionRegistry registry)
   at Google.Protobuf.MessageParser`1.ParseFrom(Byte[] data)
   at infrastructure.Queue.Kafka.KafkaConsumer.ConsumeCarefully[T](Func`2 consumeFunc, String topic, String group) in D:\MofidProject\infrastructure\Queue\Kafka\KafkaConsumer.cs:line 168

D:\MofidProject\mts.consumer.plus\bin\Debug\net6.0\mts.consumer.plus.exe (process 15516) exited with code -1001.
To automatically close the console when debugging stops, enable Tools->Options->Debugging->Automatically close the console when debugging stops.'

更新:

我的样本数据来自卡夫卡:

代码语言:javascript
运行
复制
 - {"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}

我的rlc模型:

代码语言:javascript
运行
复制
syntax = "proto3";

message AdminIpoChange
{
 string Id =1;
 string SymbolName =2;
 string SymbolIsin =3;
 string Date =4;
 string Time=5;
 double MinPrice =6;
 double MaxPrice =7;
 int32 Share =8;
 bool Show =9;
 int32 Operation =10;
 string  CreateDateTime=11;
enum AdminIpoOperation
{
    Add = 0;
    Edit = 1;
    Delete = 2;
}

}

我的数据以字节为单位:

代码语言:javascript
运行
复制
7B 22 53 79 6D 62 6F 6C 4E 61 6D 65 22 3A 22 5C 75 30 36 34 31 5C 75 30 36 32 46 5C 75 30 
36 33 31 22 2C 22 53 79 6D 62 6F 6C 49 73 69 6E 22 3A 22 49 52 6F 33 70 7A 41 5A 30 30 30 
32 22 2C 22 44 61 74 65 22 3A 22 31 34 30 30 2F 31 32 2F 31 35 22 2C 22 54 69 6D 65 22 3A 
22 30 38 3A 30 30 2D 31 32 3A 30 30 22 2C 22 4D 69 6E 50 72 69 63 65 22 3A 31 37 37 32 36 
2C 22 4D 61 78 50 72 69 63 65 22 3A 32 31 36 36 36 2C 22 53 68 61 72 65 22 3A 31 30 30 30 
2C 22 53 68 6F 77 22 3A 66 61 6C 73 65 2C 22 4F 70 65 72 61 74 69 6F 6E 22 3A 30 2C 22 49 
64 22 3A 22 31 30 30 64 38 65 30 62 35 34 31 35 34 65 39 64 39 30 32 30 35 34 62 66 66 31 
39 33 65 38 37 35 22 2C 22 43 72 65 61 74 65 44 61 74 65 54 69 6D 65 22 3A 22 32 30 32 32 
2D 30 32 2D 32 36 54 30 39 3A 34 37 3A 32 30 2E 30 31 33 34 37 35 37 2B 30 33 3A 33 30 22 
7D 
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-02-28 09:57:39

数据绝对不是protobuf二进制文件;字节0启动一个字段号为15的组;在这个组中是:

  • 字段4、string
  • 字段13、fixed32
  • 字段6、varint
  • 字段12、fixed32
  • 字段6、varintH 210f 211

在此之后(在字节151处),将遇到一个带有字段号6的端组令牌。

关于这一点有很多惊人的地方:

  1. 您的模式不使用组(实际上,仅在文档中很难找到组的存在),所以.所有这些看起来都不需要right
  2. end-group标记来匹配最后一个开始组字段号,它在单个级别内的doesn't
  3. fields通常是(虽然作为“应该”,而不是“必须”)按照数字顺序编写的
  4. (没有字段12或13 declared
  5. your字段6)是错误的类型--我们期望fixed64在这里,但是有varint

所以:毫无疑问:这些数据是.不是你期望的那样。它肯定不是有效的原型二进制文件。在不知道数据是如何存储的情况下,我们所能做的只是猜测,但凭直觉:让我们尝试将其解码为UTF8,看看它是什么样子:

代码语言:javascript
运行
复制
{"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}

或(格式化)

代码语言:javascript
运行
复制
{ 
 "SymbolName":"\u0641\u062F\u0631",
  "SymbolIsin":"IRo3pzAZ0002",
  "Date":"1400/12/15",
  "Time":"08:00-12:00", 
  "MinPrice":17726,
  "MaxPrice":21666,
  "Share":1000,
  "Show":false,
  "Operation":0,
  "Id":"100d8e0b54154e9d902054bff193e875",
  "CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"
}

糟了!您已经将数据编写为JSON,并试图将其解码为二进制protobuf。将其解码为JSON,您应该会没事的。如果这是用protobuf编写的:用protobuf对其进行解码。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71276445

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档