有奖:语音产品征文挑战赛火热进行中> HOT

背景

TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
Kafka Clients:Kafka 自带的客户端,通过 Java 实现,是 Kafka 生产和消费标准协议的客户端。
本文着重介绍上述 Java 客户端的关键参数,最佳实践以及常见问题。

生产者实践

版本选择

Kafka 客户端和集群之间的兼容性非常重要,通常情况下,较新版本的客户端可以兼容较旧版本的集群,但反之则不一定成立。一般情况下,CKafka实例 Broker 在部署后是明确的,因此可以直接根据 Broker 的版本选择相匹配的客户端的版本。
Java 生态中,广泛使用 Spring Kafka,其中 Spring Kafka 版本和 Kafka Broker 版本的对应关系,可以参见 Spring 官方网址的版本对应关系
生产者参数与调优

生产者参数

在使用 Kafka Client 客户端写入 Kafka 时候,需要配置如下关键参数,相关的参数和默认值如下:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Kafka生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); //Kafka集群的地址列表,格式为host1:port1,host2:port2。生产者会使用这个列表来找到集群并建立连接。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置生产者关键参数以及默认值
props.put(ProducerConfig.ACKS_CONFIG, "1");//acks,默认值为1,消息确认的级别。0表示不等待确认;1表示等待Leader副本写入;all或者-1表示等待所有副本写入。
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//batch.size,批量发送的大小,单位为字节。生产者会将多个消息打包成一个批次发送,以提高性能。默认大小16384字节。
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//buffer.memory,生产者用于缓存待发送消息的内存大小,单位为字节。默认33554432,也就是32MB
props.put(ProducerConfig.CLIENT_ID_CONFIG, "");//client.id,客户端ID。这个ID可以用于在服务端日志中识别消息来源。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");//compression.type消息压缩类型。默认none不压缩,可选值为none、gzip、snappy、lz4、zstd。
props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);//connections.max.idle.ms连接的最大空闲时间,单位为毫秒。超过这个时间的空闲连接会被关闭。默认540s
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);//delivery.timeout.ms消息的最大投递时间,单位为毫秒。超过这个时间的未确认消息会被认为发送失败。默认120s
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);//enable.idempotence,是否启用幂等性。如果启用,生产者会确保每个消息只被发送一次,即使在网络错误或重试的情况下。
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "");//interceptor.classes拦截器类列表。生产者会在发送消息前后调用这些拦截器。
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);//linger.ms延迟发送的时间,单位为毫秒。生产者会等待一段时间以便将更多消息打包成一个批次发送。
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);//max.block.ms,生产者在获取元数据或缓存空间时的最大阻塞时间,单位为毫秒。
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);//max.in.flight.requests.per.connection,每个连接上的最大未确认请求数。
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);//max.request.size,请求的最大大小,单位为字节
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);//metadata.max.age.ms元数据的最大寿命,单位为毫秒。超过这个时间的元数据会被刷新。
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");//metric.reporters度量报告器类列表。生产者会使用这些报告器来报告度量信息。
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");//partitioner.class分区器类。生产者会使用这个分区器来决定每个消息发送到哪个分区。
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);//receive.buffer.bytes接收缓冲区的大小,单位为字节。
props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);//send.buffer.bytes发送缓冲区的大小,单位为字节。
props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);//reconnect.backoff.max.ms重连最大间隔时间,单位为毫秒。
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);//reconnect.backoff.ms重连间隔时间,单位毫秒
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);//request.timeout.ms请求的超时时间,单位为毫秒。
props.put(ProducerConfig.RETRIES_CONFIG, 2147483647);//retries发送失败时的重试次数。
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);//retry.backoff.ms重试的间隔时间,单位为毫秒。
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);//transaction.timeout.ms事务的超时时间,单位为毫秒
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);//transactional.id事务ID。如果设置了这个参数,生产者会启用事务功能。
props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "default");//client.dns.lookupDNS查找策略。可选值为default、use_all_dns_ips、resolve_canonical_bootstrap_servers_only。

// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 发送消息
for (int i = 0; i < 100; i++) {
String key = "key-" + i;
String value = "value-" + i;

// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

// 发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功:key=" + key + ", value=" + value);
} else {
System.err.println("消息发送失败:" + exception.getMessage());
}
}
});
}

// 关闭生产者
producer.close();
}
}

参数说明调优

关于 acks 参数优化

acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为1,表示消息发送给 Leader Broker 后,Leader 确认消息写入后即返回。acks参数还有以下可选值:
0: 不等待任何确认,直接返回。
1: 等待 Leader 副本确认写入后返回。
-1或者 all: 等待 Leader 副本以及相关的 Follower 副本确认写入后返回。
由上可知,在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。因此:
在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks 参数设置为-1,确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性,但是会牺牲写入吞吐和性能,时延会增加。
在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,提高吞吐量。

关于 Batch 相关参数优化

默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。在高吞吐场景下,可以配合计算和设置:
batch.size:默认16K。
linger.ms:默认为0,可以适当增加耗时,如设置100ms,尽可能聚合更多消息批量发送消息。
buffer.memory:默认32MB,对于大流量 Producer,在堆内存充足情况可以设置更大,如设置256MB。

关于事务参数优化


props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);//enable.idempotence,是否启用幂等性。如果启用,生产者会确保每个消息只被发送一次,即使在网络错误或重试的情况下。 //是否需要幂等,在事务场景下需要设置为true
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);//max.in.flight.requests.per.connection,每个连接上的最大未确认请求数。在事务场景不要超过5
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);//transactional.id事务ID。如果设置了这个参数,生产者会启用事务功能。
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);//transaction.timeout.ms事务的超时时间,单位为毫秒,可以适当延长事务时间。
需要强调,事务因为要保障消息的 exactly once 语义,因此会额外付出更多的计算资源。
对于事务场景,可是适当增加事务超时时间,容忍高吞吐场景下,写入延时带来的抖动。

关于压缩参数优化

Kafka Java Client 支持如下压缩参数:
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");//compression.type消息压缩类型。默认none不压缩,可选值为none、gzip、snappy、lz4、zstd。

目前支持以下几种压缩配置:
none:不使用压缩算法。
gzip:使用 GZIP 压缩算法。
snappy:使用 Snappy 压缩算法。
lz4:使用 LZ4 压缩算法。
zstd:使用 ZSTD 压缩算法。
要在 Kafka Java 客户端中使用压缩消息,需要在创建生产者时设置 compression.type 参数。例如,要使用 LZ4 压缩算法,可以将compression.type 设置为lz4。
Kafka 压缩消息是一种用计算换带宽的优化方式,虽然 Kafka 压缩消息的压缩和解压缩,发生在客户端,但是由于Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 gzip 压缩,服务端对该压缩消息校验的计算成本会非常大,在某种程度上可能会出现得不偿失的情况,因为计算的增加导致 Broker CPU 利用率很高,降低了其他请求的处理能力,导致整体性能更低。这种情况建议可以使用如下方式规避 Broker 的校验:
1. 在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
{"Compression","lz4"}
2. 在 Producer 端将 messageCompression 当成正常消息发送。
3. 在 Consumer 段读取消息 key,获取使用的压缩方式,独立进行解压缩。

创建生产者实例

如果应用程序需要更高的吞吐量,可以使用异步发送,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,可以使用同步发送,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。

同步发送

在 Kafka Java Client 客户端中,同步发送的示例代码如下:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerSyncExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) {
// 创建Kafka生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置生产者参数
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 同步发送消息
for (int i = 0; i < 10; i++) {
String key = "sync-key-" + i;
String value = "sync-value-" + i;

// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

try {
// 发送消息并等待结果
RecordMetadata metadata = producer.send(record).get();
System.out.println("同步发送成功:key=" + key + ", value=" + value);
} catch (InterruptedException | ExecutionException e) {
System.err.println("同步发送失败:" + e.getMessage());
}
}

// 关闭生产者
producer.close();
}
}

异步发送

异步发送:异步发送消息时不会阻塞当前线程,生产者的吞吐量较高,但是需要通过回调函数来处理消息结果,示例如下:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAsyncExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) {
// 创建Kafka生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置生产者参数
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 异步发送消息
for (int i = 0; i < 10; i++) {
String key = "async-key-" + i;
String value = "async-value-" + i;

// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

// 发送消息并设置回调函数
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("异步发送成功:key=" + key + ", value=" + value);
} else {
System.err.println("异步发送失败:" + exception.getMessage());
}
}
});
}

// 关闭生产者
producer.close();
}
}



消费者实践

