1、Flink 的 TypeInformation
在 Flink 中,数据类型的描述信息都是定义在 TypeInformation 中,比较常用的 TypeInformatica 有 BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo 等
Flink 为何要别出心裁,自己搞一套类型系统呢?
原因有二
一是做类型检查,Flink支持比较灵活的基于field的join或group,需要先检查这个field是否可以作为key,或这个field是否可以做join或group,
二是更加利于存储,Flink 对不同类型做了不同的存储性能优化
2、原生数据类型(BasicTypeInfo)
包括所有的Java基本类型和装箱类型以及void,String,Date,BigDecimal,和BigInteger。
如下代码所示,通过从给定的元素集中创建 DataStream 数据集
// 创建 Int 类型数据集
val intStream:DataStream[Int] = env.fromElements(1,2,3,4,5,6)
// 创建 String 类型的数据集
val stringStream:DataStream[String] = env.fromElements("hello","world")
3、基本类型数组(BasicArrayTypeInfo)
包括基本类型的数组和 String 对象的数组,如下
// 通过数组创建集合
val dataStream:DataStream[Int] = env.fromCollection(Array(1,2,3,4))
// 通过List集合创建数据集
val dataStream:DataStream[Int] = env.fromCollection(List(1,2,3))
4、Java Tuples 类型(TupleTypeInfo)
Flink 提供了 Tuples 类供用户使用,目前最大支持25个字段
如下:
val tuple2Stream = env.fromElements(Tuple2("a",1),Tuple2("b",3))
5、Case Class 类型(CaseClassTypeInfo)
支持任意CaseClass,字段上限是22
// 创建 case classcase class WordCount(word:String,count:Int)
// 使用 case class 创建输入数据集val input = env.fromElements(WordCount("a",1),WordCount("b",2))
// 使用字段名分区val keyedStream = input.keyBy("word")// 使用字段位置分区val keyedStream2 = input.keyBy(0)
6、Pojos 类型(PojoTypeInfo)
Pojos 类可以完成复杂数据类型的定义,Flink 通过实现 PojoTypeInfo 来描述任意的 POJOs,包括 Java 和 Scala 类。
但是 Pojos 得遵循以下要求:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 这里是用scala定义的类,注意要有默认构造器,否则会报错class Person (var name:String,var age:Int) { def this() { this(null,-1) }}
val personStream = env.fromElements(new Person("Peter",14),new Person("Linda",25))
personStream.keyBy("name").print()
env.execute("job")
7、小结
好,到此为止,Flink 基本知识已经讲完了,简单小结一下:
首先我们感性的认识了一下实时架构长什么样,和经典单体架构以及微服务架构有什么区别;
然后我们介绍了什么是有状态的实时架构,引出了 Flink 框架;
然后介绍了 Flink 是哪些组件构成的,编程接口是什么;
最后介绍了基本的程序结构,以及 Flink 数据类型。