Kafka 是一个高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据,如网页浏览、搜索和其他用户的行动。Spring Boot 是一个开源的轻量级框架,专为简化 Spring 应用的创建和开发而设计。结合 Kafka 和 Spring Boot 可以实现高效的消息驱动应用程序。
Kafka:
Spring Boot:
问题: Kafka消费者无法读取消息。
原因: 可能是消费者组偏移量未提交,或者消费者配置不正确。
解决方法: 确保消费者的偏移量已提交,检查消费者的配置,如bootstrap-servers
, group-id
, key-deserializer
, value-deserializer
等。
示例代码:
@SpringBootApplication
public class KafkaSpringBootApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaSpringBootApplication.class, args);
}
}
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/send")
public String sendMessage(@RequestParam("message") String message) {
kafkaTemplate.send("test-topic", message);
return "Message sent";
}
}
@KafkaListener(topics = "test-topic", groupId = "group-id")
public void listen(String message) {
System.out.println("Received message: " + message);
}
在application.properties
中配置Kafka:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-id
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
确保Kafka服务正在运行,并且test-topic
已经创建。这样就可以实现基本的消息发送和接收功能。
没有搜到相关的沙龙
领取专属 10元无门槛券
手把手带您无忧上云