本篇我们将介绍kafka的一些概念及简单的使用案例。
一. 简介
在kafka0.10之前,kafka仅仅是一个消息系统,负责消息的发送与接收,解决应用解耦、异步消息、高流量问题。但在0.10之后kafka提供了流处理和连接器的功能,它也从分布消息系统逐渐转变为流式处理平台。
一个流处理平台应该包含以下单个主要的特性:
(1). 发布订阅消息事件流的记录,类似于消息队列或者企业的消息系统。
(2). 存储事件流数据的节点具有故障容错的特点。
(3). 能够实时处理流式事件。
相关概念:
Kafka运行在可跨越多个数据中心的由一个或多个节点组成的服务器的集群上
Kafka存储记录的目录称为topic,topic是个逻辑概念
每个记录包含了key,value,timestamp
生产者:应用程序发布事件流到Kafka的一个或多个主题
消费者:应用程序订阅Kafka的一个或多个主题,并处理事件流
连接器:将Kafka主题和已有数据源进行连接,数据可以互相导人和导出
流处理:从Kafka主题消费输入流,经过处理后,输出到主题
broker: 一个broker就是一个节点
偏移量:每个分区是一个队列,队列是有序的,队列的下标就是偏移量
分区:Kafka的每个主题都有自己的分区日志,分区日志中存储了topic相关的信息,如下图所示,一个topic有三个分区,每个分区都是该topic的消息,正式因为有了分区才使得kafka具有高吞吐量,高并发。
副本分区:每个分区都有对应的备份,这个备份就称为副本,副本是基于分区而言的,不是针对topic或者主机而言
主副本分区:负责读写消息的分区
ISR:副本同步列表,在下图中,副本分区构成了ISR列表
为了更好的理解,我们画了下图,表示了kafka主要概念之间的关系。
二. 命令行实践
注:我们以下操作都是基于Ambari2.7.0&HDP3.0的。
1. 创建主题
我们创建一个名为beardata的主题,replication-factor备份因子为3,分区数为2,zookeeper参数指定zookeeper地址和端口
/usr/hdp/3.0.0.0-1634/kafka/bin/kafka-topics.sh --create --zookeeper bigdata000:2181 --replication-factor 3 --partitions 2 --topic beardata
2. 查看主题
/usr/hdp/3.0.0.0-1634/kafka/bin/kafka-topics.sh --list --zookeeper bigdata000:2181
3. 查看主题详情
/usr/hdp/3.0.0.0-1634/kafka/bin/kafka-topics.sh --describe --topic beardata --zookeeper bigdata000:2181
topic为beardata,分区数为2,分别是partition0和partition1,副本数为3,Leader表示分区主副本,Replicas表示分区副本节点,Isr表示正在同步的副本节点,它是Replicas的子集
4. 发送消息到主题
broker-list表示kafka的broker,即安装kafka的主机和端口
/usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-producer.sh --broker-list bigdata000:6667,bigdata002:6667 --topic beardata
5. 消费消息
/usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --topic beardata --bootstrap-server bigdata000:2181
当我们使用上述命令时,并没有收到消息,我们想要接收之前已经发送的消息,需要加参数 --from-beginning
/usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --topic beardata --bootstrap-server bigdata000:6667 --from-beginning
三. 代码实践
1. 生产者
一个简单的客户端生产消息的例子
2. 消费者
一个简单的消费端消费消息的案例,采用高层API,由服务端控制偏移量
3. 流处理
一个简单的word-count例子,单词个数统计
4. 连接器
如果熟悉flume(后续的系列文章会专门介绍flume),那么就可以把kafka connect理解成是flume,只不过是flume的channel有多种,包括kafka。kafka connect的channel就是kafka
cd /usr/hdp/3.0.0.0-1634/kafka/config
vim connect-file-source.properties,修改topic,我们在这里修改topic为beardata
vim connect-file-sink.properties,修改topic,我们在这里修改topic为beardata
vim connect-standalone.properties,修改bootstrap-server,默认为localhost:9092,在这里我们修改为bigdata000:6667
启动Source Connector和Sink Connector
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
打开控制台消费指令
/usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --bootstrap-server bigdata000:6667 --topic beardata
向connect-file-source.properties中配置的文件中写入数据
查看控制台消息消费
查看控制台输出情况
查看connect-file-sink.properties中配置文件的内容
以上就是对kafka的概念介绍以及简单的案例演示,下一篇我们将介绍kafka Producer。
领取专属 10元无门槛券
私享最新 技术干货