有奖捉虫:办公协同&微信生态&物联网文档专题 HOT
本文主要介绍消息队列 CKafka 版生产和消费消息的最佳实践,帮助您降低消费消息出错的可能性。

生产消息

Topic 使用推荐

配置要求:推荐节点的整倍数副本,减少数据倾斜问题,同步复制最小同步副本数为2,且同步副本数不能等于 Topic 副本数,否则宕机1个副本会导致无法生产消息。
创建方式:支持选择是否开启 CKafka 自动创建 Topic 的开关。选择开启后,表示生产或消费一个未创建的 Topic 时,会自动创建一个包含3个分区和2个副本的 Topic。

分区数估计

为实现尽可能的数据均衡,分区数建议为节点数的整倍数,同时在知道预估流量的基础上,按照1MB/s一个分区设置分区数,如100MB的 Topic 吞吐,建议设置分区数为100。

失败重试

分布式环境下,由于网络等原因,消息偶尔会出现发送失败的情况,其原因可能是消息已经发送成功但是 ACK 机制失败或者是消息确实没有发送成功。
您可以根据业务需求,设置以下重试参数:
参数
说明
retries
重试次数,默认值为3,就对于数据丢失零容忍的应用而言,请考虑设置为:Integer.MAX_VALUE(有效且最大)。
retry.backoff.ms
重试间隔,建议设置为1000。
这样将能够应对 Broker 的 Leader 分区出现无法立刻响应 Producer 请求的情况。

异步发送

发送接口是异步的,如果您想接收发送的结果,可用使用 send 方法中的Callback接口进行获取发送结果。

一个 Producer 对应一个应用

Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,建议一个应用对应一个 Producer。

Acks

Kafka 的 ACK 机制,指 Producer 的消息发送确认机制,在 Kafka 的 0.10.x 版本上,其设置值是 Acks,而在 0.8.x 版本上,则为 request.required.acks,Acks 的设置将直接影响到 Kafka 集群的吞吐量和消息可靠性。
Acks 的参数说明如下:
参数
说明
acks=0
无需服务端的 Response。性能较高、丢数据风险较大。
acks=1
服务端主节点写成功即返回 Response。性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。
acks=all
服务端主节点写成功且 ISR 中的节点同步成功才返回 Response。性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。
一般建议选择 acks=1,重要的服务可以设置 acks=all。

Batch

一般情况下,消息队列 CKafka 的 Topic 会有多个分区,Producer 客户端在向服务端发送消息时,需要先确认往哪个 Topic 的哪个分区发送。在给同一个分区发送多条消息时,Producer 客户端会将相关消息打包成一个 Batch,批量发送到服务端。Producer 客户端在处理 Batch 时,是有额外开销的。一般情况下,小 Batch 会导致 Producer 客户端产生大量请求,造成请求队列在客户端和服务端的排队,并造成相关机器的 CPU 升高,从而整体推高了消息发送和消费延迟。一个合适的 Batch 大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。
Batch 参数说明如下:
参数
说明
batch.size
发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络请求,然后 Producer 客户端把消息批量发往服务器。
linger.ms
每条消息在缓存中的最长时间。若超过这个时间,Producer 客户端就会忽略 batch.size 的限制,立即把消息发往服务器。
buffer.memory
所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器,此时会忽略 batch.size 和 linger.ms 的限制。buffer.memory 的默认数值是 32MB,对于单个 Producer 而言,可以保证足够的性能。
说明:
如果您在同一个 JVM 中启动多个 Producer,那么每个 Producer 都有可能占用 32MB 缓存空间,此时便有可能触发 OOM(Out of Memory),此时您需要考虑 buffer.memory 的大小,避免触发 OOM。
您可以根据具体业务需求进行参数设置值的调整。Producer 客户端什么时候把消息批量发送至服务器是由 batch.size 和 linger.ms 共同决定的。您可以根据具体业务需求进行调整。为了提升发送的性能,保障服务的稳定性, 建议您设置 batch.size=16384 和 linger.ms=1000。

Key 和 Value

消息队列 CKafka 的消息有 Key(消息标识)和 Value(消息内容)两个字段。
为了便于追踪,请为消息设置一个唯一的 Key。您可以通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的生产和消费情况。
如果消息发送量较大,建议不要设置 Key,并使用黏性分区策略。

黏性分区

