在阅读了官方的flink测试文档(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html)后,我能够使用Test Harness为ProcessFunction开发测试,如下所示:
pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")
testHarness =
new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, PendingPartition](
new ProcessOperator[StaticAdequacyTilePublishedData, PendingPartition](pendingPartitionBuilder)
)
testHarness.open()
现在,我试着对一个ProcessAllWindowFunction做同样的事情,看起来像这样:
class MapVersionValidationDistributor(batchSize: Int) extends
ProcessAllWindowFunction[MapVersionValidation, Seq[StaticAdequacyTilePublishedData],TimeWindow] {
lazy val state: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("latestMapVersion", classOf[Long]))
(...)
首先,我意识到我不能对ProcessAllWindowFunction使用TestHarness,因为它没有processElement方法。在这种情况下,我应该遵循什么单元测试策略?
编辑:目前我的测试代码如下所示:
val collector = mock[Collector[Seq[StaticAdequacyTilePublishedData]]]
val mvv = new MapVersionValidationDistributor(1)
val input3 = Iterable(new MapVersionValidation("123",Seq(TileValidation(1,true,Seq(1,3,4)))))
val ctx = mock[mvv.Context]
val streamContext = mock[RuntimeContext]
mvv.setRuntimeContext(streamContext)
mvv.open(mock[Configuration])
mvv.process(ctx,input3,collector)
我得到了这个错误:
Unexpected call: <mock-3> RuntimeContext.getState[T](ValueStateDescriptor{name=latestMapVersion, defaultValue=null, serializer=null}) Expected: inAnyOrder { }
发布于 2019-11-03 01:01:53
您并不真正需要测试工具来对ProcessAllWindowFunction
的process
方法进行单元测试。process
函数有3个参数:Context
、Iterable[IN]
、Collector[OUT]
。您可以使用一些库,这取决于用于模拟Context
的语言。您还可以根据这里的前提条件轻松地实现或模拟Collector
。IterableIN只是一个包含窗口元素的Iterable
,它将在窗口被触发后传递给函数。
https://stackoverflow.com/questions/58648453
复制相似问题