首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用kafka检测值的更改

使用kafka检测值的更改
EN

Stack Overflow用户
提问于 2019-03-29 16:43:26
回答 1查看 1.6K关注 0票数 6

我有一个流应用程序,它不断地接收坐标流以及一些自定义的元数据,这些元数据也包括一个位串。此流使用producer API生成到kafka主题中。现在,另一个应用程序需要处理此stream Streams API,并存储来自位字符串的特定位,并在该位发生变化时生成警报

下面是需要处理的连续消息流

代码语言:javascript
运行
复制
{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0

现在,我想把这些提示写到另一个kafka主题中,比如

代码语言:javascript
运行
复制
{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}

我可以使用如下命令将当前位保存在状态存储中

代码语言:javascript
运行
复制
streamsBuilder
        .stream("my-topic")
        .mapValues((key, value) -> value.getStatusBit())
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));

但是我不能理解如何从现有的bit中检测到这种变化。我需要在处理器拓扑中以某种方式查询状态存储吗?如果是?多么?如果不是呢?还能做些什么呢?

任何建议/想法,我可以尝试(可能完全不同于我的想法)也是感激的。我对Kafka是个新手,从事件驱动流的角度思考让我难以理解。

提前谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-29 18:41:08

我不确定这是不是最好的方法,但在类似的任务中,我使用了一个中间实体来捕获状态变化。在您的情况下,它将类似于

代码语言:javascript
运行
复制
    streamsBuilder.stream("my-topic").groupByKey()
              .aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
            public DeviceState apply(String key, Device newValue, DeviceState state) {
                if(!newValue.getStatusBit().equals(state.getStatusBit())){
                     state.setChanged(true);    
                }
                state.setStatusBit(newValue.getStatusBit());
                state.setDeviceId(newValue.getDeviceId());
                state.setKey(key);
                return state;
            }
        }, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();

在生成的主题中,您将具有这些更改。您还可以向DeviceState添加一些属性以首先对其进行初始化,这取决于您是否要发送事件、第一个设备记录到达的时间等。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55413473

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档