读完本文,你将了解到如下知识点:
图1
消费者1
的消费速度,
那么就会导致数据堆积,
产生一些大家都知道的蛋疼事情了,
那么我们只能加强 消费者
的消费能力,
所以也就有了我们下面来说的 消费者组
消费者组
,其实就是一组 消费者
的集合,
当我们看到下面这张图是不是就特别舒服了,
我们采用了一个 消费组
来消费这个 topic
,
众人拾柴火焰高,其消费能力那是按倍数递增的,
所以这里我们一般来说都是采用 消费者组
来消费数据,
而不会是 单消费者
来消费数据的。
这里值得我们注意的是:
topic
可以被 多个 消费者组
消费,
但是每个 消费者组
消费的数据是 互不干扰 的,
也就是说,每个 消费组
消费的都是 完整的数据 。消费者
消费,
而 不能拆给多个消费者 消费,
也就是说如果你某个 消费者组内的消费者数 比 该 Topic 的分区数还多,
那么多余的消费者是不起作用的图2
图3
我们就很好的可以回答这个问题了,
我们可以看到 消费者4
是完全没有消费任何的数据的,
所以如果你想要加强 消费者组
的能力,
除了添加消费者,分区的数量也是需要跟着增加的,
只有这样他们的并行度才能上的去,消费能力才会强。图3
图2
,一般来说我们建议 消费者
数量 和 分区
数量是一致的,
当我们的消费能力不够时,
就必须通过调整分区的数量来提高并行度,
但是,我们应该尽量来避免这种情况发生,
比如:
现在我们需要在图2
的基础上增加一个 分区4
,
那么这个 分区4
该由谁来消费呢?
这个时候kafka会进行 分区再均衡
,
来为这个分区分配消费者,分区再均衡 期间该 Topic 是不可用的,
并且作为一个 被消费者
,
分区数的改动将影响到每一个消费者组
,
所以在创建 topic
的时候,我们就应该考虑好分区数,
来尽量避免这种情况发生咱们以 java api
为例,下面是一个简单的 kafka consumer
public static void main(String[] args) {
//consumer 的配置属性
Properties props = new Properties();
///brokers 地址
props.put("bootstrap.servers", "localhost:9092");
//指定该 consumer 将加入的消费组
props.put("group.id", "test");
// 开启自动提交 offset,关于offset提交,我们后续再来详细说说
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
//指定序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建 consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅消费主题,这里一个消费者可以同时消费 foo 和 bar 两个主题的数据
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
使用起来还是很简单的,不过如果想要用好 consumer, 可能你还需要了解以下这些东西:
ok,那么我们接下来一个个来看吧。。。
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
看起来只是把普通的订阅方式修改成了订阅 topic
指定的分区,
其余的还是照常使用,不过这里也需要注意一下的是:
group id
是唯一的就可以了。topic
的分区变动不敏感,
也就是说当 topic
新增了分区,
分区的数据将会发生改变,
但该消费组对此确是不感知的,依然照常运行,
所以很多时候需要你手动consumer.partitionsFor()
去查看topic
的分区情况subscription
混合使用partition.assignment.strategy
进行分区策略配置
这里的话 kafka 是自带两种分区策略的,
为了方便理解,
我们以如下场景为例来进行解释:
已知:
TopicA 有 3 个 partition(分区):A-1,A-2,A-3;
TopicB 有 3 个 partition(分区):B-1,B-2,B-3;
ConsumerA 和 ConsumerB 作为一个消费组 ConsumerGroup 同时消费 TopicA 和 TopicB从上面我们也是可以看出这两种策略的异同, RoundRobin 相比较 Range 会使得分区分配的更加的均衡, 但如果是消费单个 topic ,那么其均衡是差不多的, 而 Range 会比 RoundRobin 更具优势一点, 至于这个优势,我目前能想到的是分配的方式可能简单一点, 但是折算是优势嘛....黑人问号????。
assign
的参数以及返回注释就 ok了public class RangeAssignor extends AbstractPartitionAssignor{ //省略部分代码。。。。 /** * 根据订阅者 和 分区数量来进行分区 * @param partitionsPerTopic: topic->分区数量 * @param subscriptions: memberId 消费者id -> subscription 消费者信息 * @return: memberId ->list<topic名称 和 分区序号(id)> */ @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { //topic -> list<消费者> Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); //初始化 返回结果 Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { //topic String topic = topicEntry.getKey(); // 消费该topic的 consumer-id List<String> consumersForTopic = topicEntry.getValue(); //topic 的分区数量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); //平均每个消费者分配的 分区数量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //平均之后剩下的 分区数 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); //这里就是将连续分区切开然后分配给每个消费者 List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; } }
首先,我们都应该知道,最全最全的文档应该是来自官网(虽然有时候可能官网找不到): http://kafka.apachecn.org/documentation.html#newconsumerconfigs 嗯,以下内容来自 kafka权威指南 , 请原谅我的小懒惰。。。后续有时间会把工作中的遇到的补充上
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。
broker 在收到消费者的数据请求时,
如果可用的数据量小于fetch.min.bytes 指定的大小,
那么它会等到有足够的可用数据时才把它返回给消费者。
这样可以降低消费者和 broker 的工作负载,
因为它们在主题不是很活跃的时候(或者一天里的低谷时段),
就不需要来来回回地处理消息。
如果没有很多可用数据,但消费者的 CPU 使用率却很高,
那么就需要把该属性的值设得比默认值大。
如果消费者的数量比较多,
把该属性的值设置得大一点可以降低broker 的工作负载。fetch.max.wait.ms
我们通过 fetch.min.bytes 告诉 Kafka,
等到有足够的数据时才把它返回给消费者。
而feth.max.wait.ms
则用于指定 broker 的等待时间,默认是 500ms。
如果没有足够的数据流入 Kafka,
消费者获取最小数据量的要求就得不到满足,
最终导致 500ms 的延迟。
如果要降低潜在的延迟(为了满足 SLA),
可以把该参数值设置得小一些。
如果 fetch.max.wait.ms
被设为 100ms,
并且fetch.min.bytes 被设为 1MB,
那么 Kafka 在收到消费者的请求后,
要么返回 1MB 数据,
要么在100ms 后返回所有可用的数据,
就看哪个条件先得到满足。max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。
如果出现这种情况,可以把max.partition.fetch.bytes 值改小,或者延长会话过期时间。session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,
默认是 3s。
如果消费者没有在session.timeout.ms
指定的时间内发送心跳给群组协调器,
就被认为已经死亡,
协调器就会触发再均衡,
把它的分区分配给群组里的其他消费者。
该属性与 heartbeat.interval.ms
紧密相关。
heartbeat.interval.ms
指定了 poll() 方法向协调器发送心跳的频率,
session.timeout.ms
则指定了消费者可以多久不发送心跳。
所以,一般需要同时修改这两个属性,
heartbeat.interval.ms
必须比 session.timeout.ms
小,
一般是session.timeout.ms
的三分之一。
如果 session.timeout.ms
是 3s,那么 heartbeat.interval.ms
应该是 1s。
把session.timeout.ms 值设得比默认值小,
可以更快地检测和恢复崩溃的节点,
不过长时间的轮询或垃圾收集可能导致非预期的再均衡。
把该属性的值设置得大一些,
可以减少意外的再均衡,
不过检测节点崩溃需要更长的时间。auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下
(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
它的默认值是 latest,
意思是说,
在偏移量无效的情况下,
消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。
另一个值是earliest,
意思是说,
在偏移量无效的情况下,
消费者将从起始位置读取分区的记录。enable.auto.commit
我们稍后将介绍几种不同的提交偏移量的方式。
该属性指定了消费者是否自动提交偏移量,默认值是true。
为了尽量避免出现重复数据和数据丢失,可以把它设为 false,
由自己控制何时提交偏移量。
如果把它设为true,还可以通过配置 auto.commit.interval.ms
属性来控制提交的频率。partition.assignment.strategy
(这部分好像重复了 ~~~)
我们知道,分区会被分配给群组里的消费者。
PartitionAssignor 根据给定的消费者和主题,
决定哪些分区应该被分配给哪个消费者。
Kafka 有两个默认的分配策略。
client.id
该属性可以是任意字符串,
broker 用它来标识从客户端发送过来的消息,
通常被用在日志、度量指标和配额里。max.poll.records
该属性用于控制单次调用 call() 方法能够返回的记录数量,
可以帮你控制在轮询里需要处理的数据量。receive.buffer.bytes 和 send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。
如果它们被设为 -1,就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心内,
可以适当增大这些值,
因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽这篇文章有点太长了,所以准备另起一篇来专门讲 offset 的控制。 预计在周末更新吧,如果你有兴趣,可以点击关注一下,以便及时收到提醒噢!!! 弱弱的,也是求一波关注,哈哈哈!!!