我有一个应用程序,其中我使用了Kstream-Kstream连接和Ktream-Ktable连接。我已经将输入源主题分区计数从4更新为16,应用程序停止并返回以下错误。
Could not create internal topics: Existing internal topic application-test-processor-KSTREAM-JOINTHIS-0000000009-store-changelog has invalid partitions. Expected: 16 Actual: 4. Use 'kafka.tools.StreamsResetter
我们正在使用MirrorMaker备份主题。我们注意到,与源集群相比,由Kafka应用程序创建的重新分区主题在目标集群中的大小似乎在不断增加。根据org.apache.kafka.streams.kstream.KStream#repartition()的文档,这是有意义的
Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
换句话说,由于在目标集群中没有运行Ka
当源主题分区计数= 1时工作正常。如果我将分区增加到任何大于1的值,我会看到下面的错误。既适用于低级,也适用于DSL API。有什么建议吗?可能会遗漏什么?
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
at org.apach
定义了一个自定义存储,在自定义Transformer中使用(参考如下)。
public class KafkaStream {
public static void main(String[] args) {
StateStoreSupplier houseStore = Stores.create("HOUSE").withKeys(Serdes.String()).withValues(houseSerde).persistent().build();
KStreamBuilder kstreamBuilder = new KSt
在@StreamListener的过程方法中,我将学校KStream映射到person KStream,并通过.through()方法来填充一个主题"person“,然后在@StreamListener的另一个process1方法中生成一个KStream。
MianApplication.java
@SpringBootApplication
public class KafkaStreamsTableJoin {
public static void main(String[] args) {
SpringApplication.run(KafkaStrea
"log.retention.bytes“是我们用来保留主题消息日志的参数,我给出的值为1073741824。
我参考了卡夫卡文档,它说"log.retention.bytes“中给出的大小是每个分区,所以这意味着假设我使用的所有主题都有20个分区,那么根据文档,卡夫卡将保留的字节的总大小是20*1073741824。
但我需要的是清晰
Will Kafka retain 20*1073741824 bytes for all the topics?
(or)
Will Kafka retain 20*1073741824 bytes
我有一个复杂的Kafka应用程序,其中两个流在同一个流中完全有状态:
它使用Execution主题作为源,增强消息并将其重新发布回相同的Execution主题。it加入另一个主题WorkerTaskResult,将结果添加到Execution并发布回Execution主题.
主要目标是提供一个工作流系统。
去尾逻辑是:
the an 是一个 Execution列表,查看所有TaskRun的所有当前状态,找到下一个要执行(如果有)的列表,执行更改其TaskRunsList并添加下一个TaskRunsList并发布回卡夫卡,另外,它向另一个队列发送要执行的任务,(WorkerTask)the Wo
当我们使用自定义的groupByKey()值时,我们正在体验流处理中的“丢失数据包”。我们有一个单处理器节点,其中有一个源主题,从中读取数据包,对该组进行分组和聚合,并根据需要访问statestore的计算进行输出。
让我更详细地解释这个问题,以及到目前为止,我们是如何理解这个问题的:
概述我们正在设置一个卡夫卡流应用程序,在其中我们必须执行窗口操作。我们根据一个特定的键对设备进行分组。下面是我们为GroupBy使用的示例列:
+---------+---------+------+
| Field Name | Field Value |
+---------+---------+-----