首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >如何在windows下通过c++代码在librdkafka中使用KAFKA的producer API

如何在windows下通过c++代码在librdkafka中使用KAFKA的producer API
EN

Stack Overflow用户
提问于 2017-06-10 18:58:47
回答 0查看 10.4K关注 0票数 3

我正在试着写一个客户端作为一个生产者。我按照示例创建了一个新的win32控制台项目。我发现,除非我在程序的末尾添加getline()函数,否则API对我不起作用。

如果我删除getline(),则produce(..)方法仍然返回成功的结果。但是,我在kafka-console-consumer的命令窗口中看不到任何响应

我有点困惑。是那么回事吗?不使用getline()如何发送消息?有人知道吗?

我找到了为什么它不能工作。删除生产者对象似乎太快,导致生产者无法向代理发送消息。

当我在producer方法和delete producer对象之间添加sleep 1000时,producer可以正确发送消息。

因此,问题是如何立即发送消息。在销毁生产者对象之前,如何确保这些消息已经完全发送?

如何解决这个问题,实际上我不喜欢在我的源代码中添加一些sleep()。

win10+vs2015+kafka_2.10-0.9.0.1+zookeeper-3.4.6+librdkafka请查看以下代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    // kafka_test_win32_nomfc.cpp 
//

#include "stdafx.h"
#include <iostream>
#include "librdkafka/rdkafkacpp.h"


