我正在尝试提交一个使用Scala 2.11创建的Flink作业,它通过在命令行中运行来使用本地Flink集群中的Twitter流API:
flink run -c org.myClass C:\path\to\jarFile.jar
2019-06-09 23:40:47,758 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to sub
我正在尝试向我的flink集群提交一个作业,但我一直遇到以下错误:
2021-05-03 17:14:32
java.lang.NoSuchMethodError: org/apache/flink/api/common/state/OperatorStateStore.getSerializableListState(Ljava/lang/String;)Lorg/apache/flink/api/common/state/ListState; (loaded from file:/opt/flink/lib/flink-dist_2.11-1.11.3.jar by jdk.interna
我已经编写了一个消费者,它读取卡夫卡主题,并使用StreamSink以拼图格式写入数据。但是我得到了以下错误
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowS
java.lang.NoSuchMethodError:org.apache.flink.api.common.ExecutionConfig.setRestartStrategy(Lorg/apache/flink/api/common/restartstrategy/RestartStrategies$RestartStrategyConfiguration;)
at com.WriteIntoKafka.main(WriteIntoKafka.java:53)
at sun.reflect.NativeMethodAccessorImpl.invoke0(N
在我的flink应用程序中,我正在阅读来自运动的事件。事件是以protobuf格式进行的。如果我在flink应用程序中使用'com.google.protobuf:protobuf-java:3.7.1',我没有问题。但是,如果我将其更改为'com.google.protobuf:protobuf-java:3.10.0',则会得到上面的堆栈跟踪异常。
java.lang.IncompatibleClassChangeError: class com.google.protobuf.Descriptors$OneofDescriptor has interfac
我是Flink流媒体的初学者。
使用RowCsvInputFormat读取文件时,Kryo序列化程序创建的代码不能正常工作。
代码在下面。
val readLocalCsvFile = new RowCsvInputFormat(
new Path("flink-test/000000_1"),
Array(Types.STRING, Types.STRING, Types.STRING),
"\n",
","
)
val read = env.readFile(
我正在开发一个沉入Kafka的Flink应用程序。我创建了一个默认池大小为5的Kafka生产者。我使用以下配置启用了检查点:
env.enableCheckpointing(1800000);//checkpointing for every 30 minutes.
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 m
我使用以下脚本提交了我在yarn上的应用程序。 MAIN_CLASS=org.example.app.HelloFlink
flink run -m yarn-cluster -yn 4 -ys 1 -ynm FlinkHiveIntegrationTest -c $MAIN_CLASS /learn.flink.ioc-1.0-SNAPSHOT.jar 它抱怨说Could not get job jar and dependencies from JAR file: JAR file does not exist: -yn。 当我删除脚本中的-yn 4时,它就可以工作了。我在之前的flin
我使用下面提到的POM文件来编写flink代码
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Ve
我尝试根据添加具有事件时间属性的表源。我的代码如下:
class SISSourceTable
extends StreamTableSource[Row]
with DefinedRowtimeAttributes
with FlinkCal
with FlinkTypeTags {
private[this] val profileProp = ConfigurationManager.loadBusinessProperty
val topic: String = ...
val schemas = Seq(
(TsCol, SQLTi
我使用的是带有32 32执行程序的flink mysql连接器,有32个插槽的16 16vCPU。如果我运行一个具有并行性32 (作业并行性224)的作业,它使用10个MySQL表执行时态查找联接,那么在2-3次成功运行之后,它就会开始失败。
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandle
我是flink的新手,我部署了我的flink应用程序,它基本上执行简单的模式匹配。它部署在库伯奈特斯集群,拥有1 JM和6 TM。我每10分钟发送一次4.4k和200k大小的消息,并执行负载测试。我添加了重新启动策略和检查指向,如下所示,我没有显式地使用代码中的任何状态,因为不需要它。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(inter
我试图写一个卡夫卡消费者,它消耗来自一个主题的数据。但是,每当我尝试运行它时,我都会得到以下错误。
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at or
我正在运行一个基于EventTime的测试窗口的简单示例。我能够用处理时间生成输出,但是当我使用EventTime时,不会出现输出。请帮助我理解我做错了什么。
我正在创建一个大小为10秒的SlidingWindow,它每5秒滑动一次,在窗口的末尾,系统将发出在此期间收到的消息数量。
input :
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695856 (gene
可以为syslog.conf中的每个文件配置日志旋转大小和文件数。
user.info /var/log/users.log (logrotate up to 1mb, up to 5 files)
user.debug;user.!info /var/log/debug.log (logrotate up to 10mb, up to 10 files)
我用的是busybox系统。
我试图在编写pyflink作业时读取一个已建立的csv文件。我使用文件系统连接器来获取数据,但是在ddl上执行execute_sql()之后,然后对表执行查询,我得到了一个错误,这说明它无法获取下一个结果。我无法解决此错误。我已经检查了csv文件,它是完全正确的,并与熊猫一起工作,但在这里,我不知道为什么它不能获取下一行。如需参考,请查找所附代码。
from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastre
将Flink 1.10升级到Flink 1.11后,log4j配置不再工作。
我以前的配置是使用一个适配器库,该适配器需要log4j 1.x,并且不再与Flink 1.11兼容
根据新的配置,flink- look . this应该如下所示
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootL