我想用Kafka (多种编程语言)为一个主题创建一个负载平衡。所以我做了下面的事。
我在C#中使用了confluent.kafka,在NodeJs中使用了卡夫卡伊。
我打开制片人让它继续运行。
Inconsistent group protocol
错误我们不能为同一个使用者组使用两种编程语言吗?
生成器在C# - windows窗体中
using System;
using System.Collections.Generic;
using System.Windows.Forms;
using Confluent.Kafka;
namespace KafkaProducer
{
public partial class frmProducer : Form
{
const string TOPIC = "testTopic";
private IProducer<Null, string> pBuilder;
public frmProducer()
{
InitializeComponent();
}
private async void timer1_Tick(object sender, EventArgs e)
{
try
{
// instead of sending some value, we send current DateTime as value
var dr = await pBuilder.ProduceAsync(TOPIC, new Message<Null, string> { Value = DateTime.Now.ToLongTimeString() });
// once done, add the value into list box
listBox1.Items.Add($"{dr.Value} - Sent to Partition: {dr.Partition.Value}");
listBox1.TopIndex = listBox1.Items.Count - 1;
}
catch (ProduceException<Null, string> err)
{
MessageBox.Show($"Failed to deliver msg: {err.Error.Reason}");
}
}
private void frmProducer_Load(object sender, EventArgs e)
{
ProducerConfig config = new ProducerConfig { BootstrapServers = "localhost:9092" };
pBuilder = new ProducerBuilder<Null, string>(config).Build();
timer1.Enabled = true;
}
private void frmProducer_FormClosing(object sender, FormClosingEventArgs e)
{
timer1.Enabled = false;
pBuilder.Dispose();
}
}
}
使用者在C# - windows表单
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Confluent.Kafka;
namespace KafkaConsumer
{
public partial class frmConsumer : Form
{
CancellationTokenSource cts = new CancellationTokenSource();
public frmConsumer()
{
InitializeComponent();
}
private void StartListen()
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("testTopic");
//TopicPartitionTimestamp tpts = new TopicPartitionTimestamp("testTopic", new Partition(), Timestamp. )
//c.OffsetsForTimes()
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
// Adding the consumed values into the UI
listBox1.Invoke(new Action(() =>
{
listBox1.Items.Add($"{cr.Value} - from Partition: {cr.Partition.Value}" );
listBox1.TopIndex = listBox1.Items.Count - 1;
}));
}
catch (ConsumeException err)
{
MessageBox.Show($"Error occured: {err.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
private void Form1_FormClosing(object sender, FormClosingEventArgs e)
{
cts.Cancel();
}
private async void frmConsumer_Load(object sender, EventArgs e)
{
await Task.Run(() => StartListen());
}
}
}
NodeJs中的消费者
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: 'my-app',
brokers: ["localhost:9092"]
});
const consumer = kafka.consumer({ groupId: "test-consumer-group" });
const run = async () => {
// Consuming
await consumer.connect();
await consumer.subscribe({ topic: "testTopic", fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(message.value.toString() + " - from Partition " + partition);
}
});
};
run().catch(console.error);
如果我同时运行C#和NodeJs消费者,那么就会得到Inconsistent group protocol
错误。
如何在Kafka?中使用来自不同编程语言的多个使用者
发布于 2019-12-15 02:30:39
短答案
这可能不像你想的那样与不同的语言有多大关系。这是由于两个消费者客户端(和他们的库)协议上的不同而发生的。
尝试在两个使用者客户端中设置以下属性:
partition.assignment.strategy = round-robin
注意:我刚刚提供了通用属性,所以您需要查看客户端的特定语言版本。您甚至可以将其设置为range
,但保持其一致性。
的解释是这样的,
在卡夫卡的维基上阅读协议,找出Inconsistent group protocol
的根本原因--这是在以下情况下返回的:
现在,ConsumerGroupProtocolMetadata
中可能有不同的方面,但是在您正在使用的客户端的库中,其中一个方面似乎是不同的,那就是partition.assignment.strategy
。
dotnet客户端是围绕拉夫卡的包装器,将上述属性的值默认为range
。这是参考文献。
何地as
卡夫卡伊与文档默认为round-robin
,因此导致了不一致性。
希望这能有所帮助。
https://stackoverflow.com/questions/59322217
复制相似问题