Kafka 中默认的消费位移的提交方式为自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable.auto.commit 参数为 true;
在默认的配置下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移;每过5秒就会提交偏移量,但是在4秒发生了分区在均衡,偏移量还没来得及提交,他们这四秒的消息就会被重复消费;
当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况;
自动提交消费位移的方式并没有为开发者留有余地来处理重复消费和消息丢失的问题,无法做到精确的位移管理;kafka提供了手动位移提交的方式,这样就可以使得开发人员对消费位移的管理控制更加灵活,开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false;
手动提交又分为同步提交和异步提交,对应于KafkaConsumer中的commitSync()和commitAsync()两种类型的方法;
auto.commit. offset = false:使用commitsync()提交poll()返回最新偏移量;
注意:
public class CommitSync {
public static void main(String[] args) {
/**消息消费者*/
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
/*取消自动提交*/
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>
(properties);
try {
consumer.subscribe(Collections.singletonList("simple"));
while(true){
ConsumerRecords<String, String> records = consumer.poll(500);
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format(
"主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
//自定义业务逻辑
}
//开始事务
//读业务写数据库-
//偏移量写入数据库
//提交
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
注意:
public class CommitAsync {
public static void main(String[] args) {
/**消息消费者*/
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
/**取消自动提交*/
properties.put("enable.auto.commit",false);
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(
properties);
try {
consumer.subscribe(Collections.singletonList("simple"));
while(true){
ConsumerRecords<String, String> records = consumer.poll(500);
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format("主题:%s,分区:%d,偏移量:%d,key:
%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
//自定义业务逻辑开发
}
//消费完毕,同步提交
consumer.commitAsync();
}
} finally {
consumer.close();
}
}
}
中间处理消息的时候,即使偶尔出现一次偏移量提交失败,后面消费的时候,偏移量也能够提交成功,所以不会有大影响;但是到了最后消费者要关闭了的时候,偏移量一定要提交成功;因此在消费者关闭前一般会组合使用 commitAsync()和commitsync() ,同步一定会提交成功,异步可能会失败;
public class SyncAndAsync {
public static void main(String[] args) {
/**消息消费者*/
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
/*取消自动提交*/
properties.put("enable.auto.commit",false);
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
try {
consumer.subscribe(Collections.singletonList("simple"));
while(true){
ConsumerRecords<String, String> records = consumer.poll(500);
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format(
"主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
//自定义工作
}
//避免等待,异步提交
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.out.println("提交失败");
e.printStackTrace();
} finally {
try {
//最后一次提交,确保成功,同步提交
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}