首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在Kafka中对同一组ID使用不同编程语言中的多个使用者

如何在Kafka中对同一组ID使用不同编程语言中的多个使用者
EN

Stack Overflow用户
提问于 2019-12-13 12:01:06
回答 2查看 1.7K关注 0票数 0

我想用Kafka (多种编程语言)为一个主题创建一个负载平衡。所以我做了下面的事。

  1. 创建了一个具有4个分区的主题。
  2. 在C#中创建一个生产者(每秒生成消息)
  3. 在consumer1(消费者组: testConsumerGrp)中创建一个使用者(TestConsumerGrp)
  4. 在NodeJs (消费者组:consumer2)中创建多一个使用者( testConsumerGrp)

我在C#中使用了confluent.kafka,在NodeJs中使用了卡夫卡伊

我打开制片人让它继续运行。

  1. 如果我只运行C#使用者,它就能正常工作。
  2. 如果我只运行NodeJs使用者,它可以正常工作。
  3. 如果我运行多个C#使用者(只有c#和少于4个实例),它可以正常工作。
  4. 如果我运行多个NodeJs使用者(只有NodeJs和少于4个实例),它可以正常工作。
  5. 如果我运行一个C#和一个NodeJs使用者,那么我将得到Inconsistent group protocol错误

我们不能为同一个使用者组使用两种编程语言吗?

生成器在C# - windows窗体

代码语言:javascript
运行
复制
using System;
using System.Collections.Generic;
using System.Windows.Forms;
using Confluent.Kafka;

namespace KafkaProducer
{
    public partial class frmProducer : Form
    {
        const string TOPIC = "testTopic";
        private IProducer<Null, string> pBuilder;

        public frmProducer()
        {
            InitializeComponent();
        }

        private async void timer1_Tick(object sender, EventArgs e)
        {
            try
            {
                // instead of sending some value, we send current DateTime as value
                var dr = await pBuilder.ProduceAsync(TOPIC, new Message<Null, string> { Value = DateTime.Now.ToLongTimeString() });

                // once done, add the value into list box
                listBox1.Items.Add($"{dr.Value} - Sent to Partition: {dr.Partition.Value}");
                listBox1.TopIndex = listBox1.Items.Count - 1;
            }
            catch (ProduceException<Null, string> err)
            {
                MessageBox.Show($"Failed to deliver msg: {err.Error.Reason}");
            }
        }

        private void frmProducer_Load(object sender, EventArgs e)
        {
            ProducerConfig config = new ProducerConfig { BootstrapServers = "localhost:9092" };
            pBuilder = new ProducerBuilder<Null, string>(config).Build();

            timer1.Enabled = true;
        }

        private void frmProducer_FormClosing(object sender, FormClosingEventArgs e)
        {
            timer1.Enabled = false;
            pBuilder.Dispose();
        }
    }
}

使用者在C# - windows表单

代码语言:javascript
运行
复制
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Confluent.Kafka;

namespace KafkaConsumer
{
    public partial class frmConsumer : Form
    {
        CancellationTokenSource cts = new CancellationTokenSource();

        public frmConsumer()
        {
            InitializeComponent();
        }

        private void StartListen()
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("testTopic");

                //TopicPartitionTimestamp tpts = new TopicPartitionTimestamp("testTopic", new Partition(), Timestamp.  )
                //c.OffsetsForTimes()

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);

                            // Adding the consumed values into the UI
                            listBox1.Invoke(new Action(() =>
                            {
                                listBox1.Items.Add($"{cr.Value} - from Partition: {cr.Partition.Value}" );
                                listBox1.TopIndex = listBox1.Items.Count - 1;
                            }));
                        }
                        catch (ConsumeException err)
                        {
                            MessageBox.Show($"Error occured: {err.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }
        }

        private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            cts.Cancel();
        }

        private async void frmConsumer_Load(object sender, EventArgs e)
        {
            await Task.Run(() => StartListen());
        }
    }
}

NodeJs中的消费者

代码语言:javascript
运行
复制
const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ["localhost:9092"]
});

const consumer = kafka.consumer({ groupId: "test-consumer-group" });

const run = async () => {
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: "testTopic", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(message.value.toString() + " - from Partition " + partition);
    }
  });
};

run().catch(console.error);

如果我同时运行C#和NodeJs消费者,那么就会得到Inconsistent group protocol错误。

如何在Kafka?中使用来自不同编程语言的多个使用者

EN

Stack Overflow用户

发布于 2019-12-15 02:30:39

短答案

这可能不像你想的那样与不同的语言有多大关系。这是由于两个消费者客户端(和他们的库)协议上的不同而发生的。

尝试在两个使用者客户端中设置以下属性:

partition.assignment.strategy = round-robin

注意:我刚刚提供了通用属性,所以您需要查看客户端的特定语言版本。您甚至可以将其设置为range,但保持其一致性。

的解释是这样的,

卡夫卡的维基上阅读协议,找出Inconsistent group protocol的根本原因--这是在以下情况下返回的:

  1. 有一个具有活动/正在运行的消费者的活跃消费者组。
  2. 一个新的使用者到达,使用与当前组的协议类型(或一组协议)不兼容的协议类型加入该组。

现在,ConsumerGroupProtocolMetadata中可能有不同的方面,但是在您正在使用的客户端的库中,其中一个方面似乎是不同的,那就是partition.assignment.strategy

dotnet客户端是围绕拉夫卡的包装器,将上述属性的值默认为range。这是参考文献

何地as

卡夫卡伊文档默认为round-robin,因此导致了不一致性。

希望这能有所帮助。

票数 2
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59322217

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档