我希望测试中的生产者等待测试类中的消费者通过调用Acknowledgement.acknowledge()来确认。我的使用者使用以下属性初始化:
spring:
kafka:
consumer:
group-id: test-group
bootstrap-servers: ${spring.embedded.kafka.brokers}
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 1我的kafkaListenerContainerFactory初始化如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageType> kafkaListenerContainerFactory(
ConsumerFactory<String, MessageType> consumerFactory)
{
ConcurrentKafkaListenerContainerFactory<String, MessageType> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new LoggingErrorHandler());
factory.setConcurrency(numListeners);
return factory;
}我还将我的生产者配置中的acks设置为all。尽管我在等待未来的回归,但制片人似乎并没有阻止我。
producer.send(new ProducerRecord<Integer, String>(TOPIC, key, value)).get();如何在不添加某种sleep的情况下让测试中的生产者阻塞直到使用者确认
发布于 2022-11-21 13:57:52
您不能;生产者和消费者是独立的;acks只是承认代理已经收到并保护了记录;它与消费者无关。
你需要你自己的逻辑,让消费者告诉生产者,它已经收到了记录。
https://stackoverflow.com/questions/74497483
复制相似问题