Flink DataStream编程指南

Flink程序是执行分布式集合转换(例如,filtering, mapping, updating state, joining, grouping, defining windows, aggregating)的常规程序。集合创建于原始的数据源(例如,通过从文件读取,kafka主题或从本地内存集合中进行创建)。通过sinks返回结果,例如将数据写入(分布式)文件或标准输出(例如,命令行终端)。Flink程序以各种上下文运行,独立或嵌入其他程序中。执行可能发生在本地JVM或许多机器的集群上。取决于数据源的类型,即有界或无界源,您将编写批处理程序或流式程序,其中DataSet API用于批处理,DataStream API用于流式传输。

一,DataSet and DataStream

Flink有一个特殊的类DataSet和DataStream来表示程序中的数据。您可以将它们视为不可变的数据集合,可以包含重复的数据。在DataSet的情况下,数据是有限的,而对于DataStream,元素的数量可以是无限制的。

这些集合在一些关键方面与常规Java集合不同。 首先,它们是不可变的,这意味着一旦创建它们就不能添加或删除元素。 你也不能简单地检查里面的元素。

最初通过在Flink程序中添加一个源来创建一个集合,并且通过使用API方法(如map,filter等)来转换它们,从这些集合中导出新集合。

二,解刨Flink程序

Flink每个程序由相同的基本部分组成:

1),获取执行环境,

2),加载/创建初始数据,

3),指定对此数据的转换,

4),指定计算结果的位置,

5),触发程序执行

现在我们将对每个步骤进行概述,详情后面会出文章介绍。

StreamExecutionEnvironment 是Flink项目的基础。可以使用下面的StreamExecutionEnvironment静态方法来创建:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

通常情况下,您只需要使用getExecutionEnvironment(),因为这将根据上下文执行正确的操作:如果您正在IDE中执行程序或作为常规Java程序,将创建一个将执行您的程序的本地环境。如果您从程序中创建了一个JAR文件,并通过命令行调用它,Flink集群管理器将执行您的main方法,而getExecutionEnvironment()将返回一个在集群上执行程序的执行环境。为了指定数据源,执行环境有几种使用各种方法从文件中读取的方法:您可以逐行阅读它们,如CSV文件,或使用完全自定义的数据输入格式。使用下面的方式

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

这将创建一个DataStream,然后您可以应用转换来创建新的派生DataStream。您可以通过使用转换函数调用DataSet上的方法来应用转换。例如,map变换如下所示:

val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }

这将通过将原始集合中的每个String转换为整数来创建一个新的DataStream。

一旦你有一个包含最终结果的DataStream,你可以通过创建一个sink来将它写入外部系统。这些只是创建接收器的一些示例方法:

writeAsText(path: String)
print()

一旦您指定了完整的程序,您需要通过调用StreamExecutionEnvironment上的execute()来触发程序执行。根据ExecutionEnvironment的类型,执行将在本地机器上触发或提交程序以在集群上执行。

execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。

三,Lazy Evaluation

所有Flink程序都懒执行:当执行程序的main方法时,数据加载和转换不会直接发生。相反,每个操作都被创建并添加到程序的计划中。当执行由执行环境上的execute()调用显式触发时,这些操作实际上被执行。程序是在本地还是在集群上执行取决于执行环境的类型。

懒执行可以让你构建Flink执行的复杂的程序,并视其为整体计划单元。

四,Specifying Keys

一些转换(join,coGroup,keyBy,groupBy)要求在一组元素上定义一个键。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许使用key 对数据进行分组。

DataSet被分组为:

DataSet<...> input = // [...]
 DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

同时可以给DataStream指定一个键,方法如下:

DataStream<...> input = // [...]
 DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

Flink的数据模型不是基于键值对。因此,您不需要将数据集类型物理打包到键和值中。Keys是“虚拟”:它们被定义为实际数据的函数,以指导分组运算符。

五,Define keys for Tuples

最简单的情况是在元组的一个或多个字段上分组元组:

val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)

元组在第一个字段(Integer类型)中分组。

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

在这里,我们将元组分组在由第一个和第二个字段组成的复合key上。

