前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【flink1.7官方文档翻译】DataStream API Tutorial

【flink1.7官方文档翻译】DataStream API Tutorial

作者头像
皮皮熊
发布2019-01-25 11:56:52
1.4K0
发布2019-01-25 11:56:52
举报

DataStream API Tutorial

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/datastream_api.html#top

在本指南中,我们将从头开始,从设置Flink项目到在Flink集群上运行stream分析程序。

Wikipedia提供了一个记录了对Wiki所有编辑的IRC channel。我们将在Flink中读取此channel,并计算每个用户在给定时间窗口内编辑的字节数。这很容易在几分钟内使用Flink实现,但它将为您提供一个良好的基础,从而开始自己构建更复杂的分析程序。

Setting up a Maven Project

我们将使用Flink Maven Archetype来创建我们的项目结构。有关此内容的更多详细信息,请参阅Java API快速入门。我们可以这样运行命令:

代码语言:txt
复制
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.0 \
    -DgroupId=wiki-edits \
    -DartifactId=wiki-edits \
    -Dversion=0.1 \
    -Dpackage=wikiedits \
    -DinteractiveMode=false

如果您愿意,可以编辑groupId,artifactId和package。使用上面的参数,Maven将创建一个如下所示的项目结构:

代码语言:txt
复制
$ tree wiki-edits
wiki-edits/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── wikiedits
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

我们的pom.xml文件已经在根目录中添加了Flink依赖项,并在src / main / java中添加了几个示例Flink程序。我们可以删除示例程序,因为我们将从头开始:

