专栏首页使用Apache Flink进行流处理

使用Apache Flink进行流处理

如果在你的脑海里,“Apache Flink”和“流处理”没有很强的联系,那么你可能最近没有看新闻。Apache Flink已经席卷全球大数据领域。现在正是这样的工具蓬勃发展的绝佳机会:流处理在数据处理中变得越来越流行,Apache Flink引入了许多重要的创新。

在本文中,我将演示如何使用Apache Flink编写流处理算法。我们将读取维基百科的编辑流,并将了解如何从中获得一些有意义的数据。在这个过程中,您将看到如何读写流数据,如何执行简单的操作以及如何实现更复杂一点的算法。

入门

我相信,如果您是Apache Flink新手,最好从学习批处理开始,因为它更简单,并能为您学习流处理提供一个坚实的基础。我已经写了一篇介绍性的博客文章,介绍如何使用Apache Flink 进行批处理,我建议您先阅读它。

如果您已经知道如何在Apache Flink中使用批处理,那么流处理对您来说没有太多惊喜。和以前一样,我们将看看应用程序中的三个不同的阶段:从源中读取数据,处理数据以及将数据写入外部系统。

与批处理相比,这几乎没有显着差异。首先,在批处理中,所有数据都被提前准备好。当处理进程在运行时,即使有新的数据到达我们也不会处理它。

不过,在流处理方面有所不同。我们在生成数据时会读取数据,而我们需要处理的数据流可能是无限的。采用这种方法,我们几乎可以实时处理传入数据。

在流模式下,Flink将读取数据并将数据写入不同的系统,包括Apache Kafka,Rabbit MQ等基本上可以产生和使用稳定数据流的系统。需要注意的是,我们也可以从HDFS或S3读取数据。在这种情况下,Apache Flink会不断监视一个文件夹,并在文件生成时处理它们。

以下是我们如何在流模式下从文件中读取数据:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.readTextFile("file/path");

请注意,要使用流处理,我们需要使用StreamExecutionEnvironment类而不是ExecutionEnvironment类。此外,读取数据的方法会返回一个稍后将用于数据处理的DataStream类的实例。

我们也可以像批处理案例中那样从集合或数组创建有限流:

DataStream<Integer> numbers = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5 6);
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

简单的数据处理

对于处理流中的一个流项目,Flink提供给操作员一些类似批处理的操作如map, filtermapReduce

让我们来实现我们的第一个流处理示例。我们将阅读一个维基百科的编辑流并显示我们感兴趣的内容。

首先,要阅读编辑流,我们需要使用WikipediaEditsSource

DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

要使用它,我们需要调用用于从Kafka,Kinesis,RabbitMQ等源中读取数据的方法addSource。此方法返回我们当前可以处理的编辑流。

让我们来筛选所有非机器生成并且已经改变了一千多个字节的编辑:

edits.filter((FilterFunction<WikipediaEditEvent>) edit -> {
    return !edit.isBotEdit() && edit.getByteDiff() > 1000;
})
.print();

这与在批处理情况下如何使用filter方法非常相似,唯一的不同是它处理的是无限流。

现在最后一步是运行我们的程序。像以前一样,我们需要调用执行方法execute

env.execute()

该程序将开始打印筛选的Wikipedia编辑,直到我们停止它:

2> WikipediaEditEvent{timestamp=1506499898043, channel='#en.wikipedia', title='17 FIBA Womens Melanesia Basketball Cup', diffUrl='https://en.wikipedia.org/w/index.php?diff=802608251&oldid=802520770', user='Malto15', byteDiff=1853, summary='/* Preliminary round */', flags=0}
7> WikipediaEditEvent{timestamp=1506499911216, channel='#en.wikipedia', title='User:MusikBot/StaleDrafts/Report', diffUrl='https://en.wikipedia.org/w/index.php?diff=802608262&oldid=802459885', user='MusikBot', byteDiff=11674, summary='Reporting 142 stale non-AfC drafts', flags=0}
...

流窗口

请注意,到目前为止,我们已经讨论过的所有方法都是针对流中的各个元素进行的。看上去我们不可能使用这些简单的操作来实现出许多有趣的流算法。仅使用它们不可能实现以下用例:

  • 计算每分钟执行的编辑次数。
  • 计算每十分钟每个用户执行的编辑次数。

很明显,要解决这些问题,我们需要处理一组元素。这是流窗口的用途。

简而言之,流窗口允许我们对流中的元素进行分组,并对每个组执行用户自定义的功能。这个用户自定义函数可以返回零个,一个或多个元素,并以这种方式创建一个新的流,我们可以在一个独立的系统中处理或存储它。

