我有个问题。我无法找到通过过滤kafka文档的键来创建流的方法。
我希望过滤和操作kafka键的json,以检索以下示例的payload,该示例对应于我的couchbase id:
ksql>打印cb_bench_products-get_purge‘限值1;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/03/04 10:49:43.643 Z, key: {
我在我的项目中定义了以下集成流程
///
public IntegrationFlow acarsEventFlow() {
return IntegrationFlows
//.from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //Get Message from MQ
.from(org.springframework.integration.jms.dsl.Jms.messageDrivenChannelAdapter(
o
当使用Kafka流DSL时,是否有一种方法可以使用相同的主题作为两个不同处理例程的源?
StreamsBuilder streamsBuilder = new StreamsBuilder();
// use the topic as a stream
streamsBuilder.stream("topic")...
// use the same topic as a source for KTable
streamsBuilder.table("topic")...
return streamsBuilder.build();
上面的简单实现在运行时
Spring boot Apache Camel-Java DSL应用程序从Kafka主题读取消息。 @Component
public class KafkaTopicService extends RouteBilder {
public void configure(){
from("kafka:myTopic?brokers=localhost:9092")
.log("Message received from Kafka: ${body}")}
} 如果我阻止卡夫卡我会得到org.apache.kafk
当源主题分区计数= 1时工作正常。如果我将分区增加到任何大于1的值,我会看到下面的错误。既适用于低级,也适用于DSL API。有什么建议吗?可能会遗漏什么?
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
at org.apach