我尝试提交简单的flink作业以接受来自kafka的消息,但提交作业后,在不到一分钟的时间内,作业失败,并出现以下kafka异常。我已经在我的本地机器上运行了kafka 2.12,并且我已经配置了这个作业所使用的主题。
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> kafkaData = env
.addSource(new FlinkKafkaConsumer<String>("test-topic",
new SimpleStringSchema(), properties));
kafkaData.print();
env.execute("Aggregation Job");
}
下面是一个例外:
Job has been submitted with JobID 5cc30fe72f685406126e2f5a26f10341
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 5cc30fe72f685406126e2f5a26f10341)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
...
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
I saw another question in stackoverflow,但这并不能解决问题。我没有在kafka代理上配置任何SSL。如有任何建议,我们将不胜感激。
发布于 2021-03-16 06:32:31
今天我也遇到了同样的问题。在我的例子中,问题是我无法将flink应用程序放在VPC中(我的MSK集群位于VPC中)。编辑flink应用程序并将其移动到相应的VPC后,问题就消失了。
我知道这个问题已经提了几个月了,但我想我应该把我的发现发布出来,以防其他人像我一样在谷歌搜索中遇到这个问题。
https://stackoverflow.com/questions/62109520
复制相似问题