首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何在Kafka流转换(map / flatMap)中考虑键/值存储中的值?

如何在Kafka流转换(map / flatMap)中考虑键/值存储中的值?
EN

Stack Overflow用户
提问于 2018-06-08 20:10:00
回答 1查看 641关注 0票数 0

我的任务如下:

我正在监控来自第三方测量设备的时间同步事件。这次同步有点不稳定,所以我想检测同步何时停止并发出警报。

为此,我将向Kafka主题生成同步事件。我有三个不同的事件在进行:

由于其他设备未响应,

  1. Synchronization request
  2. Synchronization successful
  3. Synchronization失败

所以,我想做的是:

  • 收到请求,一段时间后什么也没有收到,我想在收到请求时发出超时告警
  • ,并且在超时时间内,成功事件到达,如果在超时时间之后没有请求到达,我想发出超时,当失败事件到达时,我想发出other device no respond告警

我目前正在设置一个Kafka-Streams应用程序,我需要存储状态,以防该应用程序崩溃(不应该崩溃,但我想确保),所以我设置了以下内容:

代码语言:javascript
复制
val builder = new StreamsBuilder
val storeBuilder = Stores.
  keyValueStoreBuilder(Stores.persistentKeyValueStore("timesync-alarms"),
                       Serdes.String(),
                       logEntrySerde)
builder.addStateStore(storeBuilder)
val eventStream = builder.stream(sourceTopic, Consumed.`with`(Serdes.String(), logEntrySerde))

现在,我被卡住了。我基本上认为我需要做的是在eventStream上有一个flatMap函数,当事件到达时:

如果要使用当前接收到的警报和processed

  • Decides raised

  • Updates event

  • Produces
  1. 查询存储中的最后一个事件,如果有

,则查询存储

那么,我该如何在这里实现步骤1和3呢?或者我在概念上是错的,必须以不同的方式来做?

EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50760363

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档