首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在apache中分割和连接流

在apache中分割和连接流
EN

Stack Overflow用户
提问于 2018-11-02 10:52:12
回答 1查看 1.4K关注 0票数 0

我想我有一个很不标准的用法。我想使用filter函数将我的源流分成几个流:

代码语言:javascript
运行
复制
val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)

我还有一个时间戳提取器(传入的事件将在XML中附加一个时间戳):

代码语言:javascript
运行
复制
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...

dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...

class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
  override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
  override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}

我之所以选择这种方法,是因为而不是简单地做单个流(val s = dataStream.filter(...).map(...).filter(...).map(...)),因为我想构建一个分裂/组合任意流的网络(例如s1+s2->c1,s1+s3->c2,c2+s4->c3,.)

现在,当通过上面的示例发送事件时,事件E1可能会同时出现在s1和s2中。根据我的理解,这意味着完全相同的事件E1作为第一个实例放入s1 (E1a),也作为第二个实例放置到s2 (E1b)中。

所以我现在要做的是把E1a和E1b重新组合成一个组合的E1,它类似于E1,它既是s1的转换,也是s2的转换。

我试过:

代码语言:javascript
运行
复制
val c1 = s1.join(s2)
  .where(_.key).equalTo(_.key)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })

然而,事件似乎永远达不到应用功能,我也找不出原因。

我的例子有什么不对?我对像这样的流网络的方法/想法会起作用吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-11-02 16:36:56

你安排好有水印了吗?当使用事件时间时,只有当水印到达时,才会触发窗口,该水印将事件时间时钟向前推进到窗口的末尾。您可以使用时间戳提取器/水印生成器来完成此操作;有关更多细节,请参见文档中的一个示例

如果其中一个流有时是空闲的,这也会导致问题,因为空闲流上缺少水印将阻碍它连接到的任何流的水印。

根据您想要做的事情,您可能会发现使用CoProcessFunction比使用时间窗口的连接更容易。看看Flink培训网站上关于有状态充实到期状态的练习,举个例子。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53117182

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档