首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用rdkafkacpp.h实现C++中的基本卡夫卡消费者

使用rdkafkacpp.h实现C++中的基本卡夫卡消费者
EN

Stack Overflow用户
提问于 2020-11-19 09:16:18
回答 1查看 1.2K关注 0票数 1

我对为C++使用rdkafkacpp.h Kafka库相当陌生。

我参考了一些可用的在线资源,介绍了如何使用标准的Kafka环境安装.bat文件在Windows上设置卡夫卡用户:https://kafka.apache.org/quickstart (我只是玩了一下,并且能够测试从生产者终端发送的消息是否出现在消费终端上)

我阅读了卡夫卡的基本理论,并浏览了以下文件:https://docs.confluent.io/4.0.0/clients/librdkafka

在基本阅读之后,我尝试使用内置的库函数编写示例代码,如下所示。但是,我在代码行中收到一个运行时异常,我试图在代码行中设置'conf‘的引导服务器属性。异常是访问冲突异常。在mykafka.exe: 0xC0000005:访问冲突读取位置0x0000008E5A900000中,0x00007FB878EC3F9 (msvcr120.dll)引发的异常。

我怀疑使用库函数的Kafka消费者‘实现’的顺序已经错过了一些步骤,或者必须重新排序。

我试图使实现保持简单,只是本地计算机上的一个生产者(localhost:9092),以及这个单一的使用者(mykafka.exe)。此外,一个终端上已经开始了一个主题“快速启动事件”。

任何帮助都是非常感谢的!

P.S: VisualStudio2019用于此代码开发。

代码语言:javascript
运行
复制
#include <iostream>
#include "..\include\librdkafka\rdkafkacpp.h"
#include <process.h> // to use exit()

using namespace std;
using namespace RdKafka;

int configAsKafkaConsumer()
{
    string host = "localhost:9092";
    string errstr;

    /* Set properties */
    cout << "Inside configAsKafkaConsumer()" << endl;

    // Create configuration objects
    RdKafka::Conf* conf = NULL;
    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    if (conf != NULL)
    {
        cout << "conf != NULL" << endl;
    }

    // THIS IS WHERE I'M GETTING THE RUNTIME EXCEPTION!!
    if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK)
    {
        cerr << "Failed to set config of broker: " << errstr << endl;
        exit(1);
    }

    if (conf->set("client.id", host, errstr) != RdKafka::Conf::CONF_OK) 
    {
        cout << "client.id:" << endl;
        exit(1);
    }

    if (conf->set("group.id", "foo", errstr) != RdKafka::Conf::CONF_OK)
    {
        cout << "group.id:" << endl;
        exit(1);
    }

    // Create a consumer handle
    Consumer* ConsumerHandle = RdKafka::Consumer::create(conf, errstr);

    string errstr;
    string prodTopic = "quickstart-events"; 

    cout << "Creating topic handle" << endl;

    RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    // Create topic handle.
    RdKafka::Topic* topic = RdKafka::Topic::create(ConsumerHandle, prodTopic,
        tconf, errstr);

    if (!topic) 
    {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        exit(1);
    }

    cout << "Starting the consumer handle" << endl;
    ConsumerHandle->start(topic, 1, 0);

    cout << "Consuming the message" << endl;
    Message* msg = ConsumerHandle->consume(topic, 1, 10000);

    cout << "Message is: " << msg->payload() << endl;

}
EN

回答 1

Stack Overflow用户

发布于 2022-03-25 17:17:21

而不是

代码语言:javascript
运行
复制
RdKafka::Conf* conf = NULL;
    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

试试这个:

代码语言:javascript
运行
复制
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

希望这能解决这个问题。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64908622

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档