只有发送到相同分区的消息,才会被放到同一个 Batch 中,因此决定一个 Batch 如何形成的一个因素是消息队列 Kafka Producer 端设置的分区策略。消息队列 Kafka Producer 允许通过设置 Partitioner 的实现类来选择适合自己业务的分区。在消息指定 Key 的情况下,消息队列 Kafka Producer 的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同 Key 的消息会发送到同一个分区。
在消息没有指定 Key 的情况下,消息队列 Kafka 2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略 Batch 的效果会比较差,在实际使用中,可能会产生大量的小 Batch,从而使得实际的延迟增加。鉴于该默认策略对无 Key 消息的分区效率低问题,消息队列 Kafka 在2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。
黏性分区策略主要解决无 Key 消息分散到不同分区,造成小 Batch问题。其主要策略是如果一个分区的 Batch 完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。
如果您使用的消息队列 Kafka Producer 客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。如果您使用的 Producer 客户端版本小于2.4,可以根据黏性分区策略原理,自行实现分区策略,然后通过参数 partitioner.class 设置指定的分区策略。
关于黏性分区策略实现,您可以参见如下 Java 版代码实现。该代码的实现逻辑主要是根据一定的时间间隔,切换一次分区。
public class MyStickyPartitioner implements Partitioner {

// 记录上一次切换分区时间。
private long lastPartitionChangeTimeMillis = 0L;
// 记录当前分区。
private int currentPartition = -1;
// 分区切换时间间隔,可以根据实际业务选择切换分区的时间间隔。
private long partitionChangeTimeGap = 100L;

public void configure(Map<String, ?> configs) {}

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 获取所有分区信息。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

if (keyBytes == null) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
int availablePartitionSize = availablePartitions.size();

// 判断当前可用分区。
if (availablePartitionSize > 0) {
handlePartitionChange(availablePartitionSize);
return availablePartitions.get(currentPartition).partition();
} else {
handlePartitionChange(numPartitions);
return currentPartition;
}
} else {
// 对于有key的消息,根据key的哈希值选择分区。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private void handlePartitionChange(int partitionNum) {
long currentTimeMillis = System.currentTimeMillis();

// 如果超过分区切换时间间隔,则切换下一个分区,否则还是选择之前的分区。
if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
|| currentPartition < 0 || currentPartition >= partitionNum) {
lastPartitionChangeTimeMillis = currentTimeMillis;
currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
}
}

public void close() {}

}

分区顺序

单个分区(Partition)内,消息是按照发送顺序储存的,是基本有序的。每个主题下面都有若干分区,如果消息被分配到不同的分区中,不同 Partition 之间不能保证顺序。
如果需要进行消息具有消费顺序性,可以在生产端指定这一类消息的 key,这类消息都用相同的 key 进行消息发送,CKafka 就会根据 key 哈希取模选取其中一个分区进行存储,由于一个分区只能由一个消费者进行监听消费,此时消息就具有消息消费的顺序性了。
数据倾斜
Kafka Broker数据倾斜问题通常是由于分区分布不均匀或者生产者发送数据的key分布不均匀导致的,会引发几类问题:
1. 整体流量没有限流,但是节点局部限流;
2. 某些节点负载过快,导致整体kafka使用率不高,影响整体吞吐。
针对该类问题可以通过以下方式进行优化:
3. 使用合理分区数,分区数保障为节点数的整倍数。
4. 合理的分区策略,例如:RoundRobin(轮询)、Range(范围)和Sticky(粘性)或者自定义的分区策略,均衡发送消息。
5. 查是否使用Key进行发送,如果使用了Key进行发送,尽量设计策略让Key更加分区均衡。

消费消息

消费消息基本流程

1. Poll 数据。
2. 执行消费逻辑。
3. 再次 poll 数据。

负载均衡

每个 Consumer Group 可以包含多个 Consumer,并将参数 group.id 设置成相同的值,属于同一个 Consumer Group 的 Consumer 会负责消费订阅的 Topic。
例如:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给 C1、C2、C3 的某一个。CKafka 默认会均匀地把消息传给各个消费实例,以做到消费负载均衡。
CKafka 负载均衡的内部原理是:把订阅的 Topic 的分区,平均分配给各个 Consumer。因此,Consumer 的个数不要大于分区的数量,否则会有消费实例分配不到任何分区而处于空跑状态,尽量保证消费者数量能被分区总数整除。除了第一次启动上线之外,后续消费实例发生重启、增加、减少,分区数发生增加等变更时,都会触发一次重均衡。

频繁出现 Rebalance

