Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >白话kafka(二)

白话kafka(二)

作者头像
流川疯
发布于 2022-05-10 11:34:33
发布于 2022-05-10 11:34:33
44600
代码可运行
举报
运行总次数:0
代码可运行

前几天在白话kafka(一)中简单介绍了下kafka的大致构成,对几个关键词进行了解释说明,当然在阐述的过程中也存在很多的漏洞,还请大家多多包涵!最近公司在搞封闭,一直没继续写,刚好新的专题,需要搭建一套kafka集群,下面结合搭建过程,说说kafka搭建中可能遇到的一些问题。

环境:虚拟机三台  安装包:zookeeper,kafka,jdk(1.7及以上)

这里先搭建一个zookeeper的管理集群,设置三个节点,跟之后kafka集群的规模相同,此处要注意,zookeeper的节点要设置成单数,这是经验所得,也是为了保证zookeeper集群某台机器宕机之后的一个leader选举的一个保证。网上有一些更详细的资料大家感兴趣的话可以深入了解有关内容。下面就开始搭建本文中涉及的集群,模拟三节点的kafka集群。之前是搭建过单zk节点带3个kafkabroker的情况,用虚拟机搭建zk集群还有几个坑! 

!!!!!!敲黑板!!!!!! 

虚拟机搭建好了后,先把防火墙都干掉!

sudo systemctl stop firewalld.servicesudo systemctl disable firewalld.service

可能还会有些异常得具体问题具体分析了!  搭建过程可以参照:http://www.cnblogs.com/luotianshuai/p/5206662.html

但是kafka毕竟是个流处理平台,其特性是高吞吐量,但是加上业务逻辑后,总显得这一特点表现不出来,但是对于我们要求高精度的数据处理来说,习惯了之前的单条处理模式,批处理的过程也是只是表现在消息的读写上,而真正的批处理没有做到,因此针对kafka进行开发的时候,Spark之类的分布式框架仍然是很有必要的。 

环境搭建之后,大家可以使用客户端自带的工具对环境进行测试。  之前写过一个关于kafka基本操作的帖子,此处不再赘述。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
BlogUrl:http://blog.csdn.net/roczheng1990/article/details/54599095

今天主要介绍一下开发中常用的几个kafka的库:  ● C++开发者可以使用librdkafka:https://github.com/edenhill/librdkafka ● Java开发者也有Java版的API:import kafka.javaapi.*[maven项目中添加依赖,会自动下载jar包的]  ● Go开发者可以使用:https://github.com/Shopify/sarama 目前我接触的就是这三个了!  以librdkafka为例简单说明一下该接口的使用吧!  librdkafka的安装可以参照:http://blog.csdn.NET/roczheng1990/article/details/69390341 安装完成后会生成一个example文件夹:

可以通过rdkafka_example来体验一下kafka的生产、消费消息!参数中需要指定集群,所以要保证测试的kafka环境正常!  下面写个librdkafka的demo供大家参考吧!  使用这个库的时候,要引入rdkafkacpp.h、rdkafkacpp_int.h两个文件,依赖librdkafka.a、librdkafka++.a两个静态库。  本来想用codeblocks编一下来着,好像window下不太好实现。还是在Linux上写写吧!  头文件:rdkfktest.h

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#include <vector>

#include <iostream>

#include <cstdlib>

#include <cstdio>

#include <string>

extern "C"

{    
#include "rdkafkacpp.h"#include "rdkafkacpp_int.h"

}using namespace std;

实现代码:rdkfktest.cpp

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#include "rdkfktest.h"

int main()
{    
string errstr;
string topicstr = "test"; 
string partition = "0";
string message = "testinfo--testinfo";
string brokers_str = "10.XX.XX.XX:9092"

RdKafka::Conf global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
global_conf->set("metadata.broker.list", brokers_str, errstr);
global_conf->set("api.version.request", "true", errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(m_global_conf, errstr);
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, topic_conf, errstr);
RdKafka::Metadata *metadata;

RdKafka::ErrorCode err = producer->metadata(false, topic, &metadata, 5000);
RdKafka::ErrorCode resp = producer->produce(topic, partition,   RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(message.c_str()), message.size(), NULL, NULL);    
return 0;
}

消费者开发规则大致与此类似,也是先定义两个conf文件:  头文件:rdkfktest.h

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#include <vector>

#include <iostream>

#include <cstdlib>

#include <cstdio>

#include <string>

extern "C"

{    
#include "rdkafkacpp.h"#include "rdkafkacpp_int.h"

}
using namespace std;
bool message_consume(RdKafka::Message * message, vector<string> v_msg);

代码实现:rdkfktest.cpp

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#include "rdkfktest.h"

int main()
{    
string errstr; 
vector<string> vec_msg; 
string topicstr = "test"; 
string partition = "0";
string brokers_str = "10.XX.XX.XX:9092"

int64_t begin_offset = "latest";
 RdKafka::Conf global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 RdKafka::Conf topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
 global_conf->set("metadata.broker.list", brokers_str, errstr);
 global_conf->set("api.version.request", "true", errstr);
 RdKafka::Consumer *consumer = RdKafka::Consumer::create(m_global_conf, errstr);
 RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, topic_conf, errstr);
 RdKafka::Metadata *metadata;

RdKafka::ErrorCode err = consumer->metadata(false, topic, &metadata, 5000);
 consumer->start(topic, partition, begin_offset);
RdKafka::Message *message = consumer->consume(topic, partition, 1000);
message_consume(msg, vec_msg); 
return 0;
}
bool message_consume(RdKafka::Message * message, vector<string> v_msg)
{
v_msg.push_back(static_cast<const char *>(message->payload()));
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-07-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
php 操作kafka的实践
安装kafka的php扩展 先安装rdkfka库文件 git clone https://github.com/edenhill/librdkafka.git cd librdkafka/ ./configure make sudo make install git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure make all -j 5
友儿
2022/07/27
8930
kafka 静态消费组成员
kafka的消费者组机制一直很受诟病,原因是他的设计看起来是比较美好的,但是在实际使用过程中,由于各种业务本身的消费逻辑漫长或者用户的使用姿势问题,导致自身的消费者组经常陷入无限的重平衡中,而由于消费者组的STW机制也会导致同组内的其他消费者出现消费停止的情况。这种现象在越大的工业集群中越容易出现,所以为了改进这种现象,kafka从2.3版本开始提供了静态消费者组的机制。(云上ckafka可以购买专业版2.4 也可以支持本特性)
markzhang12
2021/02/08
1.5K0
安装 php-rdkafka 扩展并使用 Kafka 记录日志
最近项目的用户日志达到了上亿条,之前图方便,直接存储到MySQL,然后大数据的技术让我把这些日志都存储到Kafka
seth-shi
2023/12/18
7390
[喵咪KafKa(3)]PHP拓展See-KafKa
[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初
喵了个咪233
2018/03/02
1.2K0
[喵咪KafKa(3)]PHP拓展See-KafKa
librdkafka 安装与使用
librdkafka 中自带 examples,cpp 目录下是 C++ 版本的包括两个 cpp 文件:consumer.cpp 和 producer.cpp,即生产者和消费者。修改其中 brokers 变量和 topic_str 变量的值。
aneutron
2022/08/19
2.4K0
confluent-kafka-go源码分析
confluent-kafka-go是已知的kafka 客户端中最快的,为什么呢?因为它非常轻量,通过cgo 对librdkafka做了一个封装,所以本质上运行的是一个c客户端。
golangLeetcode
2022/08/02
1.1K0
【Rust日报】2023-09-13 RustRover – JetBrains 推出的独立 Rust IDE
“什么时候会有 Rust IDE?” 这是用户经常提出的问题(八年了,你知道这八年我怎么过的吗?),现在,JetBrains 宣布这一天已经到来:它就是 JetBrains 独立 Rust IDE – RustRover。
MikeLoveRust
2023/09/26
4300
【Rust日报】2023-09-13 RustRover – JetBrains 推出的独立 Rust IDE
Kafka C++客户端库librdkafka笔记
librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。
一见
2018/08/02
5.4K0
Kafka C++客户端库librdkafka笔记
Python Kafka客户端confluent-kafka学习总结
Confluent在GitHub上开发和维护的confluent-kafka-python,Apache Kafka®的一个python客户端,提供了一个与所有brokers>=v0.8的kafka 、Confluent Cloud和Confluent Platform兼容的高阶级生产者、消费者和AdminClient。
授客
2023/11/07
1.5K0
Kafka - 3.x Kafka命令行操作
这些参数是用于操作和管理Apache Kafka主题的命令行工具参数,通常用于kafka-topics.sh工具。以下是每个参数的描述:
小小工匠
2023/10/27
7810
Kafka - 3.x Kafka命令行操作
kafka 入门
kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。 Kafka安装配置
句小芒
2022/12/29
4380
kafka 入门
Librdkafka对Kafka Message的封装和相关操作
struct rd_kafka_message_t struct rd_kafka_msg_t struct rd_kafka_msgq_t kafka message的协议格式可参考 官网 ---- struct rd_kafka_message_s 所在文件: src/rdkafka.h 生产的数据在application层调用接口后最终会将数据封装成这个结构, 从broker消费下来的数据回调给application层时也会封装成这个结构; 定义: typedef struct rd_kafka_m
扫帚的影子
2018/09/05
2.2K0
老雷编程技术分享之PHPer的kafka快速入门
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
老雷PHP全栈开发
2021/07/08
7670
golang源码分析:sarama kafka client(part I:生产者)
https://github.com/Shopify/sarama 是一个纯go实现的kafka客户端,是gopher学习kafka一个很好的资料。说实话sarama的代码组织很烂,密密麻麻一堆源码文件都在一个目录,让人无从下手,下面列出了一部分:
golangLeetcode
2022/08/02
5640
Apache Kafka 集群搭建与使用
继续之前的 Apache Kafka 部署与启动 ,单机的kafka的topic的创建,发送消息和接收消息,单播和多播消息,以及本次的集群搭建和使用。
zoro
2019/04/11
1K0
Librdkafka对kafka topic的封装
上一节我们讲了librdkakfa对topic-partition的封装, 任何一个partition都必须要属于一下topic; 我们这节就来分析一上librdkafka对topic的封装 ---- rd_kafka_itopic_s 所在文件: src/rdkafka_topic.h 这里还有一个类型rd_kafka_topic_t,定义:typedef struct rd_kafka_topic_s rd_kafka_topic_t;,这是个空定义没有现实, 其实就是rd_kafka_itopic_s
扫帚的影子
2018/09/05
1.4K0
[727]python操作kafka
pypi:https://pypi.org/project/kafka-python/ kafka-python:https://github.com/dpkp/kafka-python
周小董
2020/01/13
2.8K0
storm kafka 编程指南
一、原理及关键步骤介绍 storm中的storm-kafka组件提供了storm与kafka交互的所需的所有功能,请参考其官方文档:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhosts (一)使用storm-kafka的关键步骤 1、创建ZkHosts 当storm从kafka中读取某个topic的消息时,需要知道这个topic有多少个分区,以及这些分区放在哪个kafka节点(broker)上,ZkHosts
用户1177713
2018/02/24
2.1K0
kafka 上手指南:集群版
在消息系统中,涉及的概念都比较类似,初学消息系统,概念有时候理解不到位,需要读者反复的根据自己的学习进度回过头把基本概念捋清楚。
谢伟
2019/11/12
1.4K0
System|分布式|Kafka
Kafka是最前沿的开源MQ之一,阿里的RocketMQ也借鉴了不少Kafka的思想。2011年领英发了篇文章描述Kafka的设计,我这先学习初版。
朝闻君
2021/11/22
2110
System|分布式|Kafka
相关推荐
php 操作kafka的实践
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验