我是flink和Kafka的新手。我正在尝试使用Confluent Schema注册表对avro数据进行反序列化。我已经在ec2机器上安装了flink和Kafka。此外,在运行代码之前已经创建了"test“主题。 代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2 作为实现的一部分,代码执行以下操作: 1) Create a flink DataStream object using a list of user element. (User class is avro generated
我正在使用PyFlink创建一个流处理器。当我将Kafka连接到Flink时,一切工作正常。但是当我向kafka发送json数据时,PyFlink接收到它,但反序列化程序将其转换为空。PyFlink代码是 from pyflink.common.serialization import Encoder
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.comm
我的客户类已经使用maven-avro plugin.When创建了,我尝试运行这个程序,我得到的是Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo错误。
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.example.Customer does not contain a setter for field first_name
[main
不是一个错误,但我确实看到了这一行,根据消息可能会影响性能: 2019-01-02 14:44:44,879 INFO org.apache.flink.api.java.typeutils.TypeExtractor
- class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
does not contain a setter for field topic
2019-01-02 14:44:44,879 INFO org.apache.flink.api
自定义类
Person
class Person
{
private Integer id;
private String name;
//getters and setters
}
卡夫卡Flink连接器
TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataSt
我试图连接卡夫卡与Flink和运行通过sql-client.sh。但是,无论我如何处理.yaml和库,我都会得到错误:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
我在flink应用程序(Flink 1.10.1)中定义了以下case类
case class FilterDefinition(filterDefId: String, filter: TileFilter)
case class TileFilter(tiles: Seq[Long], zoomLevel: Int)
在运行时,我注意到日志上写着
FilterDefinition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as Gene
使用flink版本1.13.1
我写了一个自定义的度量报告,但在我的flink中似乎不起作用。启动flink时,JobManager显示警告日志,如下所示:
2021-08-25 14:54:06,243 WARN org.apache.flink.runtime.metrics.ReporterSetup [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available f
我真的很难让Flink与运行中的Kafka实例进行正确的通信,使用来自communicate的Avro模式(对于,键和值)。
经过一段时间的思考和重组我的计划,我能够推动我的实现到目前为止:
生产方法
public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {
final Properties properties = new Properties();
properties.put(Pro
我有一个flink应用程序,它从kafka中读取并将其吸收到kafka。
当我从Intellij运行应用程序时,没有问题,但是当我将ClassCastException提交给flink集群时,它会给出shadowJar。我能在找出我在这里做错了什么方面得到一些帮助吗?
异常跟踪:
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.ka
我在集群上使用Flink。在提交任务时,我得到了以下例外:
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Trying to work with offloaded serialized shuffle descriptors.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.
我已经编写了一个消费者,它读取卡夫卡主题,并使用StreamSink以拼图格式写入数据。但是我得到了以下错误
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowS
对于Flink工作,我有一个简单的概念证明。它基本上接收来自Kafka主题的消息(以JSON格式),将它们反序列化为域模型,根据一些预定义的规则对这些消息进行验证,应用一些转换,最后将结果消息发布到Kafka接收器中。
我确实有几个函数/操作符,它们使用其他“服务”类的一些行为。这些“服务”类也可以导入其他一些依赖项。
据我所知,Flink将尝试(反)序列化这些函数/操作符,以使整个作业真正分发。我不清楚Flink是否会通过在这些字段/成员上使用transient自动避免这种情况,或者仅仅将它们声明为static是否足以避免这种情况。
这就是我所拥有的一个例子:
public final cl