我对有个问题。问题是,我有一个bean,它一创建就会写到Kafka (带有@PostConstruct注释的方法),因此我自动创建了适当的MessageChannel,并在application.yml中设置了目的地和绑定器属性。事情是这样的:
@Component
@RequiredArgsConstructor
public class Sender
{
private final MessageChannel output;
@PostConstruct
public void start()
{
output.send(new Gener
我正在尝试使用框架构建一个简单的Kafka流应用程序。我可以连接到流来推送原始数据进行处理。但是,当我试图处理按键计算事件的流时,我在运行应用程序时会得到Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde异常。我签入了包含库的项目,我可以找到Serde类,它没有丢失。我不知道为什么在运行时它没有被加载!
下面是我的源文件。
com.pgp.learn.kafka.analytics.AnalyticsApplication
package com.pgp.learn.kafka.analy
我正在为应用程序编写测试。这有一个来自topicA的topicA读数。在测试中,我使用KafkaTemplate发布消息,并等待KStream日志的出现。
测试引发以下异常:
java.lang.ClassCastException: class com.sun.proxy.$Proxy143 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy143 and org.springframework.messaging.MessageChannel are in un
我有一个使用Spring和的服务。这个服务产生一个特定的主题,同时也使用这个主题。当我第一次启动服务并且这个主题在Kafka中不存在时,会引发以下异常:
java.lang.IllegalStateException: The number of expected partitions was: 100, but 3 have been found instead
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner$2.doWithRetry(Ka
当我尝试使用Spring Cloud Stream构建Gradle Spring Boot项目时,收到以下错误: Execution failed for task ':compileTestJava'.
> Could not resolve all files for configuration ':testCompileClasspath'.
> Could not find org.springframework.cloud:spring-cloud-stream:test-binder.
Required by:
我不知道如何用kafka做一个样本测试,我试着按照弹簧指南做,但不起作用。
有人能帮我吗?
zzzzz zz z z z
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class EnrollSenderTest {
@Autowired
public EnrollSender producer;
@Autowired
private BinderFactory<MessageChannel> binderFactory;
@Autowired
private MessageColle
我试图使用在Kafka流上运行交互式查询。我坚持从InteractiveQueryService检索密钥值存储。在我的代码上,甚至在示例代码上,我总是会遇到相同的错误:
java.lang.IllegalStateException: Error retrieving state store: prod-id-count-store
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$0(InteractiveQuerySer
我试图创建一个简单的程序,从一个卡夫卡主题打印一个Kstream。我不断地得到一个NPE和完全没有想法。
我使用了spring云流绑定卡夫卡流依赖关系,我正在使用最新版本的spring "Finchley.M9“。
我所写的代码是:
@Component
@EnableBinding(KafkaStreamsProcessor.class)
public class EventListener{
@StreamListener("input")
public void listen(KStream<String,String> kstrea
我有一个使用SpringBoot2.0.3的Spring应用程序。在Finchley.SR1下,libraries和Spring库。具有以下特性:
spring:
kafka:
ssl:
protocol: SASL_SSL
keystore-location: classpath:kafka.p12 // created from a PEM with private key and certificate
keystore-password: passw0rd
key-store-type: PKCS12
cloud:
我打算为Kafka消费者弹性设置多个绑定。更具体地说,备份侦听器具有与主侦听器期望代理IP相同的目的地和组。备份侦听器的autoStartUp在启动时被关闭,但在发生故障转移时将被打开编程。
但是,微服务在启动时会抛出以下异常:
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindCon
我的问题非常接近Print Kafka Stream Input out to console?,但有一点不同:我使用的是StreamsBuilder而不是Kstreambuilder。(不幸的是,由于404错误,包含该分辨率的git不可用。) 我的听众是: @StreamListener(LoansStreams.INPUT)
public void handleLoans(@Payload Loans loans) {
final Serde<String> stringSerde = Serdes.String();
S