前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka基础(一):基本概念及生产者、消费者示例

Kafka基础(一):基本概念及生产者、消费者示例

作者头像
create17
发布2019-06-19 18:42:36
7370
发布2019-06-19 18:42:36
举报

一、概述

1. 简介

Kafka 起初是由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 Zookeeper 协调的分布式消息系统,现已被捐献给 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性被广泛使用。目前越来越多的开源式分布处理系统如:Storm、Spark、Flink 等都支持与 Kafka 集成。

Kafka 之所以受到越来越多的青睐,与它所 “扮演” 的三大角色是分不开的:

  • 消息系统:Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障回溯消费的功能。
  • 存储系统:Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失地风险。也正是得益于 Kafka 的消息持久化功能和多副本机制。我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为 “永久” 或启用主题的日志压缩功能即可。
  • 流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
2. 使用场景
  • 日志收集:一个公司可以用 Kafka 可以收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种consumer,例如 Hadoop、Hbase、Solr 等。
  • 消息系统:解耦生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka 经常被用来记录web用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如 Spark Streaming 和 Storm 。
  • 事件源:是一种应用程序设计风格,其中状态的改变作为事件序列被记录下来。 Kafka对非常大的存储日志数据提供支持,使其成为以此风格构建的应用程序的一种优秀后端。
  • 峰值处理:使关键应用能够顶住访问峰值,不会因超出负荷崩溃。

二、基本概念

一个典型的 Kafka 体系架构包括若干 Producer 、若干 broker、若干 Consumer,以及一个 Zookeeper 集群,如下图所示:

该图片来自于《深入理解Kafka:核心设计与实践原理》一书

1. broker

服务代理节点。Kafka 集群由多个 Kafka 实例组成,每个实例 (server) 称为 broker,在集群中每个 broker 都有一个唯一的 brokerid ,不能重复。

2. Producer

生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka 中。

3. Consumer && Consumer Group(CG)

消费者,也就是接收消息的一方。消费者连接 Kafka 并接收消息,进而进行相应的业务逻辑处理。

consumer group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者,它们共享一个公共的 id,即 group id。组内的所有消费者协调在一起来消费订阅主题的所有分区。当然,每个分区只能由同一个消费组内的一个消费者来消费。个人认为,理解 consumer group 记住下面这三个特性就好了:

  • consumer group 下可以有一个或多个 consumer instance,consumer instance可以是一个进程,也可以是一个线程。
  • group.id 是一个字符串,唯一标识一个 consumer group
  • consumer group 订阅的 topic 下的每个分区只能分配给某个 group 下的一个 consumer 消费。当然该分区还可以被分配给其他 consumer group。
4. Zookeeper

Zookeeper 负责 Kafka 集群元数据的管理、控制器的选举等操作。

在 Kafka 集群中会有一个或者多个 broker ,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topics 增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka 中控制器选举的工作依赖于 Zookeeper ,成功竞选为控制器的 broker 会在 Zookeeper 中创建 /controller 临时节点,执行 get 命令可查看该临时节点的内容:

代码语言:javascript
复制
[zk: localhost:2181(CONNECTED) 1] get /controller
{"version":1,"brokerid":1001,"timestamp":"1560653018773"}
...(省略)
[zk: localhost:2181(CONNECTED) 2]

其中 version 在目前版本中固定为1,brokerid 表示称为控制器的 broker 的 id 编号,timestamp 表示竞选称为控制器时的时间戳。

5. Topic

又称为主题。主题是一个逻辑上的概念,Kafka 中的消息都以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),消费者负责订阅主题并进行消费。

6. Partition

又称为分区。主题可以细分为多个分区,一个分区只属于一个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可看作一个可追加的日志(Log)文件。消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset),offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性。不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区内有序而不是主题内有序。

在主题创建成功之后,也可以修改分区的数量,通过增加分区的数量来实现水平扩展。Kafka 的分区可以分布在不同的 broker 上,所以一个主题可以横跨多个 broker。

分区的内部还被分为若干个Segment,所谓的 Segment 其实就是在分区对应的文件夹下产生的文件。一个 segment 又由一个 .log 和一个 .index 文件组成。

7. Replica

