前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >收藏|Flink比Spark好在哪?

收藏|Flink比Spark好在哪?

作者头像
数据社
发布2020-10-09 15:33:12
1K0
发布2020-10-09 15:33:12
举报
文章被收录于专栏:数据社数据社

1 Flink介绍

Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台。和 Spark 类似,两者都希望提供一个统一功能的计算平台给用户,都在尝试建立一个统一的平台以运行批量,流式,交互式,图处理,机器学习等应用。

1.1部署模式

Flink 集群的部署,本身不依赖 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存储数据,就需要选择对应的 Hadoop 版本。

  • Standalone
  • YARN
  • Mesos
  • Cloud

1.2整合支持

  1. Flink支持消费kafka的数据;
  2. 支持HBase,Cassandra, ElasticSearch
  3. 支持与Alluxio的整合
  4. 支持RabbitMQ

1.3 API支持

  • 对Streaming数据类应用,提供DataStream API
  • 对批处理类应用,提供DataSet API(支持Java/Scala)
  • 对流处理和批处理,都支持Table API
  • 支持双流join

1.4 Libraries支持

  • 支持机器学习(FlinkML)
  • 支持图分析(Gelly)
  • 支持关系数据处理(Table)
  • 支持复杂事件处理(CEP)

1.5 Flink on YARN

Flink提供两种Yarn的部署方式Yarn Setup:

Start a long-running Flink cluster on YARN

  • 通过命令yarn-session.sh来实现,本质上是在yarn集群上启动一个flink集群。
  • 由yarn预先给flink集群分配若干个container给flink使用,在yarn的界面上只能看到一个Flink session with X TaskManagers的任务。
  • 只有一个Flink界面,可以从Yarn的ApplicationMaster链接进入。
  • 使用bin/flink run命令发布任务时,本质上是使用Flink自带的调度,与普通的在Flink集群上发布任务并没有不同。不同的任务可能在一个TaskManager中,也即是在一个JVM进程中,无法实现资源隔离。

Run a Flink job on YARN

  • 通过命令bin/flink run -m yarn-cluster实现,一次只发布一个任务,本质上给每个flink任务启动了一个集群。
  • yarn不事先给flink分配container,而是在任务发布时,启动JobManager(对应Yarn的AM)和TaskManager,如果一个任务指定了n个TaksManager(-yn n),则会启动n+1个Container,其中一个是JobManager。
  • 发布m个应用,则有m个Flink界面,对比方式一,同样发布m个应用,会多出m-1个JobManager的。
  • 发布任务时,实际上是使用了Yarn的调用。不同的任务不可能在一个Container(JVM)中,也即是实现了资源隔离。

以第一种启动方式为例,其主要启动流程如下:

首先我们通过下面的命令行启动flink on yarn的集群 这里将产生总共五个进程:

  • 1个FlinkYarnSessionCli ---> Yarn Client
  • 1个YarnApplicationMasterRunner ---> AM + JobManager
  • 3个YarnTaskManager --> TaskManager

即一个客户端+4个container,1个container启动AM,3个container启动TaskManager。

yarn-session.sh支持的参数:

一个Flink环境在YARN上的启动流程:

  1. FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则上传一些flink的jar和配置文件到HDFS,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。
  2. 接着yarn client会首先向RM申请一个container来启动 ApplicationMaster(YarnApplicationMasterRunner进程),然后RM会通知其中一个NM启动这个container,被分配到启动AM的NM会首先去HDFS上下载第一步上传的jar包和配置文件到本地,接着启动AM;在这个过程中会启动JobManager,因为JobManager和AM在同一进程里面,它会把JobManager的地址重新作为一个文件上传到HDFS上去,TaskManager在启动的过程中也会去下载这个文件获取JobManager的地址,然后与其进行通信;AM还负责Flink的web 服务,Flink里面用到的都是随机端口,这样就允许了用户能够启动多个yarn session。

从这个启动过程中可以看出,在每次启动Flink on YARN之前,需要指定启动多少个TaskManager,每个taskManager分配的资源是固定的,也就是说这个资源量从taskManager出生到死亡,资源情况一直是这么多,不管它所承载的作业需求资源情况,这样在作业需要更多资源的时候,没有更多的资源分配给对应的作业,相反,当一个作业仅需要很少的资源就能够运行的时候,仍然分配的是那些固定的资源,造成资源的浪费。

用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

下面是一个由Flink程序映射为Streaming Dataflow的示意图,如下所示:

FlinkKafkaConsumer是一个Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink是一个Sink Operator。

1.6 CEP(Complex event processing)

