我有一个流式处理程序,基本上如下所示
Stream(Int, Boolean, Int).Keyby(0, 1).Window().process()
关键是我想要定义一个组合键,然后处理它。但是,如果我使用keyby(0, 1)
和process(... Key: (Int, Boolean), ...)
,进程中的键类型总是提示错误。我试图定义keyby(_._1, _._2)
,但不正确。
因此,无论如何,要使用scala定义一个组合键,这样我就可以在下面的流程函数中推断出像(Int, Boolean)
这样的键类型。
提前感谢!
发布于 2018-10-24 08:30:40
问题是input.keyBy(0, 1).timeWindow(Time.days(1))
创建了一个KeyedStream[(Int, Boolean, Int), Tuple]
,其中Tuple
是Flink的tuple类。这也是process
函数的关键参数的类型。为了访问Tuple
的字段,您需要调用tuple.[T]getField(idx)
,其中T
是字段的类型。
如果希望使用Scala作为ProcessWindowFunction
的键,则需要定义一个KeySelector
。下面的代码片段完成了这一任务:
input
.keyBy(a => (a._1, a._2))
.timeWindow(Time.days(1))
.process(new ProcessWindowFunction[(Int, Boolean, Int), Int, (Int, Boolean), TimeWindow] {
override def process(key: (Int, Boolean), context: Context, elements: Iterable[(Int, Boolean, Int)], out: Collector[Int]): Unit = {
out.collect(key._1)
}
})
https://stackoverflow.com/questions/52961264
复制相似问题