我想我有一个很不标准的用法。我想使用filter函数将我的源流分成几个流:
val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)我还有一个时间戳提取器(传入的事件将在XML中附加一个时间戳):
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...
class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}我之所以选择这种方法,是因为而不是简单地做单个流(val s = dataStream.filter(...).map(...).filter(...).map(...)),因为我想构建一个分裂/组合任意流的网络(例如s1+s2->c1,s1+s3->c2,c2+s4->c3,.)
现在,当通过上面的示例发送事件时,事件E1可能会同时出现在s1和s2中。根据我的理解,这意味着完全相同的事件E1作为第一个实例放入s1 (E1a),也作为第二个实例放置到s2 (E1b)中。
所以我现在要做的是把E1a和E1b重新组合成一个组合的E1,它类似于E1,它既是s1的转换,也是s2的转换。
我试过:
val c1 = s1.join(s2)
.where(_.key).equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })然而,事件似乎永远达不到应用功能,我也找不出原因。
我的例子有什么不对?我对像这样的流网络的方法/想法会起作用吗?
https://stackoverflow.com/questions/53117182
复制相似问题