我们如何将流中的元素分组?Flink提供了几个选项来执行此操作:

  • 滚动窗口:在流中创建不重叠的相邻窗口。我们可以按时间对元素进行分组(例如,从10:00到10:05的所有元素分为一个组)或计数(前50个元素进入单独的组)。比如,我们可以使用它来解决一个问题,例如“对流中的多个元素进行非重复五分钟间隔计数”。
  • 滑动窗口:与滚动窗口类似,但在这里,窗口可以重叠。如果我们需要计算最近五分钟的指标,我们可以使用它,但我们希望每分钟显示一次输出。
  • 会话窗口:在这种情况下,Flink将彼此时间上邻近的事件分组。
  • 全局窗口:在这种情况下,Flink将所有元素放到一个窗口中。这仅在我们定义一个窗口何时完成的自定义触发器时是有用的。

除了选择如何将元素分配给不同的窗口,我们还需要选择一个流类型。Flink有两种流类型:

  1. 键控流:使用此流类型,Flink将通过键(例如,进行编辑的用户的名称)将单个流划分为多个独立的流。当我们在键控流中处理窗口时,我们定义的函数只能访问具有相同键的项目。但使用多个独立的流时Flink可以进行并行工作。
  2. 非键控流:在这种情况下,流中的所有元素将被一起处理,我们的用户自定义函数将访问流中所有元素。这种流类型的缺点是它不提供并行性,集群中只能有一台机器执行我们的代码。

现在,让我们使用流窗口来进行一些演示。首先,让我们来看看维基百科每分钟执行多少次编辑。首先,我们需要读取编辑流:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

然后,我们指定要将流分成一分钟不重复的窗口:

edits
    // Non-overlapping one-minute windows
    .timeWindowAll(Time.minutes(1))

现在,我们可以定义一个自定义函数来处理每个一分钟窗口中的所有元素。要做到这一点,我们将使用apply方法并传递接口AllWindowFunction

edits
    .timeWindowAll(Time.minutes(1))
    .apply(new AllWindowFunction<WikipediaEditEvent, Tuple3<Date, Long, Long>, TimeWindow>() {
        @Override
        public void apply(TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple3<Date, Long, Long>> collector) throws Exception {
            long count = 0;
            long bytesChanged = 0;
            // Count number of edits
            for (WikipediaEditEvent event : iterable) {
                count++;
                bytesChanged += event.getByteDiff();
            }
            // Output a number of edits and window's end time
            collector.collect(new Tuple3<>(new Date(timeWindow.getEnd()), count, bytesChanged));
        }
    })
    .print();

尽管有点长,但该方法非常简单。apply方法接收三个参数:

  • timeWindow:包含关于我们正在处理的窗口的信息。
  • iterable:单个窗口中元素的迭代器。
  • collector:可以用来将元素输出到结果流中的对象。

我们在这里所做的是计算多个更改,然后使用collector实例输出计算结果以及窗口的结束时间戳。

如果我们运行这个程序,我们将看到apply方法生成的条目打印到了输出流中:

1> (Wed Sep 27 12:58:00 IST 2017,62,62016)
2> (Wed Sep 27 12:59:00 IST 2017,82,12812)
3> (Wed Sep 27 13:00:00 IST 2017,89,45532)
4> (Wed Sep 27 13:01:00 IST 2017,79,11128)
5> (Wed Sep 27 13:02:00 IST 2017,82,26582)

键控流示例

现在,我们来看一个更复杂的例子。我们来计算一个用户每十分钟的间隔进行了多少次编辑。这可以帮助识别最活跃的用户或在系统中发现一些不寻常的活动。

当然,我们可以使用非键控流,迭代窗口中的所有元素,并使用一个字典来跟踪计数。但这种方法不利于推广,因为非键控流不可并行化。为了高效地使用Flink集群的资源,我们需要通过用户名键入我们的流,这将创建多个逻辑流,每个用户一个。

DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
edits
    // Key by user name
    .keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser)
    // Ten-minute non-overlapping windows
    .timeWindow(Time.minutes(10))

唯一的区别是我们使用keyBy方法为我们的流指定一个键。在这里,我们简单地使用用户名作为分区键。

现在,当我们有一个键控流时,我们可以执行一个函数来处理每个窗口。和以前一样,我们将使用apply方法:

edits
    .keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser)
    .timeWindow(Time.minutes(10))
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = 0;
            // Count number of changes
            for (WikipediaEditEvent ignored : iterable) {
                changesCount++;
            }
            // Output user name and number of changes
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    })
    .print();

这里一个比较大的区别是这个版本的apply方法有四个参数。额外的第一个参数为我们的函数正在处理的逻辑流指定一个键。

如果我们执行这个程序,我们将得到一个流,其中每个元素包含一个用户名和一个用户每10分钟执行的编辑次数:

