我正在使用flink 1.4.2从Kafka读取数据,并使用JSONDeserializationSchema将它们解析为JSONDeserializationSchema。如果传入的记录不是有效的JSON,那么我的Flink作业就会失败。我想跳过破纪录而不想失败。
FlinkKafkaConsumer010<ObjectNode> kafkaConsumer =
new FlinkKafkaConsumer010<>(TOPIC, new JSONDeserializationSchema(), consumerProperties);
在EMR上部署flink应用程序时出现此错误
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.util.RunJar.run(RunJar.java
我用flink (java,maven version8.1)从磁盘读取csv文件(),得到以下异常:
ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current
有时会引发此错误,并向下查看任务管理器。我使用了org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode WARN org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetExc
我使用flink从kafka读取数据并将其转换为protobuf。我面临的问题是,当我运行java应用程序时,会出现以下错误。如果我将unknownFields变量名修改为其他内容,它就能工作,但是很难对所有protobuf类进行这种更改。
我还试图在阅读kafka时直接反序列化,但我不确定getProducedType()方法应该返回什么。
public static class ProtoDeserializer implements DeserializationSchema{
@Override
public TypeInformation getProduc
我最近开始“玩”Apache Flink。我已经组装了一个小应用程序来开始测试框架等等。当我尝试序列化一个普通的POJO类时,我现在遇到了一个问题: @Getter
@ToString
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public final class Species {
private String name;
private List<String> abilities;
} 不知何故,我可以从堆栈跟踪看出List类型不能序列化,但根据Flink的文档,情况不应该是这样的。这是堆栈
我正在尝试修改一个case类,它有大约240个变量,其中一些是我定义的其他pojos : Signal (case类)有变量PowerPojoFeature,UserFeature。当我修改任何pojo添加新变量时,它会抛出exception rg.apache.flink.util.StateMigrationException下面:操作符typeSerializer的新状态state不能是不兼容的。在org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBac
我有一个Flink应用程序,它保持与这个错误的错误。 com.org.ads.audience.traffic.MyClass@6eaa21d8 is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:140)
org.apache.flink.api.java.ClosureCleaner.clean(Cl
我尝试将我的Flink应用程序部署到AWS中。此应用程序使用Apache进行反序列化/序列化传入消息。我的应用程序在本地机器上运行良好,但是当我将它部署到AWS时,我有异常(在CloudWatch日志中):Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUI
我在Apache 3.0 (定制Greeter示例)中有一个嵌入式模块,它使用JSON序列化事件。当试图从入口反序列化route()消息时,我得到了一个例外,即我的自定义类型不能转换为protobuf (是的,它不是) --但是应该是这样吗?我的意思是,我试图使用3.x文档,但没有发现对要路由的类型有任何限制。
在这个问题上有什么暗示或提示吗?
谢谢
// The custom type (Bean-style and all)
public final class Message {
@JsonProperty private String name;
@JsonProperty p
当我执行我的Flink应用程序时,它给出了这个NullPointerException:
2017-08-08 13:21:57,690 INFO com.datastax.driver.core.Cluster - New Cassandra host /127.0.0.1:9042 added
2017-08-08 13:22:02,427 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(TumblingEventTimeWindows(30000), List
我使用dataSet API,我有两种案例类。
case class Geo(country:Int, province:Int, city:Int, county:Int)
case class AntiFraudLog(
eventType: Int,
valid: Boolean
)
case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog])
然后我生成了一个键/值对,它的值是一个case类。
val dataKeyValue: DataSet[(Long, AntiFraudLog)]
并尝试使