首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Flink1.5.4异常:流损坏,找到标记: 105

Flink1.5.4异常:流损坏,找到标记: 105
EN

Stack Overflow用户
提问于 2019-03-07 20:50:02
回答 2查看 418关注 0票数 0

我的程序想在没有Flink窗口的情况下加入两个流。

我连接了两个流并定义了一个类A扩展了RichCoFlatMapFunction来处理它们。然后Guava缓存有一个删除侦听器来收集加入和过期的数据到下一个Flink函数。

代码语言:javascript
运行
复制
private synchronized void collect(ReqFeatures features) {
    feaCollector.collect(features);
}

每次开始时,它都运行得很好,但几个小时后,它总是因为这个异常而死。

代码语言:javascript
运行
复制
java.io.IOException: Corrupt stream, found tag: 105
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)

有时还会有另一个错误日志:

代码语言:javascript
运行
复制
java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:158)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:186)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:551)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)

如果我改为使用Flink Window函数,则不会发生此异常。为什么会发生这种异常,我如何解决它?

EN

回答 2

Stack Overflow用户

发布于 2020-03-06 00:32:04

我可以确认在Flink 1.9.1中也会发生这种情况(尽管对我们来说,它发生在我们运行flink stop <job-id>的时候)

票数 0
EN

Stack Overflow用户

发布于 2021-09-28 08:32:06

我修复了在收集输出时获得检查点锁定的相同问题。users的flatMap函数已经持有检查点锁定,所以如果你在flatMap函数中收集输出,也可以解决这个问题。

在flink的代码中:

代码语言:javascript
运行
复制
synchronized (checkpointingLock) {
                numRecordsIn.inc();
                streamOperator.setKeyContextElement1(record);
                streamOperator.processElement(record);
            }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55044215

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档