关于嵌套元组的注释:如果您有一个包含嵌套元组的DataStream,例如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

指定keyBy(0)将使系统使用完整的Tuple2作为键(以整数和浮点为键)。 如果要正确的使用,则必须使用下面说明的field expression 。

五,Define keys using Field Expressions

您可以使用String-based 的Field Expressions来引用嵌套字段并定义用于 grouping, sorting, joining, or coGrouping。Field Expressions使得非常容易选择(嵌套)复合类型(如Tuple和POJO类型)中的字段。在下面的例子中,我们有一个WC POJO,它有两个字段“word”和“count”。要通过字段分组,我们只是将其名称传递给keyBy()函数。

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
 def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)

Field Expression语法:

1),按其字段名称选择POJO字段。例如,“user”是指POJO类型的“user”字段。

2),通过其1-偏移字段名称或0-offset字段索引选择元组字段。 例如,“_1”和“5”分别指Scala元组类型的第一个和第六个字段。

3),您可以在POJO和元组中选择嵌套字段。例如,“user.zip”是指存储在POJO类型的“user”字段中的POJO的“zip”字段。支持POJO和Tuples的任意嵌套和混合,例如“_2.user.zip”或“user._4.1.zip”。

4),您可以使用“_”通配符表达式选择完整类型。这也适用于不是元组或POJO类型的类型。

Field Expression 例子:

class WC(var complex: ComplexNestedClass, var count: Int) {
 def this() { this(null, 0) }
}
class ComplexNestedClass(
 var someNumber: Int,
 someFloat: Float,
 word: (Long, Long, String),
 hadoopCitizen: IntWritable) {
 def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

这些是上述示例代码的有效字段表达式:

“count”:WC类中的计数字段。

“complex”:递归选择POJO类型ComplexNestedClass的字段复合体的所有字段。

“complex.word._3”:选择嵌套的Tuple3的最后一个字段。

“complex.hadoopCitizen”:选择Hadoop IntWritable类型。

六,Define keys using Key Selector Functions

定义key 的另一种方法是“key selector”功能。key selector函数将单个元素作为输入,并返回元素的key。

以下示例显示了一个key selector函数,它只返回一个对象的字段:

// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )

七,Specifying Transformation Functions

大多数转换需要用户定义的函数。本节列出了如何指定它们的不同方法。

1,Lambda Functions

如前面的示例中已经看到的,所有操作都接受lambda函数来描述操作:

val data: DataSet[String] = // [...]
 data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
 data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }

2,Rich functions

以lambda函数作为参数的所有转换可以取代作为参数的丰富函数。

比如,我们常用的是

data.map { x => x.toInt }

可以替换为:

class MyMapFunction extends RichMapFunction[String, Int] {
 def map(in: String):Int = { in.toInt }
})
data.map(new MyMapFunction())

Rich functions也可以定义为匿名类:

data.map (new RichMapFunction[String, Int] {
 def map(in: String):Int = { in.toInt }
})

Rich functions除了用户定义的功能(map,reduce等)外,还提供了四种方法:open,close,getRuntimeContext和setRuntimeContext。这些功能可用于参数化功能(参见传递函数),创建和完成本地状态,访问广播变量(请参阅广播变量)以及访问运行时信息(如累加器和计数器)以及有关。

八,Supported Data Types

Flink对DataSet或DataStream中的元素类型设置了一些限制。其原因是系统分析类型以确定有效的执行策略。

有六种类型的数据类型:

1),Java Tuples and Scala Case Classes

2),Java POJOs

3),Primitive Types

4),Regular Classes

5),Values

6),Hadoop Writables

7),Special Types

1,Tuples and Case Classes

Scala的case classes(作为案例类的特殊情况的Scala元组)是包含固定数量的各种类型的字段的复合类型。Tule字段的访问通过偏移,如_1,访问第一个元素。Case class元素的访问使用的是字段的名称。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
 WordCount("hello", 1),
 WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set

input2.keyBy(0, 1) // key by field positions 0 and 1

2,POJOs

如果符合以下要求,则Java和Scala类将被Flink视为特殊的POJO数据类型:

1),class必须是public

