专栏首页大数据使用Apache Flink进行批处理入门教程

使用Apache Flink进行批处理入门教程

Getting Started With Batch Processing Using Apache Flink

原文作者:Ivan Mushketyk

原文地址:https://dzone.com/articles/getting-started-with-batch-processing-using-apache

译者微博:@从流域到海域

译者博客:blog.csdn.net/solo95

使用Apache Flink进行批处理入门教程

如果你一直在关注最近有关软件开发的新闻,你可能听说过一个名为Apache Flink的新项目。我已经这里这里写了一些关于它的文章,如果你不熟悉它的话可以参考一下。Apache Flink是一种新一代的大数据处理工具,可以处理有限数据集(这也称为批处理)或者可能无限的数据流(流处理)。就它的新功能而言,许多人认为Apache Flink是一款有能力成为规则改变者的软件,未来甚至可以取代Apache Spark。

在本文中,我将向您介绍如何使用Apache Flink来实现简单的批处理算法。我们将从设置我们的开发环境开始,接着你会看到如何加载数据,处理数据集以及将数据写回到外部系统。

为什么使用批处理?

您可能已经听说流处理是“现在最新的热点”,而且Apache Flink恰好就是一个流处理工具。这可能会产生一个问题:为什么我们仍然需要学习如何实现批处理应用程序?

尽管流处理已经变得越来越普遍,但许多任务仍然需要批处理。另外,如果你刚刚开始使用Apache Flink,在我看来,最好从批处理开始,因为它更简单,并且类似于使用数据库。一旦您学会如何完成批处理,就可以认识到Apache Flink在流处理功能上的强大之处!

如何遵循示例进行编程

如果你想自己实现一些Apache Flink应用程序,首先你需要创建一个Flink项目。在本文中,我们将使用Java来编写应用程序,当然您也可以在Scala,Python或R中的一门语言来编写Flink应用程序。

要创建Flink Java项目,请执行以下命令:

mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.3.2

输入group id(组编号),artifact id(项目编号)和project version(项目版本)后,此命令将创建以下项目结构:

. ├── pom.xml └── src └── main ├── java │ └── flinkProject │ ├── BatchJob.java │ ├── SocketTextStreamWordCount.java │ ├── StreamingJob.java │ └── WordCount.java └── resources └── log4j.properties

这里最重要的是大量的pom.xml明确定义了所有必要的依赖关系。自动创建的Java类是一些简单的Flink应用程序的示例,您可以参考一下这些应用程序,但我们不需要它们来实现我们的目的。

要开始开发您的第一个Flink应用程序,请使用下面的main方法创建一个类:

