首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >更改模式注册表格式后,Kafka消费者消息无法接收

更改模式注册表格式后,Kafka消费者消息无法接收
EN

Stack Overflow用户
提问于 2019-07-05 05:01:04
回答 2查看 1.3K关注 0票数 0

我是卡夫卡的新手,能以某种方式经营卡夫卡、阿夫罗、消费者和制片人。制片人正在制作这条信息,我正在成功地在消费者中获得它。下面是我的生产者代码片段:

代码语言:javascript
复制
static async void AvroProducer()
{
    string bootstrapServers = "localhost:9092";
    string schemaRegistryUrl = "Production163:8081"; 
    string topicName = "player";
    string groupName = "avro-generic-example-group";


     var s = (RecordSchema)RecordSchema.Parse(
        @"{
            ""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
            ""type"": ""record"",
            ""name"": ""User"",
            ""fields"": [
                {""name"": ""name"", ""type"": ""string""},
                {""name"": ""favorite_number"",  ""type"": [""int"", ""null""]},
                {""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
            ]
          }"
    );

    using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
    using (var producer =
        new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
            .SetKeySerializer(new AsyncAvroSerializer<string>(schemaRegistry))
            .SetValueSerializer(new AsyncAvroSerializer<GenericRecord>(schemaRegistry))
            .Build())
    {
        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 0;
        string text;
        while ((text = Console.ReadLine()) != "q")
        {
            var record = new GenericRecord(s);
            record.Add("name", text);
            record.Add("favorite_number", i++);
            record.Add("favorite_color", "blue");

            producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                .ContinueWith(task => task.IsFaulted
                    ? $"error producing message: {task.Exception.Message}"
                    : $"produced to: {task.Result.TopicPartitionOffset}");
        }
    }
    Console.ReadLine();

}

正如您在上面的代码中所看到的,我正在使用一个记录方案,但是我正在尝试这个scehema:

代码语言:javascript
复制
//this is the new schema try
        var s = (RecordSchema)RecordSchema.Parse(
            @"{
                ""type"": ""record"",
                ""name"": ""TestingMsg"",
                ""doc"": ""Sample"",
                ""fields"": [
                  {
                   ""name"": ""key"",
                   ""type"": ""string""
                  },
                  {
                   ""name"": ""Time"",
                   ""type"": ""long""
                  },
                  {
                   ""name"": ""sourceSeconds"",
                   ""type"": ""long""
                  },
                  {
                   ""name"": ""serverT"",
                   ""type"": ""long""
                  },

                  {
                   ""name"": ""statusCode"",
                   ""type"": ""int""
                  }
                ]
                }"
            );

新的一个,我正在尝试使用,但它没有工作,因为我没有收到信息的消费者。这是消费者:

代码语言:javascript
复制
void KafkaReader(CancellationToken cancellationToken)
    {
        Debug.Log("kafka reader started. . .");
        // Set up your Kafka connection here.

        while (_keepThreadRunning)
        {
            using (CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
            using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
            //using (IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers})
                    .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                    .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                    .SetErrorHandler((_, e) => Debug.Log($"Error: {e.Reason}"))
                    .Build())
            {
                Debug.Log("subscribe" );
                consumer.Subscribe(topicName);


                while (true)
                {
                    ConsumeResult<string, GenericRecord> consumeResult = consumer.Consume(cancellationToken);//TimeSpan.FromMilliseconds(50000)//new TimeSpan(0,0,1)

                    _stringsReceived.Enqueue(consumeResult.Value.ToString());


                    if (consumeResult != null)
                    {
                        Debug.Log($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");


                    }
                    else
                    {
                        Debug.Log("consumer Result is null");
                    }

                    //yield return new WaitForSeconds(1);
                }
            }


        }

        GetComponent<KafkaServerConfigUI>().KafkaDisconnected();

        // Disconnect and clean up your connection here.


    }

记住,我只是使用一个批处理文件运行默认的apache注册表。

代码语言:javascript
复制
D:\ApachKafka\confluent\confluent-5.2.1\bin\windows\schema-registry-start.bat D:\ApachKafka\confluent\confluent-5.2.1\etc\schema-registry\schema-registry.properties

我做错什么了?是否需要将架构注册到任何地方?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-07-08 05:07:26

要进行任何更改或使用新模式,必须注册架构。我错过了这个东西,所以我没有收到消费者的信息。下面是帮助您注册模式的short python script

使用http://,,您必须提供模式注册表的URL (从script开始,而不仅仅是主机名和端口)、模式应该注册的主题以及模式的路径。

下面是我注册我的模式的方式

谢谢参考文献:https://aseigneurin.github.io/2018/08/02/kafka-tutorial-4-avro-and-schema-registry.html

票数 1
EN

Stack Overflow用户

发布于 2019-07-12 08:43:03

我知道你有答案。下面是我的建议,避免为每次模式更新运行python脚本。

您可以使用架构-注册表-ui。

在核壳中,模式-注册表-ui提供-探索和搜索模式- Avro演进兼容性检查-新模式注册- Avro +表模式视图-显示CURL命令。

如何得到它

代码语言:javascript
复制
git clone https://github.com/Landoop/schema-registry-ui.git
cd schema-registry-ui
npm install -g bower
npm install
http-server .

演示

代码语言:javascript
复制
http://schema-registry-ui.landoop.com/

或者码头图像是可用的。如果您可以选择许可证,尝试合流控制中心,它提供更多的选择。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56896791

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档