首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka Ktable也流式传输重复更新

Kafka Ktable也流式传输重复更新
EN

Stack Overflow用户
提问于 2020-04-21 21:03:45
回答 2查看 789关注 0票数 2

Kafka Ktable也可以流式传输重复更新。

我想要处理Ktable (使用Kstream.reduce()创建) changelog流,即Ktable中键值的任何更改。但它似乎即使当相同的键值对被多次发送到Ktable时,它每次都被发送到下游。我只需要在键值发生变化的情况下发送更新。

`

代码语言:javascript
复制
groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde())) 
                .reduce(new Reducer<Long>() {   
                    @Override
                    public Long apply(Long t1, Long t2) {
                        return t2;
                    }
                }).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
        {

            sendUpdate(key); 
        });

`

EN

Stack Overflow用户

回答已采纳

发布于 2020-04-22 00:23:26

这是KTable#toStream()的默认行为,它将changelog主题转换为KStream,因此每次上游reduce操作符收到消息时,reduce的下游操作符都会更新。

您可以使用Processor API对您想要的行为进行归档,在本例中,我们使用KStream.transfomerValues()。

首先注册一个KeyValueStore来存储您的最新值:

代码语言:javascript
复制
//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
        .addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));

numberKStream
        .transformValues(ExtractIfValueChangedTransformer::new, "number_store")
        .filter((key, value) -> value != null)
        .foreach((key, value) -> sendUpdate(key));

然后我们创建一个ExtractIfValueChangedTransformer,如果值已经改变,只返回新消息的值,如果没有,则返回null:

代码语言:javascript
复制
public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {

    KeyValueStore<Long, Long> kvStore;

    @Override
    public void init(ProcessorContext context) {
        kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
    }

    @Override
    public Long transform(Long key, Long newValue) {
        Long oldValue = kvStore.get(key);
        kvStore.put(key, newValue);
        if (oldValue == null) return newValue;
        return oldValue.equals(newValue) ? null : newValue;
    }

    @Override
    public void close() {}
}
票数 2
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61344240

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档