我试图删除值为null的记录,在下游的changelog中,我知道在statestore中,它们仅仅是空(墓碑)就被删除了,但是当您在KTable或Stream上进行聚合时,它们跳过null而不删除它。我需要想办法在聚合中设置一个删除标志,让Kafka知道记录可以被删除。property to true in application.properties
// if ("tru
我是Flink和Kafka的新手,有一些用Scala编写的数据聚合作业,这些作业在Apache Flink中运行,这些作业消耗Kafka中的数据,执行聚合并将结果返回给Kafka。我需要的作业,以消耗任何新的Kafka主题的数据,而作业正在运行,这是匹配的模式。我通过为我的消费者设置以下属性来实现这一点 val properties = new Properties()
properties.setProperty(“bootstra