我正在使用教程尝试基本的spring引导卡夫卡应用程序。
我的项目层次如下:
SpringBootKafkaApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootKafkaApplication{
public static void main(String[] ar
我想使用avro消息并反序列化它们,而不使用Confluent模式注册表。我在本地有这个模式。因此,我只遵循了这个面向消费者的https://medium.com/@mailshine/apache-avro-quick-example-in-kafka-7b2909396c02部分。现在,我想在application.properties文件中配置这个反序列化程序( Spring boot的方式)。我尝试添加spring.kafka.consumer.value-deserializer=com.example.AvroDeserializer,但结果出现错误:“找不到com.exampl
我正在尝试创建一个spring boot应用程序(Java),它必须能够通过给出偏移量和分区来删除kafka主题中的消息。我一直在研究可以做到这一点的java或spring boot包类,但我只发现了这样的东西:Delete Messages from a Topic in Apache Kafka有一个java kafka客户端,它有一个方法可以删除偏移前的所有消息,但我只有一个方法可以删除一个。这有可能吗? 提前感谢
我见过这样的例子,我们有一个java configuration类,我们定义多个KafkaListenerContainer并将所需的containerType传递给@kafkaListener。但我正在探索是否有任何方法可以通过appication.yml/properties使用Spring Boot auto Kafka配置来实现同样的目的。
我正在尝试使用框架构建一个简单的Kafka流应用程序。我可以连接到流来推送原始数据进行处理。但是,当我试图处理按键计算事件的流时,我在运行应用程序时会得到Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde异常。我签入了包含库的项目,我可以找到Serde类,它没有丢失。我不知道为什么在运行时它没有被加载!
下面是我的源文件。
com.pgp.learn.kafka.analytics.AnalyticsApplication
package com.pgp.learn.kafka.analy
我正在使用Spring和Kafka处理一个小批处理,该批处理从Kafka主题读取json数据,将其转换为sends对象,更改值并将其发送回Kafka主题。一切都很好,但我唯一的问题是,我的消费者总是在阅读这个话题的乞求。我需要它从最后一条未消耗的信息中读出来。我已经添加了这些属性:
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value
但这似乎不起作