2),必须有一个public无参构造函数

3),所有字段都是public,或者可以通过getter和setter函数访问。对于一个名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo()。

4),Flink必须支持字段的类型。目前,Flink使用Avro序列化任意对象(如Date)。

Flink分析POJO类型的结构,即它了解POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。

以下示例显示了一个带有两个公共字段的简单POJO。

class WordWithCount(var word: String, var count: Int) {
 def this() {
 this(null, -1)
  }
}
val input = env.fromElements(
 new WordWithCount("hello", 1),
 new WordWithCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key by field expression "word"

3,Primitive Types

Flink支持所有Java和Scala原始类型,如Integer, String, and Double。

4,General Class Types

Flink支持大多数Java和Scala类(API和自定义)。限制使用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。遵循Java Bean规则的类通常运行良好。

没有标识为POJO类型的所有类(参见上面的POJO要求)由Flink作为一般类类型处理。Flink将这些数据类型视为黑框,并且无法访问其内容(即用于高效排序)。一般类型使用序列化框架Kryo进行序列化。

5,Values

Value类型手动描述它们的序列化和反序列化。他们提供实现了org.apache.flinktypes.Value

(具有read和write方法)接口的自定义代码操作算子,而不是使用通用的框架。

当通用序列化效率非常低时,使用Value类型是合理的。一个例子是一个数据类型,它将一个稀疏的元素向量作为一个数组实现。由于数组大多为零,所以可以对非零元素使用特殊编码,而通用序列化则会简单的编写所有数组元素。

org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部copy逻辑。

Flink带有与基本数据类型相对应的预定义值类型。(ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue).这些值类型充当基本数据类型的可变变体:它们的值可以更改,允许程序员重用对象并减轻垃圾回收器的压力。

6,Hadoop Writables

您可以使用实现org.apache.hadoop.Writable接口的类型。在write()和readFields()方法中定义的序列化逻辑将用于序列化。

7,Special Types

您可以使用特殊类型,包括Scala的Either,Option和Try。Java API有Either的自定义实现。类似于Scala的Either,它代表一个两种可能的类型的值Left或Right。对于错误处理或需要输出两种不同类型的记录的操作符,可能是有用的。

8,Type Erasure & Type Inference(类型擦除和类型推断)

本节仅与Java相关。

编译后,Java编译器会抛出大部分的泛型类型信息。这被称为Java中的类型擦除。这意味着在运行时,对象的一个实例不再知道它的泛型类型。例如,DataStream <String>和DataStream <Long>的实例对于jvm来说是相同的。

Flink在准备执行程序时(当调用程序的main 方法时)需要类型信息。Flink Java API尝试以各种方式重建丢弃的类型信息,并将其明确存储在数据集和操作符中。您可以通过DataStream.getType()检索类型。该方法返回一个TypeInformation的实例,它是Flink内部表示类型的方式。在某些情况下,类型推断有其限制,需要程序员的“cooperation”。示例是从集合创建数据集的方法,例如ExecutionEnvironment.fromCollection(),您可以在其中传递描述类型的参数。而且通用功能如MapFunction<I, O> 可能需要额外的类型信息。

ResultTypeQueryable接口可以通过输入格式和函数来实现,以便明确地告诉API它们的返回类型。函数调用的输入类型通常可以通过以前操作的结果类型来推断。

九,Accumulators & Counters

Accumulators 是由添加操作和最终累积结果简单构成,在作业结束后可用。最直接的Accumulators 是counter:您可以使用Accumulator.add(V value)方法来增加它。在作业结束时,Flink将聚合(合并)所有部分结果并将结果发送给客户端。累积器在调试期间很有用,或者如果您想快速了解更多有关数据的信息。

Flink目前拥有以下built-in accumulators。每个都实现了累加器接口。

1),IntCounter,LongCounter和DoubleCounter:使用计数器的示例见下文。

2),直方图:离散数量的分箱的直方图实现。在内部它只是一个从整数到整数的map。您可以使用它来计算值的分布,例如,一个单词计数程序的每行字的分布。

1,累加器使用

首先,您必须在用户定义的转换函数中创建一个累加器对象(这里是一个计数器)。