Kafka 为分区引入了多副本(Replica)机制,可通过增加副本数量来提升容灾能力。同一分区的副本保存的是相同的消息(不过在同一时刻,副本之间并非完全一样)。副本之间是 “一主多从” 的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。副本处于不同的broker中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。

如下图所示,Kafka 集群中有 4 个 broker,某个主题中有 3 个分区,且副本因子(副本个数)也为 3,如此,每个分区都有 1 个 leader 副本和 2 个 follower 副本。生产者与消费者只与 leader 副本进行交互,而 follower 副本只负责消息的同步,所以很多时候 follower 副本中的消息相对于 leader 副本而言有一定的滞后。

该图片来自于《深入理解Kafka:核心设计与实践原理》一书

8. AR、ISR、OSR

AR(Assigned Replicas):是 Kafka 所有副本的集合。

ISR(In-Sync Replicas):所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)集合。消息会先发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。在同步期间内,follower 副本相对于 leader 副本而言有一定程度的滞后性,这个滞后的范围可以通过参数来配置。在这个参数范围内的副本为 ISR。

OSR(Out-of-Sync Replicas):超出这个参数范围的,也就是与 leader 副本同步滞后过多的的 follower 副本组成 OSR。

由此可见,AR = ISR + OSR 。

在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合为空。

leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本 “追上” 了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。

默认情况下,当 leader 副本坏掉的话,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

9. HW、LEO

ISR 与 HW 和 LEO 也有紧密的关系。

HW(High Watermark):俗称高水位。它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。

如下图所示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的 offset(Log Start Offset)为 0,最后一条消息的 offset 为 8,offset 为 9 的消息用虚线框来表示,代表下一条待写入的消息。日志文件的 HW 为 6,表示消费者只能拉取到 offset 从 0 到 5 之间的消息(不包括 HW),而 offset 为 6 的消息(HW)对消费者而言是不可见的。

该图片来自于《深入理解Kafka:核心设计与实践原理》一书

LEO(Log End Offset):标识当前日志文件中下一条待写入消息的 offset,如上图 offset 为 9 的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加 1 。分区 ISR 集合中的每个副本都会维护自身的 LEO 。ISR 集合中最小的 LEO 即为 分区的 HW ,对消费者而言只能消费 HW 之前的消息。

10. HW截断机制

如果 leader副本 宕机,选出了新的 leader 副本,而新的 leader 并不能保证已经完全同步了之前 leader 的所有数据,只能保证 HW 之前的数据是同步过的,此时所有的 follower 副本都要将数据截断到 HW 的位置,再和新的 leader 同步数据,来保证数据一致。

当宕机的 leader 恢复,发现新的 leader 中的数据和自己持有的数据不一致,此时宕机的 leader 会将自己的数据截断到宕机之前的 HW 位置,然后同步新 leader 的数据。宕机的 leader 活过来也像 follower 一样同步数据,来保证数据的一致性。

三、生产者、消费者示例

1. 创建主题

Kafka 提供了许多实用的脚本工具,存放在Kafka源码的bin目录下,其中与主题有关的就是 kafka-topic.sh 脚本,接下来我们使用该脚本创建一个分区数为 4,副本数为 3 的主题 test,示例如下:

代码语言:javascript
复制
cd /usr/hdp/3.0.1.0-187/kafka
bin/kafka-topics.sh --create --zookeeper node71.xdata --replication-factor 3 --partitions 4 --topic test

其中 --create 是创建主题的命令,--zookeeper 指定了 Kafka 所连接的 Zookeeper 地址,--replication-fator 指定了分区副本的个数,--partitions 指定了分区个数,--topic 指定了所要创建主题的名称。

主题创建好之后,我们可以查看具体的主题存储目录。主题存储目录由参数 log.dirs 指定。如下图所示:

Kafka的存储目录为 /kafka-logs ,test-0 ~ test-3 为主题 test 的 4 个分区。分区文件夹的名字是主题名加上分区编号,编号从 0 开始。主题的数据就存储在分区文件夹下的 .log 文件内。

2. 查看主题的分区和副本情况
代码语言:javascript
复制
cd /usr/hdp/3.0.1.0-187/kafka
bin/kafka-topics.sh --zookeeper node71.xdata --describe --topic test

