我正在解析来自流的XML,并将POJO分派到ProcessContext.output。它是跟随ClosedChannelException抛出的。知道是怎么回事吗?
com.google.cloud.dataflow.sdk.util.UserCodeException: java.nio.channels.ClosedChannelException
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:193)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
at com.myproj.dataflow.MyDocDispatcher.onMyDoc(MyDocDispatcher.java:24)发布于 2015-11-26 12:22:29
一个可能的原因是,进行XML处理以生成POJO的DoFn实际上懒惰地生成POJO。当您将该POJO传递给ProcessContext#output()时,稍后可能会根据优化器将其直接传递给管道中的其他DoFn。
在这种情况下,如果与POJO交互的下游DoFn对接收到的POJO有一些副作用,它就会违反immutability requirements,因为与从ProcessContext#element()接收到的POJO交互会修改它。
如果这就是问题所在,最简单的解决方法是先克隆POJO,然后再将其传递给output()。
https://stackoverflow.com/questions/33836226
复制相似问题