public class FilterMovies {
      public static void main(String[] args) throws Exception
      { 

// Create Flink execution environment final ExecutionEnvironment env =  ExecutionEnvironment.getExecutionEnvironment(); 
// We will write our code here 
// Start Flink application env.execute(); } }

main方法并没有什么特别之处。我们所要做的就是添加一些样板代码。

首先,我们需要创建一个Flink执行环境,如果您在本地机器或Flink群集上运行Flink执行环境,其行为将会有所不同:

  • 在本地机器上,它将创建一个拥有多个本地节点的完整的Flink集群。这是测试应用程序如何在实际环境中工作的好方法
  • 在Flink集群上,它将不会创建任何内容,而是使用现有的集群资源

或者,你可以像这样创建一个接口环境:

ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

这将会创建一个Flink执行环境,而不是在本地集群上运行Flink应用程序,它将模拟在单个Java进程中使用内存集合的所有操作。您的应用程序运行速度会更快,但此环境与具有多个节点的本地集群会有一些细微差别。

我们从哪里开始?

在我们做任何事情之前,我们需要将数据读入Apache Flink。我们可以从众多系统中读取数据,包括本地文件系统,S3,HDFS,HBase,Cassandra等。无论从哪里读取数据集,Apache Flink都允许我们使用DataSet类以统一的方式处理数据:

DataSet<Integer> numbers = ...

数据集中的所有项目应具有相同的类型。单个泛型参数指定了存储在数据集中的数据的类型。

要从文件中读取数据,我们可以使用readTextFileString这样一种方法,它将逐行读取文件中的行并返回类型为string的数据集:

DataSet<String> lines = env.readTextFile("path/to/file.txt"); 

如果你指一个定这样的文件路径,Flink将尝试读取本地文件。如果你想从HDFS读取文件,你需要指定hdfs://协议:

env.readCsvFile("hdfs:///path/to/file.txt")

Flink同样也支持CSV文件,但在适用CSV文件的情况下,它不会返回字符串数据集。它会尝试解析每一行并返回实例类型为Tuple的数据集:

DataSet<Tuple2<Long, String>> lines = env.readCsvFile("data.csv") .types(Long.class, String.class);

Tuple2是存储不可改变的两个域中的一对值的一个类,但也有其他类似的类,从Tuple0Tuple3一直到Tuple25存储从0到25个字段的类。稍后,你将看到如何使用这些类。

types方法指定CSV文件中列的类型和数量,因此Flink可以读取到它们的解析。`

我们还可以创建对于小实验和单元测试非常有用的小型数据集:

// Create from a list 
DataSet<String> letters = env.fromCollection(Arrays.asList("a", "b", "c")); 
// Create from an array 
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

您可能会问的问题是我们可以在DataSet中存储哪些数据?并非每种Java类型都可用于数据集,但你可以使用四种不同类型的类型:

  • 内置Java类型和POJO类
  • Flink tuples(元组)和Scala case类
  • Values,它是Java基本类型的特殊可变式装饰器,可用于提高性能(我将在即将发布的其中一篇文章中对此进行介绍)
  • Hadoop可写接口的实现

使用Apache Flink处理数据

现在到了数据处理部分!你如何实现一个算法来处理你的数据?为此,您可以使用许多类似于Java 8标准流操作的操作,例如:

  • map:使用用户定义的函数转换数据集中的项目。每个输入元素都被转换为一个输出元素。
  • filter:根据用户定义的函数过滤数据集中的项目。
  • flatMap:类似于map运算符,但允许返回零个,一个或多个元素。
  • groupBy:按键值分组得元素。与SQL中的GROUP BY运算符类似。
  • project:在tuples(元组)数据集中选择指定的字段,类似于SQL中的SELECT操作符。
  • reduce:使用用户定义的函数将数据集中的元素组合为单个值。

请记住,Java流操作与这些操作之间最大的区别在于Java 8可以处理内存中的数据并且可以访问本地数据,而Flink在分布式环境中处理集群中的数据。

我们来看看使用了这些操作的简单示例。以下示例非常简单。它创建一个数字数据集,它对每个数字进行平方,然后过滤掉所有的奇数。

// Create a dataset of numbers 
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7); 
// Square every number 
DataSet<Integer> result = numbers.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer integer) throws Exception { return integer * integer; } }) 
// Leave only even 
numbers .filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer integer) throws Exception { return integer % 2 == 0; } });

如果您对Java 8有任何经验,您可能想知道为什么我在这里不使用lambdas(表达式)。我们可以在这里使用lambda,但它可能会导致一些复杂性,正如我在这篇文章中写的那样。

保存数据返回

在我们完成数据处理后,保存我们的辛苦工作的结果是有意义的。Flink可以将数据存储到许多第三方系统中,如HDFS,S3,Cassandra等。

例如,要将数据写入文件,我们需要使用writeAsTextDataSet类中的方法:

DataSet<Integer> ds = ... ds.writeAsText("path/to/file");

为了调试/测试目的,Flink可以将数据写入标准输出或标准输出流:

DataSet<Integer> ds = ... 
// Output dataset to the standard output 
ds.print(); 
// Output dataset to the standard err 
ds.printToErr()

更复杂的例子

为了实现一些更有意义的算法,我们首先需要下载Grouplens电影数据集。它包含几个电影和电影评级信息的CSV文件。我们将从movies.csv这个包含所有电影列表的数据集中处理文件,如下所示:

movieId,title,genres 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy 2,Jumanji (1995),Adventure|Children|Fantasy 3,Grumpier Old Men (1995),Comedy|Romance 4,Waiting to Exhale (1995),Comedy|Drama|Romance 5,Father of the Bride Part II (1995),Comedy 6,Heat (1995),Action|Crime|Thriller 7,Sabrina (1995),Comedy|Romance 8,Tom and Huck (1995),Adventure|Children 9,Sudden Death (1995),Action 10,GoldenEye (1995),Action|Adventure|Thriller

它有三列:

  • movieId:此数据集中电影的唯一电影ID。
  • title:电影的标题。
  • genres:将每部电影其他电影区分开的类型列表。

我们现在可以在Apache Flink中加载这个CSV文件并执行一些有意义的处理。在这里,我们将从本地文件系统来加载文件,而在实际应用环境中,您将可能会读取更大规模的数据集,并且它可能驻留在分布式系统中,例如S3或HDFS。

在这个演示中,让我们找到所有“动作”类型的电影。这是一个代码片段,它可以实现这一点:

// Load dataset of movies 
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv") .ignoreFirstLine() .parseQuotedStrings('"') .ignoreInvalidLines() .types(Long.class, String.class, String.class); 

DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() { @Override public Movie map(Tuple3<Long, String, String> csvLine) throws Exception { String movieName = csvLine.f1; String[] genres = csvLine.f2.split("\\|"); return new Movie(movieName, new HashSet<>(Arrays.asList(genres))); } }); 

DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() { @Override public boolean filter(Movie movie) throws Exception { return movie.getGenres().contains("Action"); } }); 

filteredMovies.writeAsText("output.txt");

让我们来拆开它。首先,我们使用readCsvFile方法读取CSV文件:

DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv") 
// ignore CSV header .ignoreFirstLine() 
// Set strings quotes character .parseQuotedStrings('"') 
// Ignore invalid lines in the CSV file .ignoreInvalidLines() 
// Specify types of columns in the CSV file .types(Long.class, String.class, String.class);
//以上注释左边是要实现的操作 右边是具体的代码 译者注

使用helper方法,我们指定了如何解析CSV文件中的字符串,并且我们需要跳过第一行。在最后一行中,我们指定了CSV文件中每一列的类型,Flink将为我们解析数据。

现在,当我们在Flink集群中加载数据集时,我们可以进行一些数据处理。首先,我们使用map方法解析每部电影的流派列表:

DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() { @Override public Movie map(Tuple3<Long, String, String> csvLine) throws Exception { String movieName = csvLine.f1; String[] genres = csvLine.f2.split("\\|"); return new Movie(movieName, new HashSet<>(Arrays.asList(genres))); } });

为了转换每一部电影我们需要实现MapFunction方法,它将接收每个CSV记录作为Tuple3实例并将其转换为Movie的POJO类:

class Movie { private String name; private Set<String> genres; public Movie(String name, Set<String> genres) { this.name = name; this.genres = genres; } public String getName() { return name; } public Set<String> getGenres() { return genres; } }

如果您回想起CSV文件的结构,第二列包含了电影名称,第三列包含了类型列表。因此,我们使用f1字段和f2字段分别访问这些列。

现在,当我们有一个电影数据集时,我们可以实现算法的核心部分并过滤出所有的动作电影:

DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() { @Override public boolean filter(Movie movie) throws Exception { return movie.getGenres().contains("Action"); } });

这只会返回包含流派中包含“动作”的电影。

现在最后一步非常简单 - 我们将结果数据存储到一个文件中:

filteredMovies.writeAsText("output.txt"); 

这段代码只是将结果数据存储到本地的文本文件中,但与readTextFilehdfs方法一样,我们可以通过指定类似hdfs://的协议将此文件写入HDFS或S3中。

更多信息

这是一篇介绍性文章,关于Apache Flink还有更多内容。我会在不久的将来写更多关于Flink的文章,敬请关注!您可以在这里阅读我的其他文章,或者您可以查看我的Pluralsight课程,其中详细介绍了Apache Flink了解Apache Flink。这是本课程的简短预览

本文的版权归 Steve Wang 所有,如需转载请联系作者。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用 Spring Data 以 Redis 作为数据存储来构建应用 - 第 1 部分

    在本文里面,我将介绍 Java 开发者使用 Spring Data 访问 Redis 并执行操作的编程方式。

    大数据弄潮儿
  • 如何部署 Galera 数据库集群

    MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可。开发这个分支的原因之一是:甲骨文公司收购了MySQL后,有将MyS...

    大数据弄潮儿
  • 使用Google App Script和Google Sheet自动生成数据仪表盘

    仪表盘是将数据内容有效地传达给团队的方法之一。举例来说,仪表盘可以用来跟踪关键性能指标(KPI)的进度。在Lucid,有一个KPI就是我们的产品在第三方市场中的...

    大数据弄潮儿
  • 写给大忙人的Flink的Data Types

    二、Flink 是如何处理 Data Type 的 首先Flink会根据自身的序列化器进行序列化,如果不行,则默认回退到 Kryo 序列化器进行序列化。

    shengjk1
  • Spring Boot 自动配置

    ○@SpringBootConfiguration:标记当前类为配置类 ○@EnableAutoConfiguration:开启自动配置 ○@Compone...

    赵哥窟
  • Java数组赋值

    String [] word = {"hello", "world", "java"}; String [] dest = new String...

    一灰灰blog
  • 重磅|Flink1.9新特性抢先看,文末附PPT下载

    6月29日,Apache Flink Meetup 北京站圆满落幕,Apache Flink 1.9 版本是自 Flink 1.0 之后变化最大的版本,社区对 ...

    王知无
  • Android安全攻防战,反编译与混淆技术完全解析(上)

    之前一直有犹豫过要不要写这篇文章,毕竟去反编译人家的程序并不是什么值得骄傲的事情。不过单纯从技术角度上来讲,掌握反编译功能确实是一项非常有用的技能,可能平常不太...

    用户1158055
  • UrlEncodedFormEntity

    UrlEncodedFormEntity这个类是用来把输入数据编码成合适的内容,下面以注册的时候传递的参数为例:

    wust小吴
  • 【万字长文】Spring MVC 层层递进轻松入门 !

    Html是“名词”,CSS是“形容词”,JavaScript是“动词”,这三个兄弟凑在一起,就构成了 “静态” 页面,那么如何让他 “动态” 起来呢?这就需要后...

    BWH_Steven

扫码关注云+社区

领取腾讯云代金券