Kafka的consumer采用pull(拉)模式从broker中读取数据。
模式 | 优点 | 缺点 |
---|---|---|
Push(推)模式 | - 快速传递消息- 消息发送速率由broker决定 | - 难以适应不同消费者的消费速率- 可能导致拒绝服务和网络拥塞 |
Pull(拉)模式 | - 可以根据消费者的消费能力以适当速率消费消息 | - 潜在的循环问题,如果Kafka没有数据,消费者可能会一直返回空数据- 需要设置轮询的timeout以避免无限等待时长过长 |
Kafka消费者的总体工作流程包括以下步骤:
poll()
方法从Kafka broker中拉取消息。它会定期轮询(拉)Kafka集群以获取新消息。
这个工作流程涵盖了Kafka消费者从配置到数据处理再到资源管理的主要步骤。消费者通常是多线程或多进程的,以处理大量的消息,并能够根据需要调整消费速率。此外,Kafka的消费者库提供了很多功能,如自动负载均衡、自动偏移管理等,以简化消费者的开发和维护。
Kafka消费者组(Consumer Group)是一种机制,用于协调和管理多个消费者并共同消费一个或多个Kafka主题的消息。消费者组的工作原理如下:
通过这种方式,Kafka消费者组能够实现高可用性、负载均衡和容错,允许多个消费者并行处理消息,并根据需求动态调整分区分配。这使得消费者组成为了处理大规模流式数据的理想工具。
Kafka消费者组的初始化流程包括一系列步骤,用于创建和配置消费者组的成员。以下是Kafka消费者组的初始化流程:
subscribe()
方法订阅一个或多个Kafka主题。这告诉Kafka你希望从哪些主题中接收消息。
poll()
方法开始轮询消息。这将启动消费者实例并开始拉取消息。消费者组中的每个成员都会独立执行这个步骤。
这个初始化流程涵盖了Kafka消费者组的基本步骤,从配置消费者组成员到消息的处理和消费。请注意,Kafka消费者组的初始化需要注意各个配置选项以及消费者组的协调过程,以确保正常运行和负载均衡。
需求:创建一个独立消费者,消费artisan主题中的数据
注意:在消费者API代码中必须配置消费者组id。
package com.artisan.pc;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class CustomConsumer {
public static void main(String[] args) {
// 1.创建消费者的配置对象
Properties properties = new Properties();
// 2.给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "artisan-group");
// 3. 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 4. 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("artisan");
consumer.subscribe(topics);
// 5. 拉取数据打印
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 6. 遍历并输出消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
① 在IDEA中执行消费者程序 ② 服务器上中创建kafka生产者,并输入数据
③ 在IDEA中观察接收到的数据
ConsumerRecord(topic = artisan, partition = 2, leaderEpoch = 0, offset = 34, CreateTime = 1698630425187, serialized key size = -1, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = first message)
ConsumerRecord(topic = artisan, partition = 2, leaderEpoch = 0, offset = 35, CreateTime = 1698630429909, serialized key size = -1, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = seconde message)
参数名称 | 描述 |
---|---|
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 |
key.deserializer | 指定接收消息的key的反序列化类型。需要写全类名。 |
value.deserializer | 指定接收消息的value的反序列化类型。需要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 若enable.auto.commit=true,表示消费者提交偏移量的频率,默认为5秒。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在时的处理方式。可选值包括"earliest"、“latest”、“none”、 |
offsets.topic.num.partitions | __consumer_offsets的分区数,默认是50个分区。 |
heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认为3秒。必须小于session.timeout.ms,也不应该高于session.timeout.ms的1/3。 |
session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认为45秒。超过该值,消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认为5分钟。超过该值,消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 消费者获取服务器端一批消息最小的字节数,默认为1个字节。 |
fetch.max.wait.ms | 默认为500毫秒。如果没有从服务器端获取到一批数据的最小字节数,等待时间到,仍然会返回数据。 |
fetch.max.bytes | 默认为52428800(50兆字节)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值,仍然可以拉取回这批数据,这不是一个绝对最大值,一批次的大小受message.max.bytes(broker配置)或max.message.bytes(topic配置)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认为500条。 |