Kafka Ktable也可以流式传输重复更新。
我想要处理Ktable (使用Kstream.reduce()创建) changelog流,即Ktable中键值的任何更改。但它似乎即使当相同的键值对被多次发送到Ktable时,它每次都被发送到下游。我只需要在键值发生变化的情况下发送更新。
`
groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde()))
.reduce(new Reducer<Long>() {
@Override
public Long apply(Long t1, Long t2) {
return t2;
}
}).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
{
sendUpdate(key);
});`
发布于 2020-04-22 00:23:26
这是KTable#toStream()的默认行为,它将changelog主题转换为KStream,因此每次上游reduce操作符收到消息时,reduce的下游操作符都会更新。
您可以使用Processor API对您想要的行为进行归档,在本例中,我们使用KStream.transfomerValues()。
首先注册一个KeyValueStore来存储您的最新值:
//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));
numberKStream
.transformValues(ExtractIfValueChangedTransformer::new, "number_store")
.filter((key, value) -> value != null)
.foreach((key, value) -> sendUpdate(key));然后我们创建一个ExtractIfValueChangedTransformer,如果值已经改变,只返回新消息的值,如果没有,则返回null:
public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {
KeyValueStore<Long, Long> kvStore;
@Override
public void init(ProcessorContext context) {
kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
}
@Override
public Long transform(Long key, Long newValue) {
Long oldValue = kvStore.get(key);
kvStore.put(key, newValue);
if (oldValue == null) return newValue;
return oldValue.equals(newValue) ? null : newValue;
}
@Override
public void close() {}
}https://stackoverflow.com/questions/61344240
复制相似问题