1.1简介
Apache Kafka 是分布式发布-订阅消息系统(消息中间件)。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka 与传统消息系统相比,有以下不同:
1.2术语
Broker | Kafka 集群包含一个或多个服务器,这种服务器被称为 broker |
---|---|
Topic | 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物 理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处) |
Partition | Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition. |
Producer | 负责发布消息到 Kafka broker |
---|---|
Consumer | 消息消费者,向 Kafka broker 读取消息的客户端 |
Consumer Group | 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group) |
replica | partition 的副本,保障 partition 的高可用 |
leader | replica 中的一个角色, producer 和 consumer 只跟 leader 交互 |
follower | replica 中的一个角色,从 leader 中复制数据 |
controller | Kafka 集群中的其中一个服务器,用来进行 leader election 以及各种 failover |
(*)kafka的安装 解压:tar -zxvf kafka_2.11-0.10.2.1.tgz -C /opt/modules/ 核心配置文件: config/server.propertis
broker.id=0
listeners=PLAINTEXT://bigdata01:9092
log.dirs=/opt/modules/kafka_2.11-0.10.2.1/data
num.partitions=3
zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181
分发:
scp -r kafka_2.11-0.10.2.1/ bigdata02:/opt/modules/
scp -r kafka_2.11-0.10.2.1/ bigdata03:/opt/modules/
修改bigdata02:
broker.id=1
listeners=PLAINTEXT://bigdata02:9092
修改bigdata03:
broker.id=2
listeners=PLAINTEXT://bigdata03:9092
每台机器上都要启动(需要先启动zookeeper):
/root/app/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh -daemon /root/app/kafka_2.11-0.10.2.1/config/server.properties
/root/app/kafka_2.11-0.10.2.1/bin/kafka-topics.sh --create --zookeeper 192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181 --replication-factor 3 --partitions 3 --topic test
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
bin/kafka-console-producer.sh --broker-list
192.168.1.3:9092,192.168.1.4:9092,192.168.1.5:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server hadoop03:9092
--from-beginning --topic hellotopic
4.1Kafka的拓扑结构
如上图所示,一个典型的 Kafka 集群中包含若干 Producer,若干 broker(Kafka 支持水平扩展, 一般 broker 数量越多,集群吞吐率越高),若干 Consumer Group,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader。Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。
Producer采用push模式将消息发布到broker,每条消息都被append到partition,属于顺序写磁盘
producer 发送消息到broker时,会根据分区算法将其存储到哪一个partition
指定了partition,则直接使用
未指定partition,但指定key,通过对key的 value进行hash 选出一个partition
partition和key 都为指定,使用轮询选出一个partition
4.2.1写数据流程
4.3.1消息存储方式
物理上把 topic 分成一个或多个 partition(对应 server.properties 中的 num.partitions=3 配置),每个 partition 物理上对应一个文件夹(该文件夹存储该 partition 的所有消息和索引文件)
4.3.2消息存储策略
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
log.retention.hours=168 #基于时间
log.retention.bytes=1073741824 #基于大小
Partition 中的每条 Message 由 offset 来表示它在这个 partition 中的偏移量,这个 offset 不是该 Message 在 partition 数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition 中的一条 Message。因此,可以认为 offset 是 partition 中 Message 的 id。partition中的每条 Message 包含了以下三个属性: offset ;MessageSize;data
那 Kafka 是如何解决查找效率的的问题呢?
分段
Kafka 解决查询效率的手段之一是将数据文件分段,比如有 100 条 Message,它们的 offset 是从 0 到 99。假设将数据文件分成 5 段,第一段为 0-19,第二段为 20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的 offset 命名。这样在查找指定 offset 的 Message 的时候,用二分查找就可以定位到该 Message 在哪个段中。
索引
数据文件分段使得可以在一个较小的数据文件中查找对应 offset 的 Message 了,但是这依然需要顺序扫描才能找到对应 offset 的 Message。为了进一步提高查找的效率,Kafka 为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
索引文件中包含若干个索引条目,每个条目表示数据文件中一条 Message 的索引。索引包含两个部分,分别为相对 offset 和 position。