解释

结果输出的第一行是对 Topic 信息的汇总:Topic 名称,分区个数以及副本个数。Configs 后面的输出代表该 Topic 每个分区副本在 broker 的分布情况。就第一条而言,代表的意思为:编号为 0 的 Partition,leader 副本在 brokerid = 1003 这个节点上;该分区所有的副本分布在 brokerid 为 1003,1001,1002 这三个节点;Isr 为 Replicas 的子集,子集内的所有副本均分布在 brokerid 为 1003,1001,1002 这三个节点上,并与所属 Partition 的 leader 副本保持一定程度的同步。

3. 生产与消费数据

Kafka 在源码路径的 bin 目录下提供了 kafka-console-producer.sh 和 kafka-console-consumer.sh 脚本工具,可通过控制台来收发消息。

首先我们打开一个 shell 终端,通过 kafka-console-consumer.sh 脚本来订阅主题 test,示例如下:

代码语言:javascript
复制
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-consumer.sh --bootstrap-server node71.xdata:6667 --topic test

其中 --bootstrap-server 指定了连接 Kafka 集群地址,--topic 指定了消费者订阅的主题,如果不加 --group 会自动创建一个消费者组指定。目前主题 test 尚未有任何消息存入,所以此脚本还不能消费任何信息。

我们在打开一个 shell 终端,然后使用 kafka-console-producer.sh 脚本发送一条消息 “This is a message” 到主题 test,示例如下:

代码语言:javascript
复制
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-producer.sh --broker-list node71.xdata:6667 --topic test
>This is a message

输入完 “This is a message” 之后,按下回车,返回 consumer 的 shell 终端,可以接收到刚刚键入的消息 “This is a message”。

4. 查看主题偏移量
代码语言:javascript
复制
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node71.xdata:6667 --topic test --time -1

# 显示结果如下
test:2:5
test:1:4
test:3:3
test:0:4

# 结果格式为: topic名称:partition分区号:分区的offset
# --time 为 -1 显示主题各分区最新的offset,也就是HW
# --time 为 -2 显示主题各分区最早有效的offset

对于消费者来说,我们可以执行增加一些参数来消费指定的数据,比如:

  • 增加 --partition 选项:从指定的分区消费消息
  • 增加 --offset 选项:从指定的偏移位置消费消息

关于更多参数可以直接执行消费者脚本查看参数说明。看下面这个消费者示例:

代码语言:javascript
复制
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-console-consumer.sh --bootstrap-server node71.xdata:6667 --topic test --partition 2 --offset 2
# 显示结果如下
1
asd
32423

指定消费 offset 从 2 到 HW 的消息,HW 为 5。可能有同学忘了HW的概念,我这里再贴出来:HW(High Watermark):俗称高水位。它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。所以上述命令只消费了三条信息。

5. 查看消费者组
代码语言:javascript
复制
# 列举消费者组
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-consumer-groups.sh --bootstrap-server node71.xdata:6667 --list

# 查看消费者组详情
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-consumer-groups.sh --bootstrap-server node71.xdata:6667 --describe --group <消费者名称>
6. 删除主题

如果需要删除 Kafka 主题,则需要确保 delete.topic.enable 配置为 true,然后再执行下述命令:

代码语言:javascript
复制
/usr/hdp/3.0.1.0-187/kafka/bin/kafka-topics.sh --delete --zookeeper node71.xdata --topic test

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据实战演练 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
    • 1. 简介
      • 2. 使用场景
      • 二、基本概念
        • 1. broker
          • 2. Producer
            • 3. Consumer && Consumer Group(CG)
              • 4. Zookeeper
                • 5. Topic
                  • 6. Partition
                    • 7. Replica
                      • 8. AR、ISR、OSR
                        • 9. HW、LEO
                          • 10. HW截断机制
                          • 三、生产者、消费者示例
                            • 1. 创建主题
                              • 2. 查看主题的分区和副本情况
                                • 3. 生产与消费数据
                                  • 4. 查看主题偏移量
                                    • 5. 查看消费者组
                                      • 6. 删除主题
                                      相关产品与服务
                                      消息队列 TDMQ
                                      消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                                      领券
                                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档