前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Apache Flink进行流处理

使用Apache Flink进行流处理

作者头像
此中剑无涯
发布2018-06-04 16:37:40
3.8K4
发布2018-06-04 16:37:40

如果在你的脑海里,“Apache Flink”和“流处理”没有很强的联系,那么你可能最近没有看新闻。Apache Flink已经席卷全球大数据领域。现在正是这样的工具蓬勃发展的绝佳机会:流处理在数据处理中变得越来越流行,Apache Flink引入了许多重要的创新。

在本文中,我将演示如何使用Apache Flink编写流处理算法。我们将读取维基百科的编辑流,并将了解如何从中获得一些有意义的数据。在这个过程中,您将看到如何读写流数据,如何执行简单的操作以及如何实现更复杂一点的算法。

入门

我相信,如果您是Apache Flink新手,最好从学习批处理开始,因为它更简单,并能为您学习流处理提供一个坚实的基础。我已经写了一篇介绍性的博客文章,介绍如何使用Apache Flink 进行批处理,我建议您先阅读它。

如果您已经知道如何在Apache Flink中使用批处理,那么流处理对您来说没有太多惊喜。和以前一样,我们将看看应用程序中的三个不同的阶段:从源中读取数据,处理数据以及将数据写入外部系统。

与批处理相比,这几乎没有显着差异。首先,在批处理中,所有数据都被提前准备好。当处理进程在运行时,即使有新的数据到达我们也不会处理它。

不过,在流处理方面有所不同。我们在生成数据时会读取数据,而我们需要处理的数据流可能是无限的。采用这种方法,我们几乎可以实时处理传入数据。

在流模式下,Flink将读取数据并将数据写入不同的系统,包括Apache Kafka,Rabbit MQ等基本上可以产生和使用稳定数据流的系统。需要注意的是,我们也可以从HDFS或S3读取数据。在这种情况下,Apache Flink会不断监视一个文件夹,并在文件生成时处理它们。

以下是我们如何在流模式下从文件中读取数据:

代码语言:txt
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.readTextFile("file/path");

请注意,要使用流处理,我们需要使用StreamExecutionEnvironment类而不是ExecutionEnvironment类。此外,读取数据的方法会返回一个稍后将用于数据处理的DataStream类的实例。

我们也可以像批处理案例中那样从集合或数组创建有限流:

代码语言:txt
复制
DataStream<Integer> numbers = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5 6);
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

简单的数据处理

对于处理流中的一个流项目,Flink提供给操作员一些类似批处理的操作如map, filtermapReduce

让我们来实现我们的第一个流处理示例。我们将阅读一个维基百科的编辑流并显示我们感兴趣的内容。

首先,要阅读编辑流,我们需要使用WikipediaEditsSource

代码语言:txt
复制
DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

要使用它,我们需要调用用于从Kafka,Kinesis,RabbitMQ等源中读取数据的方法addSource。此方法返回我们当前可以处理的编辑流。

让我们来筛选所有非机器生成并且已经改变了一千多个字节的编辑:

代码语言:txt
复制
edits.filter((FilterFunction<WikipediaEditEvent>) edit -> {
    return !edit.isBotEdit() && edit.getByteDiff() > 1000;
})
.print();

这与在批处理情况下如何使用filter方法非常相似,唯一的不同是它处理的是无限流。

现在最后一步是运行我们的程序。像以前一样,我们需要调用执行方法execute

代码语言:txt
复制
env.execute()

该程序将开始打印筛选的Wikipedia编辑,直到我们停止它:

代码语言:txt
复制
2> WikipediaEditEvent{timestamp=1506499898043, channel='#en.wikipedia', title='17 FIBA Womens Melanesia Basketball Cup', diffUrl='https://en.wikipedia.org/w/index.php?diff=802608251&oldid=802520770', user='Malto15', byteDiff=1853, summary='/* Preliminary round */', flags=0}
7> WikipediaEditEvent{timestamp=1506499911216, channel='#en.wikipedia', title='User:MusikBot/StaleDrafts/Report', diffUrl='https://en.wikipedia.org/w/index.php?diff=802608262&oldid=802459885', user='MusikBot', byteDiff=11674, summary='Reporting 142 stale non-AfC drafts', flags=0}
...

流窗口

请注意,到目前为止,我们已经讨论过的所有方法都是针对流中的各个元素进行的。看上去我们不可能使用这些简单的操作来实现出许多有趣的流算法。仅使用它们不可能实现以下用例:

  • 计算每分钟执行的编辑次数。
  • 计算每十分钟每个用户执行的编辑次数。

很明显,要解决这些问题,我们需要处理一组元素。这是流窗口的用途。

简而言之,流窗口允许我们对流中的元素进行分组,并对每个组执行用户自定义的功能。这个用户自定义函数可以返回零个,一个或多个元素,并以这种方式创建一个新的流,我们可以在一个独立的系统中处理或存储它。

