温馨提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。
根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征:
对于消息处理时间不可预测的情况下上述两个参数可能不够用,那将如何是好呢? 通常的建议将消息拉取与消息消费分开,一个线程负责 poll 消息,处理这些消息使用另外的线程,这里就需要手动提交消费进度。为了控制消息拉起的过快,您可能会需要用到 Consumer#pause(Collection) 方法,暂时停止向该分区拉起消息。RocketMQ 的推模式就是采用了这种策略。如果大家有兴趣的话,可以从笔者所著的《RocketMQ技术内幕》一书中详细了解。
public static void testConsumer1() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072");
props.setProperty("group.id", "C_ODS_ORDERCONSUME_01");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TOPIC_ORDER"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("消息消费中");
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
public static void testConsumer2() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
// insertIntoDb(buffer);
// 省略处理逻辑
consumer.commitSync();
buffer.clear();
}
}
}
要认识 Kafka 的消费者,个人认为最好的办法就是从它的类图着手,下面给出 Consumer 接口的类图。
接下来对起重点方法进行一个初步的介绍,从下篇文章开始将对其进行详细设计。
接下来笔者根据其构造函数,对一一介绍其核心属性的含义,为接下来讲解其核心方法打下基础。