使用Scala2.12运行Flink 1.9.0并尝试使用将数据发布到Kafka,在本地调试时一切正常。一旦我将作业提交到集群,就会在运行时得到以下java.lang.LinkageError,它无法运行作业:
java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/util/ChildFirstClassLoader) previously initiated loading for a different type with name "org/apache/
我使用Flink v.1.13.2来管理一个工作经理,三个任务经理。
由于某些原因(我无法找出原因),任务管理器连接正在丢失。下面是我找到的日志:
2022-02-17 21:19:55,891 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Print to Std. Out (13/32) (f0ff88713cc3ff5ce39e7073468abed4) switched from RUNNING to FAILED on 1.2.3.5:39309-f61daa @ serve
当我试图用flatMap操作符写出集合时,我得到了非法的状态异常(仅在高负载下):缓冲池被破坏了,我在这里做错了什么?当flink抛出缓冲池错误时?
java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriter
我使用flink监控hdfs中的新文件(文件为gzip格式),并对其进行处理。 env.readFile(filePath) 它可以在文件有效时工作, 但如果gzip文件无效,flink作业将被终止。 有异常日志: java.io.IOException: Error opening the Input Split hdfs://mdw:8020/user/data/15_077_4.gz [0,-1]: Not in GZIP format
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.
我正在尝试读取FLINK中的本地文件。我的错误越来越小。
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.jav
我使用的是flink table api,使用kafka作为输入源,使用json作为表模式。当提交我的程序时,我得到了这个错误:‘程序结束了,但出现了以下异常:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client
我正在开发一个应用程序,它将一些文件上传到s3桶,稍后,从s3桶中读取文件,并将其推送到数据库。
我使用Flink 1.4.2和fs.s3a API从s3桶读取和写入文件。
上传文件到s3桶没有任何问题,但是当我的应用程序的第二阶段--从s3读取这些上传的文件--启动时,我的应用程序会抛出错误
Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClient
我试图在编写pyflink作业时读取一个已建立的csv文件。我使用文件系统连接器来获取数据,但是在ddl上执行execute_sql()之后,然后对表执行查询,我得到了一个错误,这说明它无法获取下一个结果。我无法解决此错误。我已经检查了csv文件,它是完全正确的,并与熊猫一起工作,但在这里,我不知道为什么它不能获取下一行。如需参考,请查找所附代码。
from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastre
我已经从postgresql DB.then加载了一个规则表作为Flink表,读取kafka msg,并根据这些规则对msg进行分类。代码如下所示 val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.enableCheckpointing(5000)
val stenv=StreamTableEnvironment.create(senv)
val streamsource=senv.createInput(inputFormat)
stenv.registerDataS
我试图连接卡夫卡与Flink和运行通过sql-client.sh。但是,无论我如何处理.yaml和库,我都会得到错误:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
我有一个在Spring应用程序中运行的Flink应用程序。应用程序无法在Flink集群中作为jar运行。我能够将应用程序作为一个jar运行在我的本地windows计算机(迷你集群)上。
异常堆栈跟踪如下所示:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:54
我可以将流部署到Apache的独立安装(使用一个JobManager和多个TaskManagers),没有问题:
bin/flink run -m example-app-1.stag.local:6123 -d -p 4 my-flow-fat-jar.jar <flow parameters>
但是,当我运行相同的命令并部署到独立的HA集群时,这个命令会引发错误:
------------------------------------------------------------
The program finished with the following except
我一直试图使用文档中的参数sink.buffer-flush.max-rows和sink.buffer-flush.interval缓冲来自upsert连接器的输出。
每当我尝试运行带有缓冲的INSERT查询时,我都会收到以下错误(缩写为简洁):
Caused by: java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145)
at org
我刚刚使用maven原型(即模板)创建了新的Flink项目(查看文章)。
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java
1.14.4是目前的版本。该项目可以编译。但是当运行BatchJob或StreamingJob时
public class BatchJob {
public static void main