我们如何将流中的元素分组?Flink提供了几个选项来执行此操作:

  • 滚动窗口:在流中创建不重叠的相邻窗口。我们可以按时间对元素进行分组(例如,从10:00到10:05的所有元素分为一个组)或计数(前50个元素进入单独的组)。比如,我们可以使用它来解决一个问题,例如“对流中的多个元素进行非重复五分钟间隔计数”。
  • 滑动窗口:与滚动窗口类似,但在这里,窗口可以重叠。如果我们需要计算最近五分钟的指标,我们可以使用它,但我们希望每分钟显示一次输出。
  • 会话窗口:在这种情况下,Flink将彼此时间上邻近的事件分组。
  • 全局窗口:在这种情况下,Flink将所有元素放到一个窗口中。这仅在我们定义一个窗口何时完成的自定义触发器时是有用的。

除了选择如何将元素分配给不同的窗口,我们还需要选择一个流类型。Flink有两种流类型:

  1. 键控流:使用此流类型,Flink将通过键(例如,进行编辑的用户的名称)将单个流划分为多个独立的流。当我们在键控流中处理窗口时,我们定义的函数只能访问具有相同键的项目。但使用多个独立的流时Flink可以进行并行工作。
  2. 非键控流:在这种情况下,流中的所有元素将被一起处理,我们的用户自定义函数将访问流中所有元素。这种流类型的缺点是它不提供并行性,集群中只能有一台机器执行我们的代码。

现在,让我们使用流窗口来进行一些演示。首先,让我们来看看维基百科每分钟执行多少次编辑。首先,我们需要读取编辑流:

代码语言:txt
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

然后,我们指定要将流分成一分钟不重复的窗口:

代码语言:txt
复制
edits
    // Non-overlapping one-minute windows
    .timeWindowAll(Time.minutes(1))

现在,我们可以定义一个自定义函数来处理每个一分钟窗口中的所有元素。要做到这一点,我们将使用apply方法并传递接口AllWindowFunction

代码语言:txt
复制
edits
    .timeWindowAll(Time.minutes(1))
    .apply(new AllWindowFunction<WikipediaEditEvent, Tuple3<Date, Long, Long>, TimeWindow>() {
        @Override
        public void apply(TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple3<Date, Long, Long>> collector) throws Exception {
            long count = 0;
            long bytesChanged = 0;
            // Count number of edits
            for (WikipediaEditEvent event : iterable) {
                count++;
                bytesChanged += event.getByteDiff();
            }
            // Output a number of edits and window's end time
            collector.collect(new Tuple3<>(new Date(timeWindow.getEnd()), count, bytesChanged));
        }
    })
    .print();

尽管有点长,但该方法非常简单。apply方法接收三个参数:

  • timeWindow:包含关于我们正在处理的窗口的信息。
  • iterable:单个窗口中元素的迭代器。
  • collector:可以用来将元素输出到结果流中的对象。

我们在这里所做的是计算多个更改,然后使用collector实例输出计算结果以及窗口的结束时间戳。

如果我们运行这个程序,我们将看到apply方法生成的条目打印到了输出流中:

代码语言:txt
复制
1> (Wed Sep 27 12:58:00 IST 2017,62,62016)
2> (Wed Sep 27 12:59:00 IST 2017,82,12812)
3> (Wed Sep 27 13:00:00 IST 2017,89,45532)
4> (Wed Sep 27 13:01:00 IST 2017,79,11128)
5> (Wed Sep 27 13:02:00 IST 2017,82,26582)

键控流示例

现在,我们来看一个更复杂的例子。我们来计算一个用户每十分钟的间隔进行了多少次编辑。这可以帮助识别最活跃的用户或在系统中发现一些不寻常的活动。

当然,我们可以使用非键控流,迭代窗口中的所有元素,并使用一个字典来跟踪计数。但这种方法不利于推广,因为非键控流不可并行化。为了高效地使用Flink集群的资源,我们需要通过用户名键入我们的流,这将创建多个逻辑流,每个用户一个。

代码语言:txt
复制
DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
edits
    // Key by user name
    .keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser)
    // Ten-minute non-overlapping windows
    .timeWindow(Time.minutes(10))

唯一的区别是我们使用keyBy方法为我们的流指定一个键。在这里,我们简单地使用用户名作为分区键。

现在,当我们有一个键控流时,我们可以执行一个函数来处理每个窗口。和以前一样,我们将使用apply方法:

代码语言:txt
复制
edits
    .keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser)
    .timeWindow(Time.minutes(10))
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = 0;
            // Count number of changes
            for (WikipediaEditEvent ignored : iterable) {
                changesCount++;
            }
            // Output user name and number of changes
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    })
    .print();

这里一个比较大的区别是这个版本的apply方法有四个参数。额外的第一个参数为我们的函数正在处理的逻辑流指定一个键。

如果我们执行这个程序,我们将得到一个流,其中每个元素包含一个用户名和一个用户每10分钟执行的编辑次数:

代码语言:txt
复制
...
5> (InternetArchiveBot,6)
1> (Francis Schonken,1)
6> (.30.124.210,1)
1> (MShabazz,1)
5> (Materialscientist,18)
1> (Aquaelfin,1)
6> (Cote d'Azur,2)
1> (Daniel Cavallari,3)
5> (00:1:F159:6D32:2578:A6F7:AB88:C8D,2)
...

正如你所看到的,今天有一些用户在维基百科上疯狂编辑!

这是一篇介绍性文章,还有更多有关Apache Flink的东西。我会在不久的将来写更多关于Flink的文章,敬请关注!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 入门
  • 简单的数据处理
  • 流窗口
  • 键控流示例
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档