我有一个关于在IDE中运行Flink流作业或作为fat jar运行而不将其部署到Flink服务器的问题。
问题是当我的作业中有多个任务槽时,我不能在IDE中运行它。
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecuti
我使用flink监控hdfs中的新文件(文件为gzip格式),并对其进行处理。 env.readFile(filePath) 它可以在文件有效时工作, 但如果gzip文件无效,flink作业将被终止。 有异常日志: java.io.IOException: Error opening the Input Split hdfs://mdw:8020/user/data/15_077_4.gz [0,-1]: Not in GZIP format
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.
我正在使用flink siddhi并在处理大型对象时走出内存错误。在siddhi生成的输出流中,我有超过200个字段的对象,之后我有一些操作符来处理这个对象。flink版本1.7.2
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:307)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(De
我正在flink集群上运行一个流束作业,在那里我得到了以下异常。 Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
我使用执行WordCount。它的文件大小为10 of。但是,对于一个100 the的文件,shell抛出一个NullPointerException:
java.lang.NullPointerException
at org.apache.flink.api.common.accumulators.SerializedListAccumulator.deserializeList(SerializedListAccumulator.java:93)
at org.apache.flink.api.scala.DataSet.collect(DataSet.sc
我们试图迁移到Flink 1.11,从1.10中的保存点恢复作业。作业代码没有更改,只将依赖项的Flink版本更新为1.11 (在SBT中,我们使用Scala)并重新构建jar。所有运算符都有uids,如果在1.10集群上运行,作业将正确地从该保存点恢复,我们将得到以下异常,并且不知道:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.st