我对为C++使用rdkafkacpp.h Kafka库相当陌生。
我参考了一些可用的在线资源,介绍了如何使用标准的Kafka环境安装.bat文件在Windows上设置卡夫卡用户:https://kafka.apache.org/quickstart (我只是玩了一下,并且能够测试从生产者终端发送的消息是否出现在消费终端上)
我阅读了卡夫卡的基本理论,并浏览了以下文件:https://docs.confluent.io/4.0.0/clients/librdkafka。
在基本阅读之后,我尝试使用内置的库函数编写示例代码,如下所示。但是,我在代码行中收到一个运行时异常,我试图在代码行中设置'conf‘的引导服务器属性。异常是访问冲突异常。在mykafka.exe: 0xC0000005:访问冲突读取位置0x0000008E5A900000中,0x00007FB878EC3F9 (msvcr120.dll)引发的异常。
我怀疑使用库函数的Kafka消费者‘实现’的顺序已经错过了一些步骤,或者必须重新排序。
我试图使实现保持简单,只是本地计算机上的一个生产者(localhost:9092),以及这个单一的使用者(mykafka.exe)。此外,一个终端上已经开始了一个主题“快速启动事件”。
任何帮助都是非常感谢的!
P.S: VisualStudio2019用于此代码开发。
#include <iostream>
#include "..\include\librdkafka\rdkafkacpp.h"
#include <process.h> // to use exit()
using namespace std;
using namespace RdKafka;
int configAsKafkaConsumer()
{
string host = "localhost:9092";
string errstr;
/* Set properties */
cout << "Inside configAsKafkaConsumer()" << endl;
// Create configuration objects
RdKafka::Conf* conf = NULL;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (conf != NULL)
{
cout << "conf != NULL" << endl;
}
// THIS IS WHERE I'M GETTING THE RUNTIME EXCEPTION!!
if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK)
{
cerr << "Failed to set config of broker: " << errstr << endl;
exit(1);
}
if (conf->set("client.id", host, errstr) != RdKafka::Conf::CONF_OK)
{
cout << "client.id:" << endl;
exit(1);
}
if (conf->set("group.id", "foo", errstr) != RdKafka::Conf::CONF_OK)
{
cout << "group.id:" << endl;
exit(1);
}
// Create a consumer handle
Consumer* ConsumerHandle = RdKafka::Consumer::create(conf, errstr);
string errstr;
string prodTopic = "quickstart-events";
cout << "Creating topic handle" << endl;
RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// Create topic handle.
RdKafka::Topic* topic = RdKafka::Topic::create(ConsumerHandle, prodTopic,
tconf, errstr);
if (!topic)
{
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
cout << "Starting the consumer handle" << endl;
ConsumerHandle->start(topic, 1, 0);
cout << "Consuming the message" << endl;
Message* msg = ConsumerHandle->consume(topic, 1, 10000);
cout << "Message is: " << msg->payload() << endl;
}
发布于 2022-03-25 17:17:21
而不是
RdKafka::Conf* conf = NULL;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
试试这个:
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
希望这能解决这个问题。
https://stackoverflow.com/questions/64908622
复制相似问题