如果频繁出现 Rebalance,可能有以下几种可能:
1. 消费者消费处理耗时很长
2. 消费某一个异常消息导致消费者阻塞或者失败
3. 心跳超时会引发 Rebalance
4. v0.10.2之前版本的客户端:Consumer 没有独立线程维持心跳,而是把心跳维持与 poll 接口耦合在一起。其结果就是,如果用户消费出现卡顿,就会导致Consumer 心跳超时,引发 Rebalance。v0.10.2及之后版本的客户端:如果消费时间过慢,超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行 poll 拉取消息,则会导致客户端主动离开队列,而引发 Rebalance。
可以通过优化消费处理提高消费速度和参数调整等方法解决:
1. 消费端需要和 Broker 版本保持一致。
2. 参考以下说明调整参数值:
session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> <消费线程的个数> <max.poll.interval.ms>的积。
max.poll.interval.ms:该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。
3. 尽量提高客户端的消费速度,消费逻辑另起线程进行处理,针对耗时进行监控。
4. 减少 Group 订阅 Topic 的数量,一个 Group 订阅的 Topic 最好不要超过5个,建议一个 Group 只订阅一个 Topic。

订阅关系

同一个 Consumer Group 内,建议各个消费实例订阅的 Topic 保持一致,避免给排查问题带来干扰。
Consumer Group 订阅多个 Topic
一个 Consumer Group 可以订阅多个Topic,多个 Topic 的消息被 Cosumer Group 中的 Consumer 均匀消费。例如 Consumer Group A 订阅了 Topic A、Topic B、Topic C,则这三个 Topic 中的消息,被 Consumer Group 中的 Consumer 均匀消费。
Consumer Group 订阅多个 Topic 的示例代码如下:
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic: topics) {
subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);
Topic 被多个 Consumer Group 订阅
一个 Topic 可以被多个 Consumer Group 订阅,且各个 Consumer Group 独立消费Topic下的所有消息。例如 Consumer Group A 订阅了 Topic A,Consumer Group B也订阅了Topic A,则发送到Topic A的每条消息,不仅会传一份给Consumer Group A的消费实例,也会传一份给Consumer Group B的消费实例,且这两个过程相互独立,相互没有任何影响。

一个 Consumer Group 对应一个应用

建议一个 Consumer Group 对应一个应用,即不同的应用对应不同的代码。如果您需要将不同的代码写在同一个应用中,请准备多份不同的 kafka.properties。例如:kafka1.properties、kafka2.properties。

消费位点 Offset

每个 Topic 会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点 MaxOffset。
消息队列 CKafka 的 Consumer 会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为消费位点 ConsumerOffset。
剩余的未消费的条数(也称为消息堆积量)=MaxOffset-ConsumerOffset。
offset 提交
消息队列 CKafka 的 Consumer 有两个相关参数:
enable.auto.commit:默认值为true。
auto.commit.interval.ms: 默认值为5000,即5s。
这两个参数组合的结果为:每次 poll 数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数 auto.commit.interval.ms 规定的时长,则客户端会启动位点提交动作。
因此,如果将 enable.auto.commit 设置为 true,则需要在每次 poll 数据时,确保前一次 poll 出来的数据已经消费完毕,否则可能导致位点跳跃。
如果想自己控制位点提交,请把 enable.auto.commit 设为 false,并调用 commit(offsets) 函数自行控制位点提交。
注意:
尽量避免提交位点请求过于频繁,否则容易导致 Broker CPU 很高,影响正常的服务。例如自动提交位点设置 auto.commit.interval.ms 为100ms,手动提交位点,在高吞吐场景下,每消费一条消息提交一个位点。

重置 offset

以下两种情况,会发生消费位点重置:
当服务端不存在曾经提交过的位点时(例如客户端第一次上线)。
当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。
Java 客户端可以通过 auto.offset.reset 来配置重置策略,主要有三种策略:
latest:从最大位点开始消费。
earliest:从最小位点开始消费。
none:不做任何操作,即不重置。
说明:
建议设置成 latest,而不要设置成 earliest,避免因位点非法时从头开始消费,从而造成大量重复。
如果是您自己管理位点,可以设置成 none。

拉取消息

消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时需要控制拉取速度,注意以下参数设置:
max.poll.records:如果单条消息超过 1MB,建议设置为1。
max.partition.fetch.bytes:设置比单条消息的大小略大一点。
fetch.max.bytes:设置比单条消息的大小略大一点。
通过公网消费消息时,通常会因为公网带宽的限制导致连接被断开,此时需要注意控制拉取速度,注意以下参数设置:
fetch.max.bytes:建议设置成公网带宽的一半(注意该参数的单位是 bytes,公网带宽的单位是 bits)
max.partition.fetch.bytes:建议设置成 fetch.max.bytes 的三分之一或者四分之一。

