首页
学习
活动
专区
圈层
工具
发布

Flink进阶教程:数据类型和序列化机制简介

val stock = StockPrice("0001", 0L, 121) println(stock.symbol) Java POJO Java的话,需要定义POJO类,定义POJO类有一些注意事项...所有子字段也必须是Flink支持的数据类型。 下面三个例子中,只有第一个是POJO,其他两个都不是POJO,非POJO类将使用Kryo序列化工具。...此外,使用Avro生成的类可以被Flink识别为POJO。 Tuple Tuple可被翻译为元组,比如我们可以将之前的股票价格抽象为一个三元组。...tuple"); } Scala的Tuple中所有元素都不可变,Java的Tuple中的元素是可以被更改和赋值的,因此在Java中使用Tuple可以充分利用这一特性,这样可以减少垃圾回收的压力。...registerType方法的源码如下所示,其中TypeExtractor对数据类型进行推断,如果传入的类型是POJO,则可以被Flink识别和注册,否则将使用Kryo。

2.5K10

Flink实战(三) - 编程范式及核心概念

最初通过在Flink程序中添加源来创建集合,并通过使用诸如map,filter等API方法对它们进行转换来从这些集合中派生新集合。...: 按字段名称选择POJO字段 例如,“user”指的是POJO类型的“user”字段 通过1偏移字段名称或0偏移字段索引选择元组字段 例如,“_ 1”和“5”分别表示Scala Tuple...Flink必须支持字段的类型。 目前,Flink使用Avro序列化任意对象(例如Date)。 Flink分析POJO类型的结构,即它了解POJO的字段。 因此,POJO类型比一般类型更容易使用。...它们不是通过通用序列化框架,而是通过使用读取和写入方法实现org.apache.flinktypes.Value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,使用值类型是合理的。...ResultTypeQueryable接口可以通过输入格式和函数实现,以明确告知API其返回类型。调用函数的输入类型通常可以通过先前操作的结果类型来推断。 参考 Apache Flink