代码语言:txt
复制
$ rm wiki-edits/src/main/java/wikiedits/*.java

作为最后一步,我们需要将Flink Wikipedia连接器添加为依赖关系,以便我们可以在我们的程序中使用它。编辑pom.xml的依赖项部分,使其如下所示:

代码语言:txt
复制
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

请注意添加的flink-connector-wikiedits_2.11依赖项。 (此示例和Wikipedia连接器的灵感来自Apache Samza的Hello Samza示例。)

Writing a Flink Program

到了code时间。启动喜欢的IDE并导入Maven项目或打开文本编辑器并创建文件src/main/java/wikiedits/WikipediaAnalysis.java:

代码语言:txt
复制
package wikiedits;

public class WikipediaAnalysis {

    public static void main(String[] args) throws Exception {

    }
}

该Program现在非常基础,但接下来我们会尽力填写。请注意,我不会在此处提供import语句,因为IDE可以自动添加它们。在本节结束时,如果您只想跳过并在编辑器中输入,我将展示带import的完整代码。

Flink程序的第一步是创建StreamExecutionEnvironment(如果您正在编写批处理作业,则创建ExecutionEnvironment)。这可用于设置执行参数并创建从外部系统读取的源。所以让我们把它添加到main方法:

代码语言:txt
复制
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

接下来,我们将创建一个从Wikipedia IRC日志中读取的source:

代码语言:txt
复制
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

这将创建一个我们可以进一步处理的WikipediaEditEvent元素的DataStream。出于本示例的目的,我们确定每个用户在特定时间窗口中添加或删除的字节数,比如说五秒。为此,我们首先要指定我们要在用户名上键入流,也就是说此流上的操作应考虑用户名。在我们的例子中,窗口中编辑的字节的总和应该是每个唯一的用户。为了输入一个Stream,我们必须提供一个KeySelector,如下所示:

代码语言:txt
复制
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
            return event.getUser();
        }
    });

这提供了一个有String Key(即用户名)的WikipediaEditEvent流。我们现在可以指定此流上的窗口,并根据这些窗口中的元素计算结果。窗口指定要在其上执行计算的Stream切片。在无限的元素流上计算聚合时需要Windows。在我们的例子中,我们将每五秒聚合一次编辑的字节总和:

代码语言:txt
复制
DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
            acc.f0 = event.getUser();
            acc.f1 += event.getByteDiff();
            return acc;
        }
    });

第一个调用.timeWindow()指定五秒钟的tumbling(非重叠)窗口。第二个调用为每个唯一键指定每个窗口切片的折叠变换。在我们的例子中,我们从初始值("",0L)开始,并在该时间窗口中为用户添加每个编辑的字节差异。生成的Stream包含每隔五秒生成的每个用户的Tuple2 <String,Long>。

剩下要做的就是将流打印到控制台并开始执行:

代码语言:txt
复制
result.print();

see.execute();

最后一次调用是启动实际Flink工作所必需的。所有操作(例如创建源,转换和接收器)仅用于构建内部运行的流程(graph)。只有在调用execute()时,才会在集群上抛出或在本地计算机上执行此执行流程(graph)。

代码语言:txt
复制
package wikiedits;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getUser();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

    result.print();

    see.execute();
  }
}

我们可以在ide或者命令行中使用maven运行这个例子:

代码语言:txt
复制
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis

第一个命令构建我们的项目,第二个项目执行我们的主类。输出结果类似:

代码语言:txt
复制
1> (Fenix down,114)
6> (AnomieBOT,155)
8> (BD2412bot,-3690)
7> (IgnorantArmies,49)
3> (Ckh3111,69)
5> (Slade360,0)
7> (Narutolovehinata5,2195)
6> (Vuyisa2001,79)
4> (Ms Sarah Welch,269)
4> (KasparBot,-245)

每行前面的数字是指来自print sink 的哪个并行实例。

这个可以帮助你开始编写自己的Flink程序。想要了解更多,可以参考我们的概念指南和DataStream API。如果想要了解如何在自己的机器上面配置Flink集群并写入数据到kafka,可以参考接下来的额外练习。

Bonus Exercise: Running on a Cluster and Writing to Kafka(额外练习:将任务运行在一个集群上并将数据写到kafka)

接下来请参考本地安装教程在本机安装一个flink集群,并按照Kafka quickstart部署kafka

第一步我们需要把Flink Kafka connector加到依赖里面所以我们能使用Kafka sink。将以下部分添加到pom.xml的依赖部分:

代码语言:txt
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

接下来我们需要修改我们的程序。我们用kafka sink替代print() sink。新的代码:

代码语言:txt
复制
result
    .map(new MapFunction<Tuple2<String,Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

需要导入如下的类:

代码语言:txt
复制
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;

注意,我们需要用MapFunction将Tuple2<String, Long>流转换成一个字符串流。我们这样做是因为将纯文本字符串写到kafka里面是更方便的。接下来我们创建一个kafka的sink。你需要将hostname和port改成你安装的值。"wiki-result" 就是我们运行程序前,要创建的Kafka流的名字。用maven构建这个项目,因为我们需要在集群运行这个jar文件:

代码语言:txt
复制
$ mvn clean package

结果jar文件将会出现在目标的子文件夹下:target/wiki-edits-0.1.ja。我们稍后将使用它。

我们已经准备好启动一个flink集群并运行程序将数据写到kafka里面。来到我们安装flink的地方并启动一个本地的集群。

代码语言:txt
复制
$ cd my/flink/directory
$ bin/start-cluster.sh

我们还需要创建一个kafka topic,这样我们的程序可以往里面写入数据:

代码语言:txt
复制
$ cd my/kafka/directory
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-results

现在我们准备在我们的本地flink集群运行我们的jar文件了:

代码语言:txt
复制
$ cd my/flink/directory
$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

如果所有事情按计划进行的话,命令行输出的结果类似于:

代码语言:txt
复制
03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

你可以看到每个独立运行的操作。因为性能原因窗口折叠成了一个操作,所以我们只能看到2个操作。在flink里面我们把它称之为chaining。

我们可以通过使用kafka的console consumer从kafka topic里面观察程序的输出:

代码语言:txt
复制
bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result

我们同样可以访问 http://localhost:8081来检查flink的dashboard。你可以获取你集群资源和运行job的概览。

如果你点击了当前运行的job则进入一个视图来检查独立操作,如处理对象的数目。

本文系外文翻译,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系外文翻译前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Setting up a Maven Project
  • Writing a Flink Program
  • Bonus Exercise: Running on a Cluster and Writing to Kafka(额外练习:将任务运行在一个集群上并将数据写到kafka)
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档