首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Flink实战(二) - 第一个Flink应用程序

Apache Flink实战(二) - 第一个Flink应用程序

作者头像
JavaEdge
修改2023-03-27 14:45:12
9620
修改2023-03-27 14:45:12
举报
文章被收录于专栏:JavaEdgeJavaEdge

编辑切换为居中

添加图片注释,不超过 140 字(可选)

 brew install apache-flink

1 需求

1.1 Flink开发批处理应用程序

词频统计(word count)

一个文件,统计文件中每个单词出现的次数,分隔符是\t。统计结果我们直接打印在控制台(生产上肯定是Sink到目的地)

2 开发环境

2.1 Maven构建

2.1.1 Requirements

Maven 3.0.4(或更高版本)

Java 8

2.1.2 Create Project

使用以下命令之一创建项目

maven archetype

 $ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.8.0

允许为新创建的项目命名。 它将以交互方式询问您groupId,artifactId和包名称。

编辑切换为居中

添加图片注释,不超过 140 字(可选)

Run the quickstart script

  $ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.8.0

编辑切换为居中

添加图片注释,不超过 140 字(可选)

2.1.3 Inspect Project

工作目录中将有一个新目录。 如果使用curl,则该目录称为quickstart。 否则,它具有artifactId的名称:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

使用IDEA打开该项目即可!

示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob是DataStream和DataSet程序的基本框架程序。 主要方法是程序的入口点,既可用于IDE测试/执行,也可用于正确部署。 建议将此项目导入IDE以进行开发和测试。 IntelliJ IDEA支持开箱即用的Maven项目。 不建议Eclipse

对于Flink,Java的默认JVM堆可能太小。须手动增加它。在IntelliJ IDEA中,推荐的更改JVM选项的方法来自Help | 编辑自定义VM选项菜单 -Xmx800m

2.1.4 Build Project

如果要构建/打包项目,请转到项目目录并运行

mvn clean package

或者使用插件

编辑

添加图片注释,不超过 140 字(可选)

编辑切换为居中

添加图片注释,不超过 140 字(可选)

您将找到包含应用程序的JAR文件,以及可能已作为依赖项添加到应用程序的连接器和库:

target / <artifact-id>  -  <version> .jar

编辑

添加图片注释,不超过 140 字(可选)

注意:如果您使用与StreamingJob不同的类作为应用程序的主类/入口点,我们建议您相应地更改pom.xml文件中的mainClass设置。 这样,Flink可以从JAR文件运行应用程序,而无需另外指定主类。

3 公式型编程

  • step 1 : set up the batch execution environment
  • step 2 : Start with getting some data from the environment, like
env.readTextFile(textPath);
  • step 3 : 开发业务逻辑的核心代码 transform the resulting DataSet<String> using operations,like
.filter()
.flatMap()
.join()
.coGroup()
  • step 4 : execute program

4 Flink批处理应用Java开发之功能实现

在相应目录下建立文本:

编辑

添加图片注释,不超过 140 字(可选)

测试代码:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

成功读取:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

4.1 功能拆解

1)读取数据

Hello JavaEdge

2)每一行的数据按照指定的分隔符拆分

Hello JavaEdge

3)为每一个单词赋上次数为1

(Hello,1) (JavaEdge,1)

4) 合并操作

groupBy

代码

编辑切换为居中

添加图片注释,不超过 140 字(可选)

结果

编辑切换为居中

添加图片注释,不超过 140 字(可选)

## 7 实时处理应用功能实现

 package com.javaedge.flink.basic;
 ​
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
 ​
 /**
  * 第一个基于Flink实时处理快速入门案例
  */
 public class StreamingWCApp {
     public static void main(String[] args) throws Exception {
 ​
         // 创建上下文
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 ​
         // 对接数据源的数据
         DataStreamSource<String> source = env.socketTextStream("localhost", 9527);
 ​
         // 业务逻辑处理: transformation
         source.flatMap(new FlatMapFunction<String, String>() {
                     @Override
                     public void flatMap(String value, Collector<String> out) throws Exception {
                         String[] words = value.split(",");
                         for (String word : words) {
                             out.collect(word.toLowerCase().trim());
                         }
                     }
                 }).filter(new FilterFunction<String>() {
                     @Override
                     public boolean filter(String value) throws Exception {
                         return StringUtils.isNotEmpty(value);
                     }
                 }).map(new MapFunction<String, Tuple2<String, Integer>>() {
                     @Override
                     public Tuple2<String, Integer> map(String value) throws Exception {
                         return new Tuple2<>(value, 1);
                     }
                 }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                     @Override
                     public String getKey(Tuple2<String, Integer> value) throws Exception {
                         return value.f0;
                     }
                 }).sum(1)
                 .print();
 ​
         env.execute("StreamingWCApp");
     }
 }

可能遇到拒绝连接,记得

nc -lk 9527

socket发送数据:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

控制台收到结果:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

9 Flink实时处理应用代码重构

如何突破端口的限制呢,需重构:

传入参数args

编辑切换为居中

添加图片注释,不超过 140 字(可选)

获得参数:

编辑切换为居中

添加图片注释,不超过 140 字(可选)

11 开发过程中依赖的注意事项

Configuring Dependencies, Connectors, Libraries:

每个Flink应用程序都依赖于一组Flink库。 至少,应用程序依赖于Flink API。 许多应用程序还依赖于某些连接器库(如Kafka,Cassandra等)。 运行Flink应用程序时(在分布式部署中或在IDE中进行测试),Flink运行时库也必须可用。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 需求
    • 1.1 Flink开发批处理应用程序
      • 词频统计(word count)
      • 2 开发环境
        • 2.1 Maven构建
          • 2.1.1 Requirements
            • 2.1.2 Create Project
              • maven archetype
                • Run the quickstart script
                  • 2.1.3 Inspect Project
                    • 2.1.4 Build Project
                    • 3 公式型编程
                    • 4 Flink批处理应用Java开发之功能实现
                      • 4.1 功能拆解
                        • 1)读取数据
                          • 2)每一行的数据按照指定的分隔符拆分
                            • 3)为每一个单词赋上次数为1
                              • 4) 合并操作
                              • 9 Flink实时处理应用代码重构
                              • 11 开发过程中依赖的注意事项
                              相关产品与服务
                              大数据
                              全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档