分区策略决定 producer 将消息怎么分发到 partition 中, 分区策略不合适可能导致数据倾斜, 有些时候我们需要实现顺序消息, 也需要将同一业务的消息都发送到同一个 partition 上。生产端将消息发送给 broker 之前主要经过拦截、序列化、分区(Partitioner)几个步骤。分区器主要读取 partition 配置(生产端配置partitioner.class
, 默认值是 DefaultPartitioner)
DefaultPartitioner 实现逻辑:
round-robin(轮询) 算法
。如果指定了自定义分区类partitoner.class
, 则按照自定义分区类的实现来获得 partition
producer 发送流程如下:
producer 发送消息的时候默认是异步的, 通常有以下三种模式
Fire-and-forget: 异步发送的一种, 不关心是否失败:
producer.send(record);
同步发送: send 方法返回的是一个 future 对象, 可以调用 Future 的 get 等待返回:
producer.send(record).get();
异步发送: 通过回调(callback)的方式处理, 可以感知到消息是否发送成功, 并做相应的业务处理。如果对消息的顺序不敏感, 但是需要保证消息的成功可以使用这种方式
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
// do something
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
在前面说过kafka默认是异步发送, 在 producer 端实际上存在 2 个线程:
RecordAccumulator
(RecordAccumulator可以理解是个集合,集合的元素是个队列,每个队列对应要发送至服务上的分区) 中,send方法即刻返回,也就是说此时并不能确定消息是否真正的发送到 brokerRecordAccumulator
,满足一定条件后,就进行真正的网络IO发送,使用的是异步非阻塞的NIO。主线程的send方法提供了一个用于回调的参数,当sender线程发送完后,回调函数将被调用,可以用来处理成功,失败或异常的逻辑producer 的关键参数:
buffer.memory
: 设置生产者可用于缓冲等待发送给brokers消息的总内存字节数,默认为33554432(32MB)。如果消息发送到缓存区的速度比发送到broker的速度快,那么生产者会被阻塞(根据max.block.ms
配置的时间,默认为60000ms=1分钟,在0.9.0.0版本之前使用block.on.buffer.full
配置),之后会抛出异常。compression.type
: 设置生产者的压缩算法, 默认是 none.retries
:发送失败后的最大重试次数max.in.flight.requests.per.connection
: producer 单个 IO 线程收到 broker ack 之前可以发送多少消息, 默认值是 5。是为了解决设置了retries参数大于0后,可能会带来新的问题。假如我们需要相同Key的Message进入特定的Partition,并且是要严格按照Producer生产Message的顺序排序。那么此时如果第一条Message发送失败,第二条Message发送成功了,第一条通过重试发送成功了,那Message的顺序就发生了变化。如果想在设置了retries还要严格控制Message顺序,可以把max.in.flight.requests.per.connection
设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。batch.size
: 一个 batch 的大小, 默认 16KBlinger.ms
: 两次发送的最大间隔时间。 batch.size 满了或达到 linger.ms 就会把消息发送出去max.block.ms
: 当发送缓冲区已满或者元数据不可用时,生产者调用send()和partitionsFor()方法会被阻塞,默认阻塞时间为60000ms=1分钟。由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。max.request.size
: 设置生产者在单个请求中能够发送的最大字节数,默认为1048576(1MB)。注意,broker也有接收消息的大小限制,使用的配置是message.max.bytes
(默认也是1MB)request.require.acks
: 持久化参数配置, broker 对发送的请求的确认模式。 有三种取值:min.insync.replicas
, 标志至少有多少 replica 持久化才返回 ackconsumer 顺序消费 partition 上的 message, offset 在老版本(0.10以前)由zk来保存, 由于zk的性能不好, 在之后的版本是专门放在一个__consumer_offsets
的 topic 中管理 。
写进消息的key由<groupid、topic、partition>
组成,value 是偏移量 offset
consumer 提交 offset 分为自动提交和手动提交, 通过enable.auto.commit
来控制, 默认是 true, 自动提交间隔通过 auto.commit.interval.ms
配置, 默认 5s
手动提交位移提供了两种模式commitSync
和commitAsync
commitSync
同步提交再 broker 返回提交结果之前都处于阻塞状态, 会因为提交 offset 而影响整个应用的 TPScommitAsync
异步提交采取了 callback 的方式来处理提交后的逻辑,如记录日志和处理异常, 但是并不会自动重试, 如果不处理好 callback 逻辑, 可能会丢失消息kafka 还提供了更精确的提交 api, 可以实现处理完一批消息后, 提交最后一个 offset 值。commitSync(Map<TopicPartition, OffsetAndMetadata>)
和commitAsync(Map<TopicPartition, OffsetAndMetadata>)
。 使用示例:
// 实现每处理 100 条消息提交一次 offset
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
kafka 通过 compact 策略来清理过期消息(对于同一个key的两条消息M1和M2,如果M1的发送时间早于M2,那么M1就是过期消息)。类似 JVM 的 compact, 扫描日志中的所有消息, 剔除过期的消息, 将剩下的消息整理在一起
kafka 支持重新设置位移进度来控制重新消费、跳过消费。目前支持 7 种策略(支持通过 api 和 kafka-consumer-groups.sh
两种方式设置):
Earliest: 将 offset 调整到主题当前最早处(不一定是 0 这个位置, 由于 broker 的消息保留策略有些消息可能被自动删除)
Latest:把 offset 重设成最新末端(也就是跳过历史消息, 从最新的位置开始消费)
Current: 将 offset 调整成消费者当前提交的最新处, 比如修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能
Specified-Offset: 设置到指定 offset 位置
Shift-By-N : 设置成当前 offset 的相对位置(可以相对于当前位移前多少条或后多少条, 比如设置消费位置为当前offset的前100条, N就是 -100)
DateTime:指定一个时间,然后将 offset 重置到该时间之后的最早位移处。如设置到昨天0点后开始消费
Duration:指定到当前时间的间隔处, duration 与 java 的 duration 格式相同
# 重新设置 consumer group offset 示例
# 如把 test-group 设置成最早(从头开始消费)
kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest
kafka 消费进度可以通过自带运维脚本kafka-consumer-groups
来获取, 使用格式如下:
# broker 地址为 ip+端口
# group 信息是需要查看的 consumer group 的名称(消费者中设置的 group.id)
kafka-consumer-groups.sh --bootstrap-server <broker 地址> --describe --group <group信息>
# 输出列主要关注几个点:
- PARTITION: 分区
- LOG-END-OFFSET: 分区当前最新生产的消息的 offset
- CURRENT-OFFSET: 分区当前 group 消费的最新 offset
- LAG: 生产offset和消费offset的差值, 即消息堆积数
rebalance 就是一个 consumer group 在 consumer group coordinate
的协调下, 完成 consumer 和 partition 的分配
rebalance 的触发条件主要是两种:
rebalance 对系统的影响(需要尽量避免 rebalance 的发生):
尽可能的避免 rebalance。 从 rebalance 的触发条件来分析, 主要是 partition 发生变化, 和 consumer group 成员发生变化两种, 其中为了增加消费能力而增加 partition、增加 consumer 数量, 这两点是无法避免的并且属于计划之中的。我们只能从防止 consumer 崩溃被意外踢出 consumer group 这个方向来考虑。首先需要明白 kafka 如何判断 consumer 失效而踢出 group。 group 中的 consumer 会定期向 coordinator 发送心跳, 如果 coordinator 在一定时间内没有收到心跳请求就会认为该 consumer 已经”挂了”。
kafka 通过session.timeout.ms
来配置心跳超时, 默认是 10s, 也就是说10s内没有收到 consumer 的心跳就会踢出。 另外通过heartbeat.interval.ms
来配置心跳发送的频率, 发送的频率越高对带宽资源以及broker的压力就越大, 但是发送的频率过低就会降低容错, 可能一次心跳网络丢失就被踢出 group 从而引发 rebalance。推荐配置:
# 要保证Consumer实例在被判定为“dead”之前,能够发送至少3轮的心跳请求,即session.timeout.ms >= 3 * heartbeat.interval.ms
session.timeout.ms = 6s
heartbeat.interval.ms = 2s
另外 kafka 还可能因为消费时间过长(IO时间长、频繁 fgc导致消费慢等)而发生 rebalance, max.poll.interval.ms
控制了 Consumer 两次调用 poll方法 的最大时间间隔。它的默认值是5分钟,表示Consumer 如果在5分钟之内无法消费完 poll 返回的消息,那么 Consumer 会主动发起 leave group 的请求,Coordinator 也会开启新一轮 Rebalance。
另外还需要注意的一点是, Kafka 在 0.10.1.0
之前的版本, 心跳请求(Heartbeat Request
) 是在消费者主线程(也就是 KafkaConsumer.poll 的线程)中完成的, 所以一旦消费时间过长就可能错误的让 coordinator 认为 consumer 挂了。 在0.10.1.0
之后就引入了一个单独的线程来完成 Heartbeat Request
。其次 coordinator 会将 rebalance 通知封装到 Heartbeat Request
中响应给 consumer, 因此 consumer 端配置的参数 heartbeat.interval.ms
间隔越小, 就能越快的感知到 rebalance 通知
__consumer_offsets
topic 上的 Partition 位置(即由 __consumer_offsets 哪个 partition 来保存当前 group 的数据), 计算规则: partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
Controller(控制器)是管理 broker 集群的重要单元, 依赖 zookeeper, 任何 broker 都可以充当 Controller 的角色, 但运行时只有一个 Broker 成为 Controller。
当 Broker 启动的时候都会尝试去 ZK 中创建 /controller
节点, 第一个成功创建的节点就是 Controller。
Controller 承担以下功能:
kafka-reassign-partitions
脚本)/brokers/ids
目录下临时节点的变化情况Kafka 容灾主要依靠 replication 机制, 一般来说在 kafka 集群中 broker 的数量是要大于等于 replicas 的数量的。
Kafka 这样设计 Replication 机制的优点:
Read-your-writes
: producer 写入消息成功后, consumer 马上就可以读到刚才的消息。如果允许 follower 提供读服务(读写分离机制), 那么可能会产生一段时间的数据不一致。Monotonic Reads
): 对于一个消费者用户来说, 多次消费消息不会看到某条消息一会儿存在一会儿不存在。 如果 follower 提供读服务, 有可能发生 consumer 首先从 follower1 拉取消息, 然后从 follower2 拉取消息, 可能会看到第一次消费的消息在第二次消费时不见了(同步延迟)。in-sync replicas(已同步的副本)
, 存在 ISR 里面的 follower 是会变化的, 如果一个follower宕机,或者落后leader太多,leader将把它从ISR中踢出。当它活过来/再次跟上时会再拉进来。 对于 ISR 中的副本通常需要满足:与 leader 的消息相差的时间最大不能超过 replica.lag.time.max.ms(默认 10s)
。绝大多数分布式系统采用了多数投票法则选择新的 leader, Kafka 并不采取这种方式。Kafka 是从 ISR 列表中选择一个 follower 成为 leader。
如果ISR节点为空, 就代表没有副本处于 ISR 列表中(leader 也挂了), 这种情况下有两种策略:
Unclean Leader Election
, 通过 broker 参数 unclean.leader.election.enable
来开启。 虽然这种方式可以让分区的可用性提高, 但是会丢失一些数据, 是否开启这个配置也是等于在 C(Consistency) 和 A(Availability) 之间做一个抉择。在kafka中,数据一致性的含义是若某条消息对Consumer可见,那么即使Leader宕机了,在新Leader上数据依然可以被读到。 Kafka 通过高水位(HighWaterMark)机制来定义消息的可见性。
先简单描述几个高水位相关的概念:
HW 和 LEO 的更新:
follower HW = min(currentLEO, leaderHW)
。在更新 LEO 之后,follower 向 log 写完数据时会尝试更新它自己的 HW 值, 具体做法就是比较当前 LEO 值与 FETCH 响应中 leader 的 HW 值,取两者的小者作为新的HW值。这里就会明白一点,follower HW永远不可能超过leader,因为leader的HW是整个分区的HW,所以leader的HW一定是最高的leader HW = max{leader HW, min(LEO1,....,LEOn)}
leader HW = max{leader HW, min(LEO1,....,LEOn)}
现在完整的看一下写请求过程中 HW/LEO 的更新流程:
从上面的流程可以看出: 即 follower 的 HW 更新需要 follower 的2轮 fetch leader 返回才能更新,而 Leader 的 HW 已更新。
HW 机制定义了消息的可见性, 但是在某些情况下会出现数据丢失/数据不一致的问题。
如在设置min.insync.replica=1
的情况, 以下流程就会导致已经 ack 过的数据丢失:
在上述流程下, 假如老 leader 还未恢复, 新 leader 又收到生产者的消息。当老 leader 恢复时变成 follower 节点,发生自己的HW和LEO相等,就不用日志截断了。这样就发生了同一个offset位置的数据不一致情况。
上面描述的数据丢失/不一致的情况核心问题在于依据HW截断做日志截断的依据,而且HW的同步是异步的,任何异常崩溃都可能导致HW是一个过期的值。
kafka0.11.x版本引入了leader epoch的概念来规避此问题。leader epoch由一对二元组组成(epoch,startOffset)。Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。
follower 重启后并不会直接进行日志截断,先向现任 leader 发起 OffsetsForLeaderEpochRequest 请求携带 follower 副本当前的 epoch。有如下几种情况:
上面说的场景对应就是如下流程: