这是我第一次使用卡夫卡。我有一个spring引导应用程序,我正在使用kafka主题的消息,并将消息存储在DB中。我有一个处理DB fail的要求,如果DB关闭,则不应该提交该消息,并在一段时间后暂停消费消息,然后监听器可以再次开始使用消息。做这件事的更好的方法是什么。
i am using spring-kafka:2.2.8.RELEASE which is internally using kafka 2.0.1
我使用spring kafka 2.x来读取来自kafka的消息。 我的问题是,如果我的方法在我的kafkalistener方法中花了一段时间,kafka发送相同的消息两次,这对me.What来说是问题,如果我从kafka读取消息的时间,kafka不会发送此消息的第二时间值,如max.poll.interval.ms not quarantee to read message at once.What是实现一次性读取策略的正确方法在春天boot.In我的消息我没有键,因此我无法放置控件。 @KafkaListener(topics = "mytopics",groupId =
我有一个使用Spring实现的Kafka消费者:
public class MyKafkaConsumer<T> implements AcknowledgingMessageListener<String, T> {
private final ConcurrentMessageListenerContainer<String, T> container;
public MyKafkaConsumer(ConsumerFactory<String, T> consumerFactoryry) {
Container
在我们的项目中,我们需要通过许多线程定期从第三方获取数据,然后将这些数据推送给Kafka。如果目前无法使用Kafka服务器,则应该终止流,在下一次预定执行过程中应该丢失并重新获取获取的数据。此外,还需要使用事务管理,因为我们需要将这批消息发送给Kafka到不同的主题。如果没有发送一条消息,则所有其他消息都应该回滚。
如果Kafka Server不可用,我们就遇到了终止执行的问题。当事务管理被禁用时,一切正常工作,我们
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms
我正在尝试在Docker和Spring上配置Kafka。
spring应用程序负责生成消息。Docker应该使用它们。我已经准备好了容器,我正在尝试连接。
不幸的是,我得到一个错误:
[Producer clientId=1-1] 1 partitions have leader brokers without a matching listener, including [myTopic-0]
我的docker-component
version: '3'
services:
zookeeper:
container_name: zookeeper
ima