...
5> (InternetArchiveBot,6)
1> (Francis Schonken,1)
6> (.30.124.210,1)
1> (MShabazz,1)
5> (Materialscientist,18)
1> (Aquaelfin,1)
6> (Cote d'Azur,2)
1> (Daniel Cavallari,3)
5> (00:1:F159:6D32:2578:A6F7:AB88:C8D,2)
...

正如你所看到的,今天有一些用户在维基百科上疯狂编辑!

这是一篇介绍性文章,还有更多有关Apache Flink的东西。我会在不久的将来写更多关于Flink的文章,敬请关注!

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用Apache Flink进行批处理入门教程

    原文地址:https://dzone.com/articles/getting-started-with-batch-processing-using-apac...

    大数据弄潮儿
  • Flink 使用Flink进行高吞吐,低延迟和Exactly-Once语义流处理

    在本文中,我们将深入探讨Flink新颖的检查点机制是如何工作的,以及它是如何取代旧架构以实现流容错和恢复。我们在各种类型的流处理应用程序上对Flink性能进行测...

    smartsi
  • Apache-Flink深度解析-概述

    Apache Flink 的命脉 "命脉" 即生命与血脉,常喻极为重要的事物。系列的首篇,首篇的首段不聊Apache Flink的历史,不聊Apache Fli...

    王知无-import_bigdata
  • 最新消息!Cloudera 全球发行版正式集成 Apache Flink

    摘要:近期 Cloudera Hadoop 大神 Arun 在 Twitter 上宣布 Cloudera Data Platform 正式集成了 Flink 作...

    Fayson
  • 实时即未来:Apache Flink实践(一)

    的确,实时这个名词在现代这个科技社会越来越重要,仅以此篇文章记录我的Apache Flink实践学习过程~

    星橙
  • Flink 和 Pulsar 的批流融合

    Apache Flink 和 Apache Pulsar 的开源数据技术框架可以以不同的方式融合,来提供大规模弹性数据处理。4 月 2 日,我司 CEO 郭斯杰...

    Spark学习技巧
  • Apache Flink初探

    Apache Flink 是一个开源的针对批量数据和流数据的处理引擎,已经发展为ASF的顶级项目之一。Flink 的核心是在数据流上提供了数据分发、通信、具备容...

    1001482
  • Linode Cloud中的大数据:使用Apache Storm进行流数据处理

    Apache Storm是一项大数据技术,使软件,数据和基础架构工程师能够实时处理高速,大容量数据并提取有用信息。任何涉及实时处理高速数据流的项目都可以从中受益...

    GongAo啊_
  • 最火的实时计算框架Flink和下一代分布式消息队列Pulsar的批流融合

    Apache Flink 和 Apache Pulsar 的开源数据技术框架可以以不同的方式融合,来提供大规模弹性数据处理。Flink Forward San ...

    王知无-import_bigdata
  • 最火的实时计算框架Flink和下一代分布式消息队列Pulsar的批流融合

    Apache Flink 和 Apache Pulsar 的开源数据技术框架可以以不同的方式融合,来提供大规模弹性数据处理。Flink Forward San ...

    大数据真好玩
  • 大数据框架—Flink与Beam

    Flink是Apache的一个顶级项目,Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容...

    端碗吹水
  • ApacheFlink深度解析-FaultTolerance

    本系列文章来自云栖社区,对Flink的解析兼具广度和深度,适合对Flink有一定研究的同学学习。

    王知无-import_bigdata
  • 【Flink】Flink 运行架构及 Flink 流处理 API

    Flink 运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:作业管理器(JobManager)、资源管理器(ResourceManage...

    魏晓蕾
  • Apache Flink基本编程模型

    “前一篇文章中<一文了解Flink数据-有界数据与无界数据>大致讲解了Apache Flink数据的形态问题。Apache Flink实现分布式集合数据集转换、...

    CainGao
  • 2021年大数据Flink(八):Flink入门案例

    Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度越大

    Lanson
  • Apache Flink Table Api&SQL 介绍与使用

    “ Apache Flink,Spark,Hadoop包括其他计算框架都趋向于使用SQL的方式对数据进行检索。很少再有通过代码的方式进行数据的操作。数据计算框架...

    CainGao
  • 11-时间戳和水印

    地址为:https://yq.aliyun.com/articles/666056?spm=a2c4e.11155435.0.0.106e1b10snGqMd

    王知无-import_bigdata
  • Flink未来-将与 Pulsar集成提供大规模的弹性数据处理

    问题导读 1.什么是Pulsar? 2.Pulsar都有哪些概念? 3.Pulsar有什么特点? 4.Flink未来如何与Pulsar整合? Apache ...

    用户1410343
  • Flink1.7发布中的新功能

    Apache Flink 社区正式宣布 Apache Flink 1.7.0 发布。最新版本包括解决了420多个问题以及令人兴奋的新增功能,我们将在本文进行描述...

    smartsi

扫码关注云+社区

领取腾讯云代金券