一.Flink 中 Data Type 组成
二、Flink 是如何处理 Data Type 的
首先Flink会根据自身的序列化器进行序列化,如果不行,则默认回退到 Kryo 序列化器进行序列化。
可能碰到的问题,如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerType(KuduTableDesc.class);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
DataSet<SomeType> result = dataSet
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
DataSet<SomeType> result = dataSet
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(new TypeHint<SomeType.class});
TypeInformation<String> info = TypeInformation.of(String.class);
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
三、常见的 returns 的使用
.returns(Types.TUPLE(Types.INT,Types.INT))
.returns(Types.STRING)
.returns(TypeInformation.of(String.class))
.returns(new TypeHint<Tuple2<String, String>>(){})
.returns(TypeInformation.of(new TypeHint<Tuple2<ConsumerRecord, String>>() {}))
.returns(SomeType.class)