这是我第一次使用卡夫卡。我有一个spring引导应用程序,我正在使用kafka主题的消息,并将消息存储在DB中。我有一个处理DB fail的要求,如果DB关闭,则不应该提交该消息,并在一段时间后暂停消费消息,然后监听器可以再次开始使用消息。做这件事的更好的方法是什么。
i am using spring-kafka:2.2.8.RELEASE which is internally using kafka 2.0.1
Spring Kafka - How to Retry with @KafkaListener spring-kafka SeekToCurrentErrorHandler如果设置为spring.kafka.listener.ack-mode=time,是否会重试?或者在指定的确认模式下重试工作,例如手动MANUAL_IMMEDIATE。 @Value("${${sync.kafka.header.source.id}}")
private String headerSourceId;
@Value("${sync.kafka.from.id}")
priv
我有一个问题,反序列化来自卡夫卡主题的信息。这些消息已经使用spring云流和Apache序列化。我正在使用Spring阅读它们,并试图对它们进行反序列化。如果我同时使用spring来生成和使用消息,那么我可以很好地反序列化这些消息。问题是当我用Spring消费它们,然后尝试反序列化。
我使用的是模式注册表(用于开发的spring架构注册表,以及生产中的合流模式),但是反序列化问题似乎发生在调用Schema的事件之前。
很难将所有相关代码都发布在这个问题上,所以我已经将其发布在git中心的回购程序中:。
我在这个主题上发送的对象只是一个简单的pojo:
@Data
public class R
目的是消费卡夫卡的记录,并重定向到Azure服务巴士。可以配置kafka消费者,并在调试时确认其在每次轮询kafka主题时消耗数千条消息。
另一方面,Azure服务总线将始终发布一条消息。因此,当集成运行时,kafka日志将显示接收到数千条消息,然后Azure服务总线日志将迭代每个消息,每次发送一个消息到队列。这需要几分钟的迭代,大大减缓了进程的速度。
组件写到,批量发送的设置是默认的,但是对于如何实现这个设置还不太清楚。
public class SampleKafkaConsumer extends RouteBuilder {
@Override
public void conf