首页
学习
活动
专区
圈层
工具
发布
49 篇文章
1
YARN
2
Hadoop前世今生
3
AI分类
4
人工智能综述
5
随机森林
6
【HBase】HBase之what
7
【HBase】HBase之how
8
HBase篇--HBase常用优化
9
Hbase优化
10
flink源码从头分析第一篇之WordCount DataStream操作
11
大数据Flink-Java学习之旅第一篇
12
flink(12)-flink on yarn
13
Flink学习——Flink概述
14
Flink学习笔记:2、Flink介绍
15
Flink学习笔记(2) -- Flink部署
16
Flink入门(一)——Apache Flink介绍
17
Flink1.4 Flink程序剖析
18
Flink SQL 优化实战 - 维表 JOIN 优化
19
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
20
Flink重点难点:维表关联理论和Join实战
21
Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交
22
详解flink中Look up维表的使用
23
Flink 1.11中对接Hive新特性及如何构建数仓体系
24
Flink 实时计算 - SQL 维表 Join 的实现
25
大数据技术周报第 010 期
26
实时数仓在有赞的实践
27
美团基于 Flink 的实时数仓平台建设新进展
28
基于Flink+Hive构建流批一体准实时数仓
29
实时数仓:基于流计算 Oceanus 实现 MySQL 和 HBase 维表到 ClickHouse 的实时分析
30
当 TiDB 与 Flink 相结合:高效、易用的实时数仓
31
flink维表关联系列之Mysql维表关联:全量加载
32
基于Flink的高可靠实时ETL系统
33
基于 Flink 实现的商品实时推荐系统(附源码)
34
【Flink】基于 Flink 的流式数据实时去重
35
Flink 实战 | 贝壳找房基于Flink的实时平台建设
36
Apache Hudi在华米科技的应用-湖仓一体化改造
38
Flink checkpoint
39
理解Flink checkpoint
40
flink checkpoint配置整理
41
flink checkpoint 源码分析 (二)
42
聊聊flink的checkpoint配置
43
Flink中案例学习--State与CheckPoint
44
Flink源码阅读(一)--Checkpoint触发机制
45
Flink企业级优化全面总结(3万字长文,15张图)
46
Flink高频面试题,附答案解析
47
学习Flink,看这篇就够了
48
【最全的大数据面试系列】Flink面试题大全
49
Flink SQL Client综合实战

Flink1.4 Flink程序剖析

Flink程序程序看起来像转换数据集合的普通程序。每个程序都由相同的基本部分组成:

  • 获得一个执行环境
  • 加载/创建初始数据
  • 指定在这些数据上的转换操作
  • 指定计算结果存放位置
  • 触发程序执行

现在我们将对每一步进行一个简要的概述。请注意,Java DataSet API的所有核心类都可以在org.apache.flink.api.java包中找到,而Java DataStream API的类可以在org.apache.flink.streaming.api中找到。Scala DataSet API的所有核心类都可以在org.apache.flink.api.scala包中找到,而Scala DataStream API的类可以在org.apache.flink.streaming.api.scala中找到。

StreamExecutionEnvironment是所有Flink程序的基础。你可以使用StreamExecutionEnvironment上的如下静态方法获取: Java版本:

代码语言:javascript
复制
getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

Scala版本:

代码语言:javascript
复制
getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

通常情况下,我们只需要使用getExecutionEnvironment()即可,因为这会根据上下文做正确的选择:如果你在IDE内执行程序或作为常规的Java程序,将创建一个本地环境,在你的本地机器上执行你的程序。如果使用程序创建JAR文件并通过命令行调用它,那么Flink集群管理器将执行你的main方法,并且getExecutionEnvironment()返回一个用于在集群上执行你程序的执行环境。

对于指定数据源,执行环境有多种方法可以从文件中读取数据:可以逐行读取,以CSV格式文件读取或使用完全自定义的数据输入格式。只要将文本文件作为一系列行读取,就可以使用:

Java版本:

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

Scala版本:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

这将为你提供一个DataStream,然后就可以应用转换函数来创建新的派生DataStream

通过调用DataStream上的转换函数来应用转换操作。例如,一个map转换函数看起来像这样:

Java版本:

代码语言:javascript
复制
DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

Scala版本:

代码语言:javascript
复制
val input: DataSet[String] = ...

val mapped = input.map { x => x.toInt }

这将通过将原始集合中的每个String转换为Integer来创建一个新的DataStream

一旦获得了包含最终结果的DataStream,就可以通过创建接收器(sink)将其写入外部系统中。下面是创建接收器的一些示例方法:

Java版本:

代码语言:javascript
复制
writeAsText(String path)

print()

Scala版本:

代码语言:javascript
复制
writeAsText(path: String)

print()

一旦你指定的完整程序需要触发程序执行,可以通过调用StreamExecutionEnvironmentexecute()方法来触发程序的执行。根据执行环境的类型,执行将在你的本地机器上触发,或提交程序在集群上执行。

execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。

下一篇
举报
领券