private IntCounter numLines = new IntCounter();

第二,您必须注册累加器对象,通常在 rich function的open()方法中。此时,还可以定义一个名字。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

您现在可以在运算符函数中使用累加器,包括在open()和close()方法中。

this.numLines.add(1);

这个结果结果将存储在从执行环境的execute()方法返回的JobExecutionResult对象中(目前这仅在执行等待完成作业时才起作用)。

myJobExecutionResult.getAccumulatorResult("num-lines")

所有累加器每个job共享一个命名空间。因此,您可以在job的不同操作算子中使用相同的累加器。Flink将内部合并所有具有相同名称的累加器。

关于累加器和迭代的注释:目前,累积器的结果仅在总体作业结束后才可用。我们还计划在下一次迭代中使上一次迭代的结果可用。您可以使用聚合器来计算每次迭代统计数据,并且基于此类统计信息的迭代结束。

2,自定义累加器

要实现自己的累加器,你只需要编写你的累加器接口的实现。您可以选择实现Accumulator 或SimpleAccumulator。

Accumulator <V,R>是最灵活的:它为要添加的值定义一个类型V,最终结果的结果类型为R。例如。 对于直方图,V是数字,R是直方图。SimpleAccumulator适用于两种类型相同的情况,如counters。

十,总结

本文主要是Flink 编程基本介绍。希望,大家通过<Flink流式处理概念简介>和本文。对Flink有更深入的认识,也对Flink编程有进一步的认识,后面会陆续出各种使用文章及生产中的注意事项。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-08-31

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏C/C++基础

统计无符号整数二进制中1的个数(Hamming weight)

之所以来记录这个问题的解法,是因为在在线编程中经常遇到,比如编程之美和京东的校招笔试以及很多其他公司都累此不疲的出这个考题。看似简单的问题,背后却隐藏着很多精妙...

1532
来自专栏灯塔大数据

技术 | Python从零开始系列连载(十九)

但它的特点就是下次使用next(a)时,接着上次的断点继续运行,直到下一个yield

1123
来自专栏九彩拼盘的叨叨叨

如何给函数取个合适的名字

Quora 和 Ubuntu Forums thread 上的 4500 个程序员对上面的问题进行投票。49%的程序员认为给函数,变量等命名是最难的任务。

712
来自专栏take time, save time

你所能用到的BMP格式介绍(二)

一、可能你忽视的基础         在正式开始之前,我不得不从最基本的地方开始,因为这些地方大多数人会忽视的一干二净,如果不在开始进行说明,那么在后面一定会有...

2967
来自专栏IMWeb前端团队

简洁的javascript编码(一)--变量、函数

本文作者:IMWeb jaychen 原文出处:IMWeb社区 未经同意,禁止转载 ? 一、变量 使用语义化的变量名称 Bad cons...

2509
来自专栏顶级程序员

Python 工匠:编写条件分支代码的技巧

我一直觉得编程某种意义上是一门『手艺』,因为优雅而高效的代码,就如同完美的手工艺品一样让人赏心悦目。

942
来自专栏苦逼的码农

Hash冲突之开放地址法

比如说我的输入是任意一个自然数(0,1,2,3...),而我要求经过一个函数后我的输出的数的范围要在0-9这样一个范围之间。

4772
来自专栏阿凯的Excel

让你眼花缭乱的匹配函数反查技巧

小编已经连续写了三期关于匹配函数的用法,匹配函数的扛把子(老大)肯定是Vlookup函数莫属,但是Vlookup函数有一个问题,就是要查找的内容,必须在查找内容...

2966
来自专栏java达人

哈希表

哈希表是种数据结构,它可以提供快速的插入操作和查找操作。第一次接触哈希表时,它的优点多得让人难以置信。不论哈希表中有多少数据,插入和删除(有时包括侧除)只需要接...

1847
来自专栏java一日一条

成为优秀Swift开发者的10条建议

在这里给大家分享一些帮助大家成为更优秀的Swift开发者的建议,让你的代码,写的更少,性能更优 。

1172

扫码关注云+社区

领取腾讯云代金券