前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >写给大忙人的Flink的Data Types

写给大忙人的Flink的Data Types

作者头像
shengjk1
发布2020-03-18 11:32:38
9080
发布2020-03-18 11:32:38
举报
文章被收录于专栏:码字搬砖

一.Flink 中 Data Type 组成

  • 基本数据类型:java 的 8 中基本数据类型加上它们各自的包装类型,在加上 void , String, Date,BigDecimal, BigInteger.
  • 基本数据类型的数据和 Object 类型的数组
  • 复合类型 1.Flink Java Tuples
  • scala case classes
  • Row
  • POJOs: 如果要被 Flink 识别的也允许按 name 引用的话,需要复符合一定的规则(否则的话,会被当做泛型处理) 1). 这个类是 pulic 的并且没有非静态的内部类。 2). 得有一个没有参数的 pulic 构造器 3).所有非静态的非 transient 的属性(包括所有的父类)都必须是 pulic 或者符合 java beans 命名规范的 getter setter 方法。
  • 辅助类型 (集合类、Option、Either 等)
  • 泛型:不会被 Flink 自带的序列化器序列化,而被是 Kryo

二、Flink 是如何处理 Data Type 的

首先Flink会根据自身的序列化器进行序列化,如果不行,则默认回退到 Kryo 序列化器进行序列化。

可能碰到的问题,如下:

  • Registering subtypes 如果方法签名是父类,而返回或者使用的是子类,也就是所谓的协变返回类型关于协变返回类型。让 Flink 知道所有的子类可以在一定的程度上提高性能。
代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.registerType(KuduTableDesc.class);
  • Registering custom serializers 虽然 Flink 自己序列化不了的会给 Kryo,但是 Kryo 也不能很好的处理掉所有的类型,这个时候就要自定义序列化器了。
代码语言:javascript
复制
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
  • Adding Type Hints Flink 可能无法推断出泛型的类型,仅仅在 Java Api 中时必要的。
代码语言:javascript
复制
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);
        
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(new TypeHint<SomeType.class});
  • Manually creating a TypeInformation Flink 可能无法推断出泛型的类型时
代码语言:javascript
复制
TypeInformation<String> info = TypeInformation.of(String.class);

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

三、常见的 returns 的使用

代码语言:javascript
复制
.returns(Types.TUPLE(Types.INT,Types.INT))
.returns(Types.STRING)
.returns(TypeInformation.of(String.class))
.returns(new TypeHint<Tuple2<String, String>>(){})
.returns(TypeInformation.of(new TypeHint<Tuple2<ConsumerRecord, String>>() {}))
.returns(SomeType.class)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/03/16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档