Flink
程序程序看起来像转换数据集合的普通程序。每个程序都由相同的基本部分组成:
现在我们将对每一步进行一个简要的概述。请注意,Java DataSet API
的所有核心类都可以在org.apache.flink.api.java
包中找到,而Java DataStream API
的类可以在org.apache.flink.streaming.api
中找到。Scala DataSet API
的所有核心类都可以在org.apache.flink.api.scala
包中找到,而Scala DataStream API
的类可以在org.apache.flink.streaming.api.scala
中找到。
StreamExecutionEnvironment
是所有Flink
程序的基础。你可以使用StreamExecutionEnvironment
上的如下静态方法获取:
Java版本:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
Scala版本:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常情况下,我们只需要使用getExecutionEnvironment()
即可,因为这会根据上下文做正确的选择:如果你在IDE
内执行程序或作为常规的Java
程序,将创建一个本地环境,在你的本地机器上执行你的程序。如果使用程序创建JAR
文件并通过命令行调用它,那么Flink
集群管理器将执行你的main
方法,并且getExecutionEnvironment()
返回一个用于在集群上执行你程序的执行环境。
对于指定数据源,执行环境有多种方法可以从文件中读取数据:可以逐行读取,以CSV格式文件读取或使用完全自定义的数据输入格式。只要将文本文件作为一系列行读取,就可以使用:
Java版本:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
Scala版本:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
这将为你提供一个DataStream
,然后就可以应用转换函数来创建新的派生DataStream
。
通过调用DataStream
上的转换函数来应用转换操作。例如,一个map
转换函数看起来像这样:
Java版本:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
Scala版本:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
这将通过将原始集合中的每个String
转换为Integer
来创建一个新的DataStream
。
一旦获得了包含最终结果的DataStream
,就可以通过创建接收器(sink
)将其写入外部系统中。下面是创建接收器的一些示例方法:
Java版本:
writeAsText(String path)
print()
Scala版本:
writeAsText(path: String)
print()
一旦你指定的完整程序需要触发程序执行,可以通过调用StreamExecutionEnvironment
的execute()
方法来触发程序的执行。根据执行环境的类型,执行将在你的本地机器上触发,或提交程序在集群上执行。
execute()
方法返回一个JobExecutionResult
,它包含执行时间和累加器结果。