前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据Flink进阶(七):Flink批和流案例总结

大数据Flink进阶(七):Flink批和流案例总结

原创
作者头像
Lansonli
发布2023-03-27 18:56:00
1.2K0
发布2023-03-27 18:56:00
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

Flink批和流案例总结

关于Flink 批数据处理和流式数据处理案例有以下几个点需要注意:

一、Flink程序编写流程总结

编写Flink代码要符合一定的流程,Flink代码编写流程如下:

a. 获取flink的执行环境,批和流不同,Execution Environment。 b. 加载数据数据-- soure。 c. 对加载的数据进行转换-- transformation。 d. 对结果进行保存或者打印-- sink。 e. 触发flink程序的执行 --env.execute()

在Flink批处理过程中不需要执行execute触发执行,在流式处理过程中需要执行env.execute触发程序执行。

二、关于Flink的批处理和流处理上下文环境

创建Flink批和流上下文环境有以下三种方式,批处理上下文创建环境如下:

//设置Flink运行环境,如果在本地启动则创建本地环境,如果是在集群中启动,则创建集群环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//指定并行度创建本地环境
LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment(10);

//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包
ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "application.jar");

流处理上下文创建环境如下:

//设置Flink运行环境,如果在本地启动则创建本地环境,如果是在集群中启动,则创建集群环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//指定并行度创建本地环境
LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(5);

//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "application.jar");

同样在Scala api 中批和流创建Flink 上下文环境也有以上三种方式,在实际开发中建议批处理使用"ExecutionEnvironment.getExecutionEnvironment()"方式创建。流处理使用"StreamExecutionEnvironment.getExecution-Environment()"方式创建。

三、Flink批和流 Java 和 Scala导入包不同

在编写Flink Java api代码和Flink Scala api代码处理批或者流数据时,引入的ExecutionEnvironment或StreamExecutionEnvironment包不同,在编写代码时导入错误的包会导致编程有问题。

批处理不同API引入ExecutionEnvironment如下:

//Flink Java api 引入的包
import org.apache.flink.api.java.ExecutionEnvironment;
//Flink Scala api 引入的包
import org.apache.flink.api.scala.ExecutionEnvironment

流处理不同API引入StreamExecutionEnvironment如下:

//Flink Java api 引入的包
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//Flink Scala api 引入的包
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

四、Flink Java Api中创建 Tuple 方式

在Flink Java api中创建Tuple2时,可以通过new Tuple2方式也可以通过Tuple2.of方式,两者本质一样。

五、Flink Scala api需要导入隐式转换

在Flink Scala api中批处理和流处理代码编写过程中需要导入对应的隐式转换来推断函数操作后的类型,在批和流中导入隐式转换不同,具体如下:

//Scala 批处理导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.api.scala._
//Scala 流处理导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.streaming.api.scala._

六、关于Flink Java api 中的 returns 方法

Flink Java api中可以使用Lambda表达式,当涉及到使用泛型Java会擦除泛型类型信息,需要最后调用returns方法指定类型,明确声明类型,告诉系统函数生成的数据集或者数据流的类型。

七、批和流对数据进行分组方法不同

批和流处理中都是通过readTextFile来读取数据文件,对数据进行转换处理后,Flink批处理过程中通过groupBy指定按照什么规则进行数据分组,groupBy中可以根据字段位置指定key(例如:groupBy(0)),如果数据是POJO自定义类型也可以根据字段名称指定key(例如:groupBy("name")),对于复杂的数据类型也可以通过定义key的选择器KeySelector来实现分组的key。

Flink流处理过程中通过keyBy指定按照什么规则进行数据分组,keyBy中也有以上三种方式指定分组key,建议使用通过KeySelector来选择key,其他方式已经过时。

八、关于DataSet Api (Legacy)软弃用

Flink架构可以处理批和流,Flink 批处理数据需要使用到Flink中的DataSet API,此API 主要是支持Flink针对批数据进行操作,本质上Flink处理批数据也是看成一种特殊的流处理(有界流),所以没有必要分成批和流两套API,从Flink1.12版本往后,Dataset API 已经标记为Legacy(已过时),已被官方软弃用,官方建议使用Table API 或者SQL 来处理批数据,我们也可以使用带有Batch执行模式的DataStream API来处理批数据,在未来Flink版本中DataSet API 将会被删除。关于这些API 具体使用后续文章会进行讲解。

关于Flink集群提交任务及Flink flink-conf.yaml配置文件在下个章节集群搭建会进行介绍。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink批和流案例总结
    • 一、Flink程序编写流程总结
      • 二、关于Flink的批处理和流处理上下文环境
        • 三、Flink批和流 Java 和 Scala导入包不同
          • 四、Flink Java Api中创建 Tuple 方式
            • 五、Flink Scala api需要导入隐式转换
              • 六、关于Flink Java api 中的 returns 方法
                • 七、批和流对数据进行分组方法不同
                  • 八、关于DataSet Api (Legacy)软弃用
                  相关产品与服务
                  大数据处理套件 TBDS
                  腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术对外提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、presto、Iceberg、Alluxio 等,以快速构建企业级数据湖、数据仓库。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档