Kafka是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。Spring Boot是一个用于简化Spring应用初始搭建以及开发过程的框架。在Spring Boot中使用Kafka,通常会涉及到Kafka消费者的配置和使用。
@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {
// 并行处理消息
CompletableFuture.runAsync(() -> processMessage(message));
}
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
@Transactional
public void processMessage(String message) {
// 幂等性处理逻辑
}
以下是一个简单的Spring Boot应用中使用Kafka消费者的示例:
@SpringBootApplication
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
@KafkaListener(topics = "testTopic", groupId = "group-id")
public void listen(ConsumerRecord<String, String> record) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
在application.properties
中配置Kafka消费者属性:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
通过以上配置和代码,可以实现一个基本的Kafka消费者,并处理常见的异常情况。
领取专属 10元无门槛券
手把手带您无忧上云