Flink CEP 是一套极具通用性、易于使用的实时流式事件处理方案。作为 Flink 的原生组件,省去了第三方库与 Flink 配合使用时可能会导致的各种问题。但其功能现阶段看来还比较基础,不能表达复杂的业务场景,同时它不能够做到动态更新。

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
        // (Event, timestamp)  
        DataStream<Event> input = env.fromElements(  
            Tuple2.of(new Event(1, "start", 1.0), 5L),  
            Tuple2.of(new Event(2, "middle", 2.0), 1L),  
            Tuple2.of(new Event(3, "end", 3.0), 3L),  
            Tuple2.of(new Event(4, "end", 4.0), 10L),  
            Tuple2.of(new Event(5, "middle", 6.0), 7L),  
            Tuple2.of(new Event(6, "middle", 5.0), 7L),  
            // last element for high final watermark  
            Tuple2.of(new Event(7, "middle", 5.0), 100L)  
        ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { 

具体的业务逻辑

代码语言:javascript
复制
Pattern<Event, ? extends Event> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {  
  
            @Override  
            public boolean filter(Event value) throws Exception {  
                return value.getName().equals("start");  
            }  
        }).followedByAny("middle").where(new SimpleCondition<Event>() {   
            @Override  
            public boolean filter(Event value) throws Exception {  
                return value.getName().equals("middle");  
            }  
        }).followedByAny("end").where(new SimpleCondition<Event>() {  
            @Override  
            public boolean filter(Event value) throws Exception {  
                return value.getName().equals("end");  
            }  
        });  
  
        DataStream<String> result = CEP.pattern(input, pattern, comparator).select(  
            new PatternSelectFunction<Event, String>() {  
  
                @Override  
                public String select(Map<String, List<Event>> pattern) {  
                    StringBuilder builder = new StringBuilder();  
  
                    builder.append(pattern.get("start").get(0).getId()).append(",")  
                        .append(pattern.get("middle").get(0).getId()).append(",")  
                        .append(pattern.get("end").get(0).getId());  
  
                    return builder.toString();  
                }  
            }  
        );  

从例子代码中可以看到,patterns需要用java代码写,需要编译,很冗长很麻烦,没法动态配置;需要可配置,或提供一种DSL;再者,对于一个流同时只能设置一个pattern,比如对于不同的用户实例想配置不同的pattern,就没法支持;需要支持按key设置pattern。

1.7 Flink目前存在的一些问题

在实时计算中有这么一个普遍的逻辑:业务逻辑中以一个流式数据源与几个相关的配置表进行join操作,而配置表并不是一成不变的,会定期的进行数据更新,可以看成一个缓慢变化的流。这种join环境存在以下几个尚未解决的问题:

1.对元数据库的读压力;如果分析程序有1000并发,是否需要读1000次;

2.读维表数据不能拖慢主数据流的throughput,每秒千万条数据量;

3.动态维表更新问题和一致性问题;元数据是不断变化的,如何把更新同步到各个并发上;

4.冷启动问题,如何保证主数据流流过的时候,维表数据已经ready,否则会出现数据无法处理;

5.超大维表数据会导致流量抖动和频繁gc,比如几十万条的实例数据,可能上百兆。

在Flink社区,对该问题也进行了关注

https://issues.apache.org/jira/browse/FLINK-6131

https://issues.apache.org/jira/browse/FLINK-2320

https://issues.apache.org/jira/browse/FLINK-3514

当然在生产环境上也有相应的解决方案:

使用redis来做cache,只用一个job,负责从元数据库同步数据到redis,这样就解决1,3

然后所有的并发都从redis直接查询需要的元数据,这样就解决4;对于2,在并发上做local cache,只有第一次需要真正查询redis,后续定期异步更新就好,不会影响到主数据流;对于5,因为现在不需要一下全量的读取维表数据到内存,用到的时候才去读,分摊了负载,也可以得到缓解。

这个方案也有一定的弊端,增加了架构的外部依赖,要额外保障外部redis和同步job的稳定性。

2 Flink vs Spark

2.1 框架

Spark把streaming看成是更快的批处理,而Flink把批处理看成streaming的special case。这里面的思路决定了各自的方向,其中两者的差异点有如下这些: 实时 vs 近实时的角度:Flink提供了基于每个事件的流式处理机制,所以可以被认为是一个真正的流式计;而Spark,不是基于事件的粒度,而是用小批量来模拟流式,也就是多个事件的集合。所以Spark被认为是近实时的处理系统。 Spark streaming 是更快的批处理,而Flink Batch是有限数据的流式计算。

2.1.1 流式计算和批处理API

Spark对于流式计算和批处理,都是基于RDD的抽象。这样很方便将两种计算方式合并表示。而Flink将流式计算和批处理分别抽象出来DataStream和DataSet两种API,这一点上Flink相对于spark来说是一个糟糕的设计。

2.2 社区活跃度对比

Spark 2.3 继续向更快、更易用、更智能的目标迈进,引入了低延迟的持续处理能力和流到流的连接,让 Structured Streaming 达到了一个里程碑式的高度。

3 提交一个Flink作业

启动flink服务

./bin/yarn-session.sh -n 4 -jm 2048 -tm 2048

在yarn监控界面上可以看到该作业的执行状态

并验证Wordcount例子

代码语言:javascript
复制
./bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048 
./examples/batch/WordCount.jar

在client端可以看到log:

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据社 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 Flink介绍
    • 1.1部署模式
      • 1.2整合支持
        • 1.3 API支持
          • 1.4 Libraries支持
            • 1.5 Flink on YARN
              • 1.6 CEP(Complex event processing)
                • 1.7 Flink目前存在的一些问题
                • 2 Flink vs Spark
                  • 2.1 框架
                    • 2.2 社区活跃度对比
                    • 3 提交一个Flink作业
                    相关产品与服务
                    流计算 Oceanus
                    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档