int static producer_1()
{
    std::string brokers = "127.0.0.1";
    std::string errstr;
    std::string topic_str = "linli";
    std::string mode;
    std::string debug;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    // MyHashPartitionerCb hash_partitioner;
    int use_ccb = 0;

    /*
    * Create configuration objects
    */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("metadata.broker.list", brokers, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    /*
    * Create topic handle.
    */
    RdKafka::Topic *topic = NULL;
    if (!topic_str.empty()) {
        topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
        if (!topic) {
            std::cerr << "Failed to create topic: " << errstr << std::endl;
            exit(1);
        }
    }

    RdKafka::ErrorCode resp = producer->produce(topic, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        const_cast<char *>("hello worlf"), 11,
        NULL, NULL);

    delete topic;
    delete producer;
    return 0;
}


int static producer_2()
{
    std::string brokers = "127.0.0.1";
    std::string errstr;
    std::string topic_str = "linli";
    std::string mode;
    std::string debug;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    // MyHashPartitionerCb hash_partitioner;
    int use_ccb = 0;

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("metadata.broker.list", brokers, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        (void *)"hi", 2,
        NULL, 0, 0, NULL);



    std::string errs(RdKafka::err2str(resp));
    std::cout << errs << std::endl;
    //producer->poll(0);


    delete producer;

    return 0;
}


int main()
{

    producer_2();

    return 0;
}
EN

回答

页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44476992

复制
相关文章
白话kafka(二)
前几天在白话kafka(一)中简单介绍了下kafka的大致构成,对几个关键词进行了解释说明,当然在阐述的过程中也存在很多的漏洞,还请大家多多包涵!最近公司在搞封闭,一直没继续写,刚好新的专题,需要搭建一套kafka集群,下面结合搭建过程,说说kafka搭建中可能遇到的一些问题。
流川疯
2022/05/10
4460
Kafka快速入门系列(9) | Kafka的Producer API操作
  Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
不温卜火
2020/10/28
7030
Kafka快速入门系列(9) | Kafka的Producer API操作
Kafka核心API——Producer生产者
在上文中介绍了AdminClient API的使用,现在我们已经知道如何在应用中通过API去管理Kafka了。但在大多应用开发中,我们最常面临的场景就是发送消息到Kafka,或者从Kafka中消费消息,也就是典型的生产/消费模式。而本文将要演示的就是如何使用Producer API将消息发送至Kafka中,使应用成为一个生产者。
端碗吹水
2020/09/23
7050
Kafka核心API——Producer生产者
0500-使用Python2访问Kerberos环境下的Kafka
Kafka支持多种客户端语言(C/C++、Go、Java、JMS、.NET、Python)。Fayson在前面多篇文章介绍了Java访问Kerberos和非Kerberos环境下的Kafka,参考《如何使用Java连接Kerberos的Kafka》。本篇文章Fayson主要介绍使用Python2访问Kerberos环境下的Kafka。在学习本篇文章内容前你还需要知道《如何通过Cloudera Manager为Kafka启用Kerberos及使用》。
Fayson
2019/11/27
1K0
Kafka kafka在windows下的安装与配置
1.测试环境.................................................................................................................
授客
2020/07/16
1.9K0
Kafka Producer
指定一组host:port键值对,用于连接kafka broker节点,producer可以通过该参数发现Kafka集群中的所有broker,因此可以指定部分节点。
shysh95
2020/03/19
6890
技术分享 | kafka的使用场景以及生态系统
kafka的使用场景 今天介绍一些关于Apache kafka 流行的使用场景。这些领域的概述 消息 kafka更好的替换传统的消息系统,消息系统被用于各种场景(解耦数据生产者,缓存未处理的消息,等),与大多数消息系统比较,kafka有更好的吞吐量,内置分区,副本和故障转移,这有利于处理大规模的消息。根据我们的经验,消息往往用于较低的吞吐量,但需要低的端到端延迟,并需要提供强大的耐用性的保证。 在这一领域的kafka比得上传统的消息系统,如的ActiveMQ或RabbitMQ的。 网站活动追踪 kafka
加米谷大数据
2018/04/02
3.7K0
php rdkafka_php rdkafka
在使用 PHP 处理 Kafka 消息的时候需要使用一个 PHP 的扩展 php-rdkafka 下面将介绍一下如何在 Linux / Mac OS 下安装 php-rdkafka
全栈程序员站长
2022/11/04
2.7K0
Kafka C++客户端库librdkafka笔记
librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。
一见
2018/08/02
5.4K0
Kafka C++客户端库librdkafka笔记
confluent-kafka-go源码分析
confluent-kafka-go是已知的kafka 客户端中最快的,为什么呢?因为它非常轻量,通过cgo 对librdkafka做了一个封装,所以本质上运行的是一个c客户端。
golangLeetcode
2022/08/02
1.1K0
Kafka Producer Consumer
org.apache.kafka.clients.producer.KafkaProducer
java架构师
2019/01/28
5300
Kafka Producer Consumer
图解Kafka Producer中的消息缓存模型
在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果。
石臻臻的杂货铺[同名公众号]
2022/03/24
6410
图解Kafka Producer中的消息缓存模型
Kafka producer 解析
Kafka 作为一个消息系统,其中很大的一个用途就是作为业务上的解耦,而它实现的模式就是经典的生产者消费者模式。毫无疑问,就出现了producer、consumer。然后消息总得有地方存放啊,然后就有了具体的broker,那在broker上是如何进行组织和存放的,就出现了partition。对应的为保证消息不丢失,也就出现了消息备份组这样一个概念(ISR,in-sync replica)再加上消息的topic也就形成了,kafka的 topic-partition-message 的三级负载结构。到这里Kafka中比较核心的几个概念就都有了,下面开始详细介绍。
邹志全
2019/07/31
6960
Librdkafka对kafka topic的封装
上一节我们讲了librdkakfa对topic-partition的封装, 任何一个partition都必须要属于一下topic; 我们这节就来分析一上librdkafka对topic的封装 ---- rd_kafka_itopic_s 所在文件: src/rdkafka_topic.h 这里还有一个类型rd_kafka_topic_t,定义:typedef struct rd_kafka_topic_s rd_kafka_topic_t;,这是个空定义没有现实, 其实就是rd_kafka_itopic_s
扫帚的影子
2018/09/05
1.4K0
如何在Windows下使用NCL
NCL是气象和海洋绘图中常用的软件,其专门为处理气象和海洋数据设计,因此在处理两类数据有着较高的效率。同时,随着NCL多个版本的优化,NCL提供了大量优秀的函数,来帮助使用者快速熟悉并使用气象和海洋中常用的统计方法。虽然现在NCL在向python转移,但是短时间内NCL依旧不会过时。
郭好奇同学
2020/10/15
4.2K0
在windows下通过telnet连接virtualbox下的linux
之前,在virtualbox安装了fedora 13,今天突发奇想,想通过客户机连接里头的虚拟机,或者,通过虚拟机连接客户机。
williamwong
2018/07/24
3.3K0
在windows下通过telnet连接virtualbox下的linux
KafkaBridge - Kafka Client SDK 开源啦~~~
KafkaBridge 封装了对Kafka集群的读写操作,接口极少,简单易用,稳定可靠,支持c++/c、php、python、golang等多种语言,并特别针对php-fpm场景中作了长连接复用的优化,已在360公司内部广泛使用。
扫帚的影子
2018/10/11
9400
Pytorch的C++端(libtorch)在Windows中的使用
填一个之前的坑啊,本篇的姊妹篇——利用Pytorch的C++前端(libtorch)读取预训练权重并进行预测 这篇文章中已经说明了如何在Ubuntu系统中使用libtorch做预测,当初也有朋友问我如何在Windows之下尝试使用libtorch,当时因为时间关系没有去看,后来就给忘了…现在有时间了当然要尝试一下~
老潘
2023/10/19
1.1K0
Pytorch的C++端(libtorch)在Windows中的使用
Kafka源码系列之通过源码分析Producer性能瓶颈
Kafka源码系列之通过源码分析Producer性能瓶颈 本文,kafka源码是以0.8.2.2,原因是浪尖一直没对kafka系统进行升级。主要是java的kafka生产者源码,Broker接收到producer请求之后处理的相关源码。估计源码内容是比较多的,只给出大致逻辑,主类和函数名称。本文的目的是让大家,彻底了解发送消息到kafka的过程及如何对producer进行调优。 一,kafka的producer基本介绍及主要类 1,基本介绍 Kafka的Producer,主要负责将消息发送给kafka集群。
Spark学习技巧
2018/01/30
1.2K0
Kafka源码系列之通过源码分析Producer性能瓶颈
Spring在无RedirectAttributes的情况下(如Interceptor中)使用Flash scope
判断逻辑很简单,但是重定向的时候需要前台有消息提示,如果是在Controller中,可以在方法上注入RedirectAttributes参数,但是Interceptor中默认没有这个参数,那么我们如何实现RedirectAttributes的flashMessage功能呢?
飞奔去旅行
2019/06/13
5.4K0

相似问题

Kafka producer在windows中不工作

24

在WSL上运行Kafka,在windows上制作producer

171

在pentaho中设置kafka producer

13

SSL confluent-kafka-dotnet librdkafka kafka SSL

12

在oracle DB中使用Kafka Producer

15
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文