Kafka Ktable也可以流式传输重复更新。
我想要处理Ktable (使用Kstream.reduce()创建) changelog流,即Ktable中键值的任何更改。但它似乎即使当相同的键值对被多次发送到Ktable时,它每次都被发送到下游。我只需要在键值发生变化的情况下发送更新。
`
groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde()))
.reduce(new Reducer<Long>() {
@Override
public Long apply(Long t1, Long t2) {
return t2;
}
}).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
{
sendUpdate(key);
});`
发布于 2020-04-22 00:23:26
这是KTable#toStream()的默认行为,它将changelog主题转换为KStream,因此每次上游reduce操作符收到消息时,reduce的下游操作符都会更新。
您可以使用Processor API对您想要的行为进行归档,在本例中,我们使用KStream.transfomerValues()。
首先注册一个KeyValueStore来存储您的最新值:
//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));
numberKStream
.transformValues(ExtractIfValueChangedTransformer::new, "number_store")
.filter((key, value) -> value != null)
.foreach((key, value) -> sendUpdate(key));然后我们创建一个ExtractIfValueChangedTransformer,如果值已经改变,只返回新消息的值,如果没有,则返回null:
public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {
KeyValueStore<Long, Long> kvStore;
@Override
public void init(ProcessorContext context) {
kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
}
@Override
public Long transform(Long key, Long newValue) {
Long oldValue = kvStore.get(key);
kvStore.put(key, newValue);
if (oldValue == null) return newValue;
return oldValue.equals(newValue) ? null : newValue;
}
@Override
public void close() {}
}发布于 2021-06-11 23:02:40
Kafka Streams提供了两种语义:更新时发出和窗口关闭时发出。
KIP-557是关于添加基于字节数组数据比较的更改时发出语义。它已经在Kafka Streams 2.6和removed due to "potential data loss"中实现。
不过,通过使用Kafka Streams DSL,我已经开发了一种emit-on-change语义实现。
其思想是将具有更新时发出语义的KStream转换为具有更改时发出语义的KStream。您可以在提供的源Kstream上使用此实现来创建KTable,也可以在应用.toStream()之后在KTable上使用此实现。
这个实现隐式地创建了一个状态存储,其中的值包含KStream数据和一个标志,该标志指示是否应该发出更新。此标志在聚合操作中设置,并基于Object#equals进行比较。但您可以将实现更改为使用Comparator。
下面是更改KStream语义的withEmitOnChange方法。您可能需要为EmitOnChangeState数据结构指定一个serde (如下所示)。
public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams) {
return streams
.groupByKey()
.aggregate(
() -> (EmitOnChangeState<V>) null,
(k, data, state) -> {
if (state == null) {
return new EmitOnChangeState<>(data, true);
} else {
return state.merge(data);
}
}
)
.toStream()
.filter((k, state) -> state.shouldEmit)
.mapValues(state -> (V) state.data);
}下面是存储在状态存储中的数据结构,用于检查是否应该发出更新。
public static class EmitOnChangeState<T> {
public final T data;
public final boolean shouldEmit;
public EmitOnChangeState(T data, boolean shouldEmit) {
this.data = data;
this.shouldEmit = shouldEmit;
}
public EmitOnChangeState<T> merge(T newData) {
return new EmitOnChangeState<>(newData, Objects.equals(data, newData));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
}
@Override
public int hashCode() {
return Objects.hash(data, shouldEmit);
}
}用法:
KStream<ProductKey, Product> products = builder.stream("product-topic");
withEmitOnChange(products)
.to("out-product-topic"); // output topic with emit-on-change semantichttps://stackoverflow.com/questions/61344240
复制相似问题