假设我有两种不同类型的数据流,一种提供天气数据,另一种提供车辆数据,我想使用Flink对数据进行复杂的事件处理。
Flink 1.3.x中的哪种方法是正确的方法?我看到了不同的方法,如Union、Connect、Window Join。基本上,我只想尝试这样一个简单的CEP:
IF weather is wet AND vehicle speed > 60
WITHIN the last 10 seconds
THEN raise alert
谢谢!
我们使用flink-cep作为一个独立的库来查找事件列表中的模式。
鉴于下列事件清单:
val patientKey = "patient"
val hrKey = "hr"
// Event
val p1e1 = Event("hr", mapOf(patientKey to 1, hr to 1))
val p1e2 = Event("hr", mapOf(patientKey to 1, hr to 2))
val p2e1 = Event("hr", mapOf(patientKey to 2, hr to
我有两个来自两个Kafka主题的流表,我想加入这些流并对所连接的数据执行聚合功能。需要使用滑动窗口连接流。在连接和窗口数据时,我将得到一个错误Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.。
下面是代码片段
select cep.payload['id'] , ep.payload['id'] ,
如何使用flink cep库检测胡言乱语模式?
例如:假设设备有一些问题,那么它会不断地发布像开、关这样的值。如果问题存在30分钟,如何使用CEP检测模式。下面是我提到的一些样本数据。
OFF 16/08/18 11:38
ON 16/08/18 11:38
OFF 16/08/18 11:38
ON 16/08/18 11:37
OFF 16/08/18 11:37
ON 16/08/18 11:36
OFF 16/08/18 11:36
OFF 16/08/18 11:36
ON 16/08/18 11:36
在过去几天学习Flink CEP库时,我的印象是它没有为Flink的标准功能添加任何新的基本功能。似乎Flink CEP的唯一目的是使事件处理更容易,具有清晰的语义和直观的代码结构。例如,Flink只显示事件匹配跳过的。虽然这些语义对于很多情况来说都是足够的,但它可能不能解决具体的问题,这使我们回到了普通的Flink。
测试用例是以下模式:
Emmit a alert(represented by 'a') for each non-overlapping pair of numbers in a stream
以模式为代表:
Pattern.begin[EventType](