消费者参数


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // "bootstrap.servers",Kafka集群的地址,没有默认值
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "key.deserializer",消息键的反序列化方式,没有默认值
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "value.deserializer",消息值的反序列化方式,没有默认值
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // "group.id",消费者组ID,没有默认值
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // "auto.offset.reset",位点不存在时的处理方式,"latest"表示从最新的消息开始消费,默认值为"latest"
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // "enable.auto.commit",是否自动提交位点,默认值为"true"
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // "auto.commit.interval.ms",自动提交位点的间隔时间,单位为毫秒,默认值为"5000"
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // "session.timeout.ms",消费者组成员的会话超时时间,单位为毫秒,默认值为"10000"
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // "max.poll.records",单次poll的最大消息数,默认值为"500"
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // "max.poll.interval.ms",两次poll操作间的最大允许间隔时间,单位为毫秒,默认值为"300000"
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // "fetch.min.bytes",服务器返回的最小数据,如果设置大于1,服务器会等待直到累计的数据量大于这个值,默认值为"1"
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800"); // "fetch.max.bytes",服务器返回的最大数据量,单位为字节,默认值为"52428800"
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // "fetch.max.wait.ms",服务器等待满足fetch.min.bytes条件的最大时间,单位为毫秒,默认值为"500"
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // "heartbeat.interval.ms",心跳间隔时间,单位为毫秒,默认值为"3000"
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id"); // "client.id",客户端ID,没有默认值
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // "request.timeout.ms",客户端请求超时时间,单位为毫秒,默认值为"30000"

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}

参数调优

1. 在使用 Kafka 消费者时,我们可以通过调整一些参数来优化性能。以下是一些常见的参数调优方案:
fetch.min.bytes:如果不确定消息最低大小,这个参数建议设置为1,如果明确消息最小值,可以设置该值为最小消息大小。
max.poll.records:这个参数可以根据应用的处理能力进行调整。如果您的应用可以处理更多的记录,可以将这个参数设置为更大的值,以减少 poll操作的次数。
auto.commit.interval.ms:这个参数可以根据您的应用的需求进行调整,一般自动提交位点场景,建议保持默认值5000ms。注意,过于频繁的位点提交会影响性能,额外占用 Broker 的计算资源。
client.id:可以为每个消费者设置一个唯一的 ID,以便在监控和日志中区分不同的消费者。
以上是一些常见的参数调优方案,但具体的最佳设置可能会根据您的应用的特性和需求有所不同。在调优参数时,请记住始终进行性能测试,以确保您的设置可以达到预期的效果。
2. 对于 rebalance 时间频繁和消费线程阻塞问题,参考以下说明参数优化:
session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> *<max.poll.interval.ms>的积。
max.poll.interval.ms:该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

创建消费者实例

Kafka Java Client 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

自动提交位点

自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。注意,自动提交位点时间间隔 auto.commit.interval.ms 不要设置太短,否则容易导致 Broker CPU 偏高,影响其他请求处理。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerAutoCommitExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
private static final String GROUP_ID = "test-group";

public static void main(String[] args) {
// 创建Kafka消费者配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 开启自动提交位点
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 自动提交间隔,单位:5000毫秒

// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题
consumer.subscribe(Collections.singletonList(TOPIC));

// 消费消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
// 关闭消费者
consumer.close();
}
}
}



手动提交位点

手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerManualCommitExample {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));

int count = 0;
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
count++;
if (count % 10 == 0) {
consumer.commitSync();
System.out.println("Committed offsets.");
}
}
}
} finally {
consumer.close();
}
}
}

Assign 消费

Kafka Java Client 的 assign 消费模式允许消费者直接指定订阅的分区,而不是通过订阅主题来自动分配分区。这种模式适用于需要手动控制消费分区的场景,例如:为了实现特定的负载均衡策略,或者在某些情况下跳过某些分区。一般流程为使用 assign 方法来手动指定消费者消费的分区,通过seek 方法来设置开始消费的位点,然后执行消费逻辑,使用示例如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAssignAndSeekApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "my-topic";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

// 设置消费的起始位点
long startPosition0 = 10L;
long startPosition1 = 20L;
consumer.seek(partition0, startPosition0);
consumer.seek(partition1, startPosition1);

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync(); // 手动提交位点
}
} finally {
consumer.close();
}
}
}

Kafka Java Client 生产消费常见问题

Kafka Java Producer 无法成功发送消息
首先排查 Kafka 集群的 IP 和端口能够正常连接,若不能请先解决通信问题。
其次检查是否正确配置接入点,版本是否和 Broker 版本匹配,可以参考最佳实践的发送 demo。