2K20
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    Flink实战(三) - 编程范式及核心概念

    最初通过在Flink程序中添加源来创建集合,并通过使用诸如map,filter等API方法对它们进行转换来从这些集合中派生新集合。...: 按字段名称选择POJO字段 例如,“user”指的是POJO类型的“user”字段 通过1偏移字段名称或0偏移字段索引选择元组字段 例如,“_ 1”和“5”分别表示Scala Tuple类型的第一个和第六个字段...Flink必须支持字段的类型。 目前,Flink使用Avro序列化任意对象(例如Date)。 Flink分析POJO类型的结构,即它了解POJO的字段。 因此,POJO类型比一般类型更容易使用。...它们不是通过通用序列化框架,而是通过使用读取和写入方法实现org.apache.flinktypes.Value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,使用值类型是合理的。...ResultTypeQueryable接口可以通过输入格式和函数实现,以明确告知API其返回类型。调用函数的输入类型通常可以通过先前操作的结果类型来推断。 参考 Apache Flink

    1.7K40

    Table API&SQL的基本概念及使用介绍

    以下列表概述了不同选项的功能: Row:字段通过位置,任意数量的字段映射,支持空值,无类型安全访问。 POJO:按名称映射字段(POJO字段必须命名为表字段),任意字段数,支持空值,类型安全访问。...可以通过为所有字段提供名称(基于位置的映射)来重命名字段。如果未指定字段名称,则使用默认字段名称。...(Java and Scala) Flink支持POJO作为复合类型。...在这里记录了确定POJO的规则。将POJO DataStream或DataSet转换为Table而不指定字段名称时,将使用原始POJO字段的名称。...重命名原始POJO字段需要关键字AS,因为POJO字段没有固有的顺序。名称映射需要原始名称,不能通过位置来完成。

    6.6K70

    Flink DataStream编程指南

    Tule字段的访问通过偏移,如_1,访问第一个元素。Case class元素的访问使用的是字段的名称。...和Scala类将被Flink视为特殊的POJO数据类型: 1),class必须是public 2),必须有一个public无参构造函数 3),所有字段都是public,或者可以通过getter和setter...Flink分析POJO类型的结构,即它了解POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。...4,General Class Types Flink支持大多数Java和Scala类(API和自定义)。限制使用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。...遵循Java Bean规则的类通常运行良好。 没有标识为POJO类型的所有类(参见上面的POJO要求)由Flink作为一般类类型处理。

    4.6K70

    Flink的sink实战之三:cassandra3

    两种写入cassandra的方式 flink官方的connector支持两种方式写入cassandra: Tuple类型写入:将Tuple对象的字段对齐到指定的SQL的参数中; POJO类型写入:通过DataStax...,将POJO对象对应到注解配置的表和字段中; 接下来分别使用这两种方式; 开发(Tuple写入) 《Flink的sink实战之二:kafka》中创建了flinksinkdemo工程,在此继续使用; 在pom.xml...; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2...; import org.apache.flink.util.Collector; import java.util.Properties; public class CassandraTuple2Sink...; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream

    1.4K10

    【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations

    二、Flink 示例专栏Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。...(完整版)一、Flink的23种算子说明及示例本文示例中使用的maven依赖和java bean 参考本专题的第一篇中的maven和java bean。...对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。...;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple;...import java.util.ArrayList;import java.util.Arrays;import java.util.List;import org.apache.flink.api.java.tuple.Tuple3

    82610

    大数据Flink进阶(七):Flink批和流案例总结

    批处理不同API引入ExecutionEnvironment如下: //Flink Java api 引入的包 import org.apache.flink.api.java.ExecutionEnvironment...Java Api中创建 Tuple 方式 在Flink Java api中创建Tuple2时,可以通过new Tuple2方式也可以通过Tuple2.of方式,两者本质一样。...,使用Scala API 时需要隐式转换来推断函数操作后的类型 import org.apache.flink.streaming.api.scala._ 六、关于Flink Java api 中的 returns...七、批和流对数据进行分组方法不同 批和流处理中都是通过readTextFile来读取数据文件,对数据进行转换处理后,Flink批处理过程中通过groupBy指定按照什么规则进行数据分组,groupBy中可以根据字段位置指定...key(例如:groupBy(0)),如果数据是POJO自定义类型也可以根据字段名称指定key(例如:groupBy("name")),对于复杂的数据类型也可以通过定义key的选择器KeySelector

    1.6K41

    使用Apache Flink进行批处理入门教程

    另外,如果你刚刚开始使用Apache Flink,在我看来,最好从批处理开始,因为它更简单,并且类似于使用数据库。...在本文中,我们将使用Java来编写应用程序,当然您也可以在Scala,Python或R中的一门语言来编写Flink应用程序。...并非每种Java类型都可用于数据集,但你可以使用四种不同类型的类型: 内置Java类型和POJO类 Flink tuples(元组)和Scala case类 Values,它是Java基本类型的特殊可变式装饰器...请记住,Java流操作与这些操作之间最大的区别在于Java 8可以处理内存中的数据并且可以访问本地数据,而Flink在分布式环境中处理集群中的数据。 我们来看看使用了这些操作的简单示例。...因此,我们使用f1字段和f2字段分别访问这些列。

    23K4133

    Apache Flink在小米的发展和应用

    所以从实际使用体验上讲,Flink 的调度数据模式,显然更容易减少损耗,提高计算效率,同时在使用上更符合用户“直觉”,不易出现重复创建资源的情况。...在数据序列化上,Flink 和 Spark 采用了不同的方式;Spark 对于所有数据默认采用 Java 原生序列化方式,用户也可以配置使用 Kryo;而 Flink 则是自己实现了一套高效率的序列化方法...有意思的是,Flink 官方文档里对于不要使用Java原生序列化强调了三遍,甚至网上有传言 Oracle 要抛弃 Java 原生序列化: ?...另外,用户为了保证数据能使用Flink自带的序列化器,有时候不得不自己再重写一个 POJO 类,把外部系统中数据的值再“映射”到这个 POJO 类中;而根据开发人员对 POJO 的理解不同,写出来的效果可能不一样...,比如之前有个用户很肯定地说自己是按照 POJO 的规范来定义的类,我查看后发现原来他不小心多加了个 logger,这从侧面说明还是有一定的用户使用门槛的。

    1.1K30

    Flink零基础实战教程:股票价格数据流处理

    通过本文,你可以学到: 定义相关数据结构。 Flink流处理程序的骨架。 Flink的执行环境概念。 自定义Source、设置时间戳和Watermark。...数据结构 Flink能处理任何可被序列化的数据结构: 基础数据类型,包括 String、Integer、Boolean、Array 复杂数据结构,包括 Scala case class和 Java POJO...case class StockPrice(symbol: String, timestamp: Long, price: Double) 当然,如果使用Java,也可以定义一个POJO(Plain Old...Java Object),该类中各个字段或者具有public属性,或者有一个对应的getter和setter方法,且该类有一个无参数的构造函数。...import java.util.Calendar import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

    3K10
    领券