@Configuration
public class KafkaConfiguration {
@Value("${kafka.boot.server}")
private String kafkaServer;
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerConfig());}
@Bean
public ProducerFactory<S
我想使用一个单独的生成器将JSON对象编写到多个主题。
下面的代码正在做我想做的事情,但是使用setDefaultTopic()方法告诉KafkaTemplate它应该向哪个主题发送消息是错误的。
如果我使用send(String topic, ? payload)方法,StringJsonMessageConverter就不能工作。
我的制片人:
public class MyProducer {
@Autowired
private KafkaTemplate<String, ?> kafka;
public void send(String top
我似乎无法在其具体的avro实现中使用消息,我得到了以下例外:
class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass
下面是代码(我使用Spring Boot)
MyProducer.java
private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;
public PositionProducer(KafkaTemplate<String, MyConc
我有一个spring引导应用程序,它让使用者从一个集群中的主题消费,然后在不同的集群中生成另一个主题。
现在我正在尝试用spring编写集成测试用例,但是有一个KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource问题
消费者级
@Service
public class KafkaConsumerService {
@Autowired
private KafkaProducerService kafkaProducerServi
当我试图从另一个发送者的ListenableFutureCallback中发送消息时,我遇到了生产者超时,但如果我同步等待结果并且不使用回调,就不会发生这种情况。在几条消息之后,可以看到如下错误: ...[kafka-producer-network-thread | producer-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='...' to topic ...: org.apac
可以从kstream应用读取我在kafkaProducer应用中设置的Kafka消息头吗?我的KafkaProducer看起来像这样;我已经在消息中设置了头 public class Producer {
private final org.slf4j.Logger log = LoggerFactory.getLogger(Producer.class);
@Value("${topic.name}")
private String TOPIC;
private final KafkaTemplate<Integer, testEvent> kafkaTemp
我有一个配置:
@Configuration
@EnableKafka
public class ConsumerConfig {
final DlqErrorHandler dlqErrorHandler;
public ConsumerConfig(DlqErrorHandler dlqErrorHandler) {
this.dlqErrorHandler = dlqErrorHandler;
}
@Bean
public ConsumerFactory<String, String> consumerFactor
我使用spring (2.1.6.RELEASE)和spring (2.2.7.RELEASE),并使用KafkaTemplate向我的kafka集群发送消息。但有时(通常当我重新启动kafka-broker或进行再平衡时),我在发送消息时会看到这样的错误:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
由于默认的卡夫卡制片人吐露,我希望发送失败将被重新尝试,但他们不是。拖欠卡夫卡制片人吐露:
retri