拉取大消息

消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时,需要注意控制拉取速度,注意修改配置:
max.poll.records:每次Poll获取的最大消息数量。如果单条消息超过1 MB,建议设置为1。
fetch.max.bytes:设置比单条消息的大小略大一点。
max.partition.fetch.bytes:设置比单条消息的大小略大一点。
拉取大消息的核心是逐条拉取的。

消息重复和消费幂等

消息队列 CKafka 消费的语义是 at least once, 也就是至少投递一次,保证消息不丢失,但是无法保证消息不重复。在出现网络问题、客户端重启时均有可能造成少量重复消息,此时应用消费端如果对消息重复比较敏感(例如订单交易类),则应该做消息幂等。
以数据库类应用为例,常用做法为:
发送消息时,传入 key 作为唯一流水号 ID。
消费消息时,判断 key 是否已经消费过,如果已经消费过了,则忽略,如果没消费过,则消费一次。
当然,如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。

消费失败

消息队列 CKafka 是按分区逐条消息顺序向前推进消费的,如果消费端拿到某条消息后执行消费逻辑失败,例如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,那么有以下两种处理方式:
失败后一直尝试再次执行消费逻辑。这种方式有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积。
由于消息队列 CKafka 没有处理失败消息的设计,实践中通常会打印失败的消息或者存储到某个服务(例如创建一个 Topic 专门用来放失败的消息),然后定时检查失败消息的情况,分析失败原因,根据情况处理。

消费延迟

消费过程是由客户端主动去服务端拉取消息。一般情况下,如果客户端能够及时消费,则不会产生较大延迟。若产生了较大延迟,请先关注是否有堆积,并注意提高消费速度。

消费堆积

通常造成消息堆积的原因是:
消费速度跟不上生产速度,此时应该提高消费速度。
消费端产生了阻塞。
消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。
消费端应该尽量避免堵塞消费线程,如果存在等待调用结果的情况,建议设置等待的超时时间,超时后作为消费失败进行处理。

提高消费速度

增加 Consumer 实例个数提高并行处理能力,如果消费者和分区数已经1:1,可以考虑增加分区数(注意:对于 flink 自动维护分区的场景不会自动感知新增分区后可能需要修改相关代码后重启)。 可以在进程内直接增加(需要保证每个实例对应一个线程),也可以部署多个消费实例进程。
说明:
实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作。
增加消费线程。
1. 定义一个线程池。
2. Poll 数据。
3. 把数据提交到线程池进行并发处理。
4. 等并发结果返回成功后,再次 poll 数据执行。

套接字缓冲区(socket buffers)

在 Kafka 的 0.10.x 版本中,参数 receive.buffer.bytes 的默认值为 64KB。而在 Kafka 的 0.8.x 版本中,参数 socket.receive.buffer.bytes 的默认值为 100KB。
这两个默认值对于高吞吐量的环境而言都太小了,特别是如果 Broker 和 Consumer 之间的网络带宽延迟积(bandwidth-delay product)大于局域网(local areanetwork,LAN)时。
对于延迟为1ms或更多的高带宽的网络(例如 10Gbps 或更高),建议将套接字缓冲区设置为8或16MB。
如果您的内存不足,也至少考虑设置为 1MB。您也可以设置为 -1,它会让底层操作系统根据网络的实际情况,去调整缓冲区的大小。
但是,对于需要启动“热”分区的 Consumers 来说,自动调整可能不会那么快。

消息广播

CKafka 目前没有消息广播的语义,可以通过创建不同的 Group 来模拟实现。

消息过滤

CKafka 自身没有消息过滤的语义。实践中可以采取以下两个办法:
如果过滤的种类不多,可以采取多个 Topic 的方式达到过滤的目的。
如果过滤的种类多,则最好在客户端业务层面自行过滤。
实践中请根据业务具体情况进行选择,也可以综合运用上面两种办法。

消费某些分区不消费

消费者在消费过程中,可能遇到消费者在线,但是某些分区的位点一致不前进,可能原因如下:
1. 遇到一条异常消息,可能是超大消息,格式异常,导致消费者拉取消息时候,转换成业务位点。
2. 使用公网带宽,带宽较小,拉取大消息时候直接把带宽打满,导致在超时时间内拉取不到消息。
3. 消费者假死,导致不去拉取。
解决方式:
关掉消费者,在 CKafka 控制台设置位点,跳过某些异常消息,或者优化消费代码,然后重启消费者消费。