前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据-Flink环境部署(Windows)及Flink编程

大数据-Flink环境部署(Windows)及Flink编程

作者头像
码客说
发布2022-10-04 21:34:42
9220
发布2022-10-04 21:34:42
举报
文章被收录于专栏:码客码客

部署与测试

下载地址

https://archive.apache.org/dist/flink/

本次以Flink 1.9.3版本为例

image-20220926102244311
image-20220926102244311

运行 Flink 需要安装 Java 7.x 或更高的版本

代码语言:javascript
复制
java -version

运行

image-20220926142021974
image-20220926142021974

访问 Flink UI

Flink有个UI界面,可以用于监控Flilnk的job运行状态 http://localhost:8081/

测试

运行自带的 WordCount 示例

以统计 Flink 自带的 README.txt 文件为例。

命令:

代码语言:javascript
复制
D:
cd D:\Tools\bigdata\flink-1.9.3\bin
.\flink.bat run ..\examples\batch\WordCount.jar -input ..\README.txt -output ..\README_CountWord_Result.txt

查看任务完成情况

http://localhost:8081/#/job/completed

编程开发

开发过程中并不需要在本地部署Flink环境。

创建项目

创建项目

image-20220421143002437
image-20220421143002437

项目名WordCount

image-20220421143202036
image-20220421143202036

在项目名称WordCount上单击鼠标右键,在弹出的菜单中点击Add Framework Support

image-20220421184711204
image-20220421184711204

java目录上单击鼠标右键,在弹出的菜单中选择Refactor,再在弹出的菜单中选择Rename

然后,在出现的界面中把java目录名称修改为scala

image-20220421143610503
image-20220421143610503

添加类WordCount

image-20220421143726513
image-20220421143726513

在IDEA开发界面中,打开pom.xml,清空里面的内容,输入如下内容:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.psvmc</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.9.3</flink.version>
    <scala.version>2.12.15</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- flink核心API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

测试

创建测试文件wordcount.txt

D:\bigdata_study\stu_list.txt

代码语言:javascript
复制
10010,张三,女,16,IS
10011,李四,男,18,IS
10012,王五,男,19,IS
10013,赵六,女,15,CS

然后,再打开WordCount.scala代码文件,清空里面的内容,输入如下内容:

代码语言:javascript
复制
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)
  def main(args: Array[String]): Unit = {
    //设置用户名,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val stu_list: DataSet[Student] = environment.readCsvFile[Student](
      filePath = "file:///D:/bigdata_study/stu_list.txt",
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      quoteCharacter = null,
      ignoreFirstLine = false,
      ignoreComments = "#",
      lenient = false,
      includedFields = Array[Int](0, 1, 2, 3, 4),
      pojoFields = Array[String]("id", "name", "sex", "age", "department")
    )
    println("-------------原数据----------")
    stu_list.print
  }
}

运行就可以看到结果为

————-原数据———- Student(10012,王五,男,19,IS) Student(10010,张三,女,16,IS) Student(10013,赵六,女,15,CS) Student(10011,李四,男,18,IS)

打包运行

在IDEA开发界面的右侧,点击Maven图标,会弹出Maven调试界面

在Maven调试界面中点击package,就可以对应用程序进行打包,打包成JAR包。

image-20220421165441710
image-20220421165441710

这时,到IDEA开发界面左侧的项目目录树中,在“target”目录下,就可以看到生成了两个JAR文件,

分别是:WordCount-1.0.jarWordCount-1.0-jar-with-dependencies.jar

image-20220421181248319
image-20220421181248319

语法

加载数据

代码语言:javascript
复制
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    //设置用户名,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    val env = ExecutionEnvironment.getExecutionEnvironment
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "Wilson", 178.8),
      (17, "Edith", 168.8),
      (18, "Joyce", 174.8),
      (18, "May", 195.8),
      (18, "Gloria", 182.7),
      (21, "Jessie", 184.8)
    )
    stu.print
  }
}

从文件中加载

代码语言:javascript
复制
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)
  def main(args: Array[String]): Unit = {
    //设置用户名,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val stu_list: DataSet[Student] = environment.readCsvFile[Student](
      filePath = "file:///D:/bigdata_study/stu_list.txt",
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      quoteCharacter = null,
      ignoreFirstLine = false,
      ignoreComments = "#",
      lenient = false,
      includedFields = Array[Int](0, 1, 2, 3, 4),
      pojoFields = Array[String]("id", "name", "sex", "age", "department")
    )
    println("-------------原数据----------")
    stu_list.print
  }
}

数据导出

导出到HDFS

代码语言:javascript
复制
//写入到HDFS
val output2 = "hdfs://bdedev/flink/Student002.csv"
ds2.writeAsCsv(output2, rowDelimiter = "\n", fieldDelimiter = "|||", WriteMode.OVERWRITE)
env.execute()

导出到文件

代码语言:javascript
复制
//写入到文件
val output2 = "file:///D:/bigdata_study/result001.txt"
ds3.writeAsCsv(output2, rowDelimiter = "\n", fieldDelimiter = ",", WriteMode.OVERWRITE)
env.execute()

值转换

Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。

单数据流基本转换

image-20220926175102708
image-20220926175102708

基于Key的分组转换

对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。

groupBy会将一个DataSet转化为一个GroupedDataSet,聚合操作会将GroupedDataSet转化为DataSet。如果聚合前每个元素数据类型是T,聚合后的数据类型仍为T。

image-20220926180418786
image-20220926180418786
aggregation

常见的聚合操作有summaxmin等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟groupBy相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。

与批处理不同,这些聚合函数是对流数据进行数据,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。sum算子的功能对该字段进行加和,并将结果保存在该字段上。min操作无法确定其他字段的数值。

代码语言:javascript
复制
val tupleStream = env.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
tupleStream.groupBy(0).sum(1).print()

第0个分组,第1个求和

结果

(1,3,8) (0,3,2)

reduce
代码语言:javascript
复制
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}

object WordCount {
  def main(args: Array[String]): Unit = {
    //设置用户名,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1: DataSet[String] = env.fromElements(
      "good good study", "day day up"
    )
    val group_ds  = ds1.flatMap(line => line.split(" ")).map(word => (word, 1)).groupBy(0)
    val ds3 = group_ds.reduce((a, b) => (a._1, a._2 + b._2))
    ds3.sortPartition(0, Order.ASCENDING).print
  }
}

结果

(up,1) (day,2) (good,2) (study,1)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 部署与测试
    • 运行
      • 访问 Flink UI
        • 测试
        • 编程开发
          • 创建项目
            • 测试
              • 打包运行
              • 语法
                • 加载数据
                  • 数据导出
                    • 值转换
                      • 单数据流基本转换
                      • 基于Key的分组转换
                  相关产品与服务
                  大数据
                  全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档