我有一个带有@KafkaListener方法的spring组件: @Slf4j
@Component
public class ResponseHandler {
private final ResponseMessageService responseMessageService;
public ResponseHandler(ResponseMessageService responseMessageService) {
this.responseMessageService= responseMessageService;
}
@K
春季卡夫卡版本- 2.8.5
```@KafkaListener
consumeMessages(@Payload List<String> messages,Ack ack)
{
//process records in separate Thread and acknowledge
// Thread is not available nack it after 2 seconds
}```
Nack记录应在KafkaListener中的2秒后重新处理。但是,跳过的记录不是由KafkaListener处理的。重新启动Spring应用程序
我试图根据事件的类型为kafka事件创建多个实现类。
public class KafkaListener {
@Autowired
Service service;
@KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")
public void consumeSource(Object event) {
service.process(event);
}
}
public inte
我有这样的服务(全在科特林):
@Service
class MyService {
fun getSomeString(): String = "test"
}
这个集成测试类:
@RunWith(SpringRunner::class)
@SpringBootTest
@EmbeddedKafka // used on some kafka tests
class BasicTests {
和方法:
@Test
fun `test method count`() {
// here I have a kafka producer sending a messa
我需要在运行时创建Kafka侦听器,一切看起来都正常,除了消息转换器属性似乎被忽略(或者这是一个设计的特性,或者我做错了什么)。
当使用@KafkaListener时,它是正确的,但是当手动创建侦听器时,我的消息没有转换为所需的对象,因此我得到了一个错误:
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.my.company.model.MyPojo (java.lang.String is in module java.base of loader '
只有当标志设置为true时,我才试图启动我的KafkaListener。
@Component
public class KafkaTopicConsumer {
//Somehow wrap the listener to only start when a property value is set to true
@KafkaListener(topics = "#{@consumerTopic}", groupId = "#{@groupName}")
public void consumeMessage(ConsumerRecord<Stri
Spring Kafka是否有可能在保证顺序的情况下从不同的消费者中读取两个不同的主题?例如,主题A存储与确定如何处理存储在主题B中的数据相关的信息。主题A被读取到内存中并被引用,但在从主题B读取数据之前需要完全填充主题A。下面是我当前设置的示例... @Service
public class TopicA {
@KafkaListener(topics = "topicA")
public void consume(ConsumerRecord<String, byte[]> record) {
// ... some code here to
我们使用Spring和Spring (所有最新版本)一起使用。我们将对Kafka消息的处理转换为@KafkaHandler注释的方法,并期望@Valid/@Validated与@Payload一起确保有效负载验证,但这没有发生。此功能适用于@KafkaListener,如果它也适用于@KafkaHandler
@KafkaListener(...)
@Component
public class NotificationListener {
@KafkaHandler
public void handleV1(@Payload @Valid NotificationV1 no
我正在尝试查看当使用者使用来自Kafka主题的消息时,是否调用了来自Service类的方法,但我得到的错误是与Mock没有零交互。当测试运行时,它使用该消息,并且我可以在终端上看到服务方法被实际调用(我尝试打印),但它没有通过测试。
我的Consumer类:
@Component
public class Consumer {
@Autowired
private Service service;
@KafkaListener(topics = "topic")
public void consume(String message) {