我的任务如下:
我正在监控来自第三方测量设备的时间同步事件。这次同步有点不稳定,所以我想检测同步何时停止并发出警报。
为此,我将向Kafka主题生成同步事件。我有三个不同的事件在进行:
由于其他设备未响应,
所以,我想做的是:
我目前正在设置一个Kafka-Streams应用程序,我需要存储状态,以防该应用程序崩溃(不应该崩溃,但我想确保),所以我设置了以下内容:
val builder = new StreamsBuilder
val storeBuilder = Stores.
keyValueStoreBuilder(Stores.persistentKeyValueStore("timesync-alarms"),
Serdes.String(),
logEntrySerde)
builder.addStateStore(storeBuilder)
val eventStream = builder.stream(sourceTopic, Consumed.`with`(Serdes.String(), logEntrySerde))
现在,我被卡住了。我基本上认为我需要做的是在eventStream上有一个flatMap
函数,当事件到达时:
如果要使用当前接收到的警报和processed
,则查询存储
那么,我该如何在这里实现步骤1和3呢?或者我在概念上是错的,必须以不同的方式来做?
发布于 2018-06-10 07:00:28
我认为你不需要直接使用状态存储。您可以创建两个流-一个具有同步请求事件,另一个具有同步响应(成功、失败)并加入它们:
requestStream.outerJoin(responseStream, (leftVal, rightVal) -> ...,
JoinWindows.of(timeout), ...);
在超时的情况下,rightVal
为空。
如果您想向单独的topic发送告警,您可以简单地过滤加入的流,并将所有失败(错误响应和超时)写入到topic中。否则,您可以使用peek()
方法并在内部触发一些操作。下面是一个简单的例子:https://github.com/djarza/football-events/blob/master/football-ui/src/main/java/org/djar/football/ui/projection/StatisticsPublisher.java
https://stackoverflow.com/questions/50760363
复制相似问题