Apache Flink初探

1. Apache Flink的简介

Apache Flink是一个开源的针对批量数据和流数据的处理引擎,已经发展为ASF的顶级项目之一。Flink 的核心是在数据流上提供了数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

Flink的技术栈:

Flink的主要API:

  • DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。
  • DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。
  • Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。

此外,Flink还针对特定的应用领域提供了领域库,例如:

  • Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。
  • Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。

Flink的部署方式:

  • 本地模式
  • 集群模式或yarn集群
  • 云集群部署

另外,Flink也可以方便地和Hadoop生态圈中其他项目集成,例如Flink可以读取存储在HDFS或HBase中的静态数据,以Kafka作为流式的数据源,直接重用MapReduce或Storm代码,或是通过YARN申请集群资源等。

2. Apache Flink的架构

当Flink集群启动后,首先会启动一个JobManger和一个或多个的 TaskManager。由Client提交任务给JobManager,JobManager再调度任务到各个TaskManager去执行,然后TaskManager将心跳和统计信息汇报给 JobManager。TaskManager之间以流的形式进行数据的传输。上述三者均为独立的JVM进程。

  • Client:提交Job的客户端,可以是运行在任何机器上(与JobManager环境连通即可)
  • JobManager:Flink系统的协调者,负责任务的排定分配、快照协调、失败恢复控制等,有三种部署模式:单机、一主多备集群、Yarn集群
  • TaskManger:负责具体数据分析任务的执行,主要有业务数据的计算、传输等,相对于Storm的Worker把内存交给jvm管理,Flink的TaskManager还自己管理了部分内存
    • TaskSlot:运行TaskManager中固定大小的资源子集,一个TaskManager中有多少个TaskSlot意味着可以执行多少个Task
  • Task:执行组件,即业务计算的执行实体

3. Flink运行例子

使用Flink的自带例子: flink-stream-examples/WordCount,这是一个从字符串数组读取句子计算每个单词出现次数的例子。

1、启动flink bin/start-local.sh

2、运行WordCount bin/flink run examples/streaming/WordCount.jar

3、执行完之后查看统计结果 cat log/flink-$USER-jobmanager-0-$USER-VirtualBox.out

4、那么我们访问localhost:8081可以查看到此job的执行计划

  • Source:Collection Source:收据数据源,当前是从字符串数数组里面读取
  • Flat Map:把每一条句子分隔成一个个的单词,设置每个单词的出现次数为1,并提交到下游
  • Keyed Aggregation:对每个单词进行聚合统计,统计每个单词的出现次数
  • Sink Unamed:输出统计结果

WordCount代码:

public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer()).setParallelism(2)
        // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0).sum(1).setParallelism(2);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");
    }

4. Apache Flink兼容Apache Storm

考虑到业界当前主流的流式处理引擎为Apache Storm,Flink为了更好的与业界衔接,在流处理上对Storm是做了兼容,通过复用代码的方式即可实现Storm在Flink运行环境的执行,这个也大大降低了Storm使用者过渡到Flink的难度;同理Flink也可以运行我们数平的JStorm。

1、先来对比一下Apache Flink 与Apache Storm的异同:

  • 与Apache Storm相比,Apache Flink少了一层节点管理器,TaskManager直接由主控节点管理
  • 在流处理这一块,Apache Flink与Apache Storm从运行实体到任务组件,基本上能一一对应

2、由上可得,虽然两者运行实体的结构及代码有一定的差别,但归根到底两者运行的都是有向无环图(DAG),所以从Storm的Topology相关类转换成Flink执行的DataStream相关类是可以作转换的。 以下是粗略的转换过程:Storm Topology -> Flink Topology -> DataStream StreamGraph

3、举个例子:已有WordCountTopology,需要提交到Flink集群,那么只需下面几行代码:

final TopologyBuilder builder = WordCountTopology.buildTopology();//构造storm的topology
Map conf = new HashMap();
conf.put("nimbus.host", xxxx);//optional,master server
conf.put("nimbus.thrift.port", xxxx);//optional, master server port
FlinkSubmitter.submitTopology(topologyId, conf, FlinkTopology.createTopology(bgy转换成FlinkTopology再提交

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Python小屋

Python大数据处理扩展库pySpark用法精要

Spark是一个开源的、通用的并行计算与分布式计算框架,其活跃度在Apache基金会所有开源项目中排第三位,最大特点是基于内存计算,适合迭代计算,兼容多种应用场...

3355
来自专栏华章科技

SparkR:数据科学家的新利器

摘要:R是数据科学家中最流行的编程语言和环境之一,在Spark中加入对R的支持是社区中较受关注的话题。作为增强Spark对数据科学家群体吸引力的最新举措,最近发...

522
来自专栏一枝花算不算浪漫

[Spring框架]Spring 事务管理基础入门总结.

3889
来自专栏小鹏的专栏

堆排序

堆排序排序是优秀的算法,但是在实际应用中,快速排序的性能一般会优于堆排序, 尽管如此,堆排序仍然有很多应用,例如:作为高效的优先队列,最大优先队列应用于共享计算...

1886
来自专栏牛肉圆粉不加葱

[Spark源码剖析]Spark 延迟调度策略

在 Spark 中,若 task 与其输入数据在同一个 jvm 中,我们称 task 的本地性为 PROCESS_LOCAL,这种本地性(locality le...

753
来自专栏Albert陈凯

Spark系列课程-0020Spark RDD图例讲解

我们从这节课开始,讲Spark的内核,英文叫做Spark Core,在讲Spark Core之前我们先讲一个重要的概念,RDD, ? image.png 我们S...

2477
来自专栏DOTNET

Entity Framework——性能测试

内容提要 一、对EF框架的性能测试 增、删、改,查测试及性能优化 二、使用sql执行 增、删、改,查测试 三、对以上两种方式对比分析 一 对EF框架的测试 1...

4286
来自专栏个人分享

Spark RDD简介与运行机制概述

主要分为三部分:创建RDD对象,DAG调度器创建执行计划,Task调度器分配任务并调度Worker开始运行。

794
来自专栏个人分享

Spark on Yarn年度知识整理

Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce的算子map 函数和reduce函数及计算模型,还提供更为丰富的算子...

592
来自专栏服务端技术杂谈

JAVA NIO内存泄漏

前言 写NIO程序时,经常使用ByteBuffer来读取写入数据,那使用ByteBuffer.allocate()还是ByteBuffer.allocateDi...

2748

扫码关注云+社区