我的任务如下:
我正在监控来自第三方测量设备的时间同步事件。这次同步有点不稳定,所以我想检测同步何时停止并发出警报。
为此,我将向Kafka主题生成同步事件。我有三个不同的事件在进行:
由于其他设备未响应,
所以,我想做的是:
我目前正在设置一个Kafka-Streams应用程序,我需要存储状态,以防该应用程序崩溃(不应该崩溃,但我想确保),所以我设置了以下内容:
val builder = new StreamsBuilder
val storeBuilder = Stores.
keyValueStoreBuilder(Stores.persistentKeyValueStore("timesync-alarms"),
Serdes.String(),
logEntrySerde)
builder.addStateStore(storeBuilder)
val eventStream = builder.stream(sourceTopic, Consumed.`with`(Serdes.String(), logEntrySerde))
现在,我被卡住了。我基本上认为我需要做的是在eventStream上有一个flatMap
函数,当事件到达时:
如果要使用当前接收到的警报和processed
,则查询存储
那么,我该如何在这里实现步骤1和3呢?或者我在概念上是错的,必须以不同的方式来做?
https://stackoverflow.com/questions/50760363
复制相似问题