前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.4 Flink程序剖析

Flink1.4 Flink程序剖析

作者头像
smartsi
发布2019-08-07 08:31:13
5700
发布2019-08-07 08:31:13
举报
文章被收录于专栏:SmartSi

Flink程序程序看起来像转换数据集合的普通程序。每个程序都由相同的基本部分组成:

  • 获得一个执行环境
  • 加载/创建初始数据
  • 指定在这些数据上的转换操作
  • 指定计算结果存放位置
  • 触发程序执行

现在我们将对每一步进行一个简要的概述。请注意,Java DataSet API的所有核心类都可以在org.apache.flink.api.java包中找到,而Java DataStream API的类可以在org.apache.flink.streaming.api中找到。Scala DataSet API的所有核心类都可以在org.apache.flink.api.scala包中找到,而Scala DataStream API的类可以在org.apache.flink.streaming.api.scala中找到。

StreamExecutionEnvironment是所有Flink程序的基础。你可以使用StreamExecutionEnvironment上的如下静态方法获取: Java版本:

代码语言:javascript
复制
getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

Scala版本:

代码语言:javascript
复制
getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

通常情况下,我们只需要使用getExecutionEnvironment()即可,因为这会根据上下文做正确的选择:如果你在IDE内执行程序或作为常规的Java程序,将创建一个本地环境,在你的本地机器上执行你的程序。如果使用程序创建JAR文件并通过命令行调用它,那么Flink集群管理器将执行你的main方法,并且getExecutionEnvironment()返回一个用于在集群上执行你程序的执行环境。

对于指定数据源,执行环境有多种方法可以从文件中读取数据:可以逐行读取,以CSV格式文件读取或使用完全自定义的数据输入格式。只要将文本文件作为一系列行读取,就可以使用:

Java版本:

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

Scala版本:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

这将为你提供一个DataStream,然后就可以应用转换函数来创建新的派生DataStream

通过调用DataStream上的转换函数来应用转换操作。例如,一个map转换函数看起来像这样:

Java版本:

代码语言:javascript
复制
DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

Scala版本:

代码语言:javascript
复制
val input: DataSet[String] = ...

val mapped = input.map { x => x.toInt }

这将通过将原始集合中的每个String转换为Integer来创建一个新的DataStream

一旦获得了包含最终结果的DataStream,就可以通过创建接收器(sink)将其写入外部系统中。下面是创建接收器的一些示例方法:

Java版本:

代码语言:javascript
复制
writeAsText(String path)

print()

Scala版本:

代码语言:javascript
复制
writeAsText(path: String)

print()

一旦你指定的完整程序需要触发程序执行,可以通过调用StreamExecutionEnvironmentexecute()方法来触发程序的执行。根据执行环境的类型,执行将在你的本地机器上触发,或提交程序在集群上执行。

execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-01-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档