首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

从Example出发:读懂start-start-shell.sh任务执行流程

微信公众号:深广大数据Club

关注可了解更多大数据的相关资讯。问题或建议,请公众号留言;

如果你觉得深广大数据Club对你有帮助,帮忙转发文章到微信朋友圈

本文我们主要介绍Apache Flink集成的交互式Scala Shell脚本。我们既可以在本地安装模式下或者集群模式下运行该脚本。之后就可以在这之上执行你所编写的代码程序。

Scala REPL

start-scala-shell.sh脚本存放于Flink安装目录的bin底下。

通过如下命令在单机模式下启动shell脚本:

详细的使用方法可具体查看Scala REPL:https://github.com/Jonathan-Wei/Flink-Docs-CN/blob/master/06%20%E9%83%A8%E7%BD%B2-%E6%93%8D%E4%BD%9C/07%20Scala%20REPL.md

start-scala-shell.sh

这里主要看脚本最后调用的代码信息,从如下代码可以看到,脚本最终调用的是FlinkShell.scala

FlinkShell.scala

从方法入手,来查看具体的代码逻辑。

从代码上可以看出,启动方式包含三种:local、remote、yarn

在指定启动方式之后,会将executionMode指定对应的模式。最后通过startShell启动脚本。

startShell方法

以上代码做了两件事,第一件事是获取链接信息fetchConnectionInfo(host, port, cluster)并读取配置,另外一件事是基于配置new一个FlinkIloop对象repl。之后通过repl.process(settings)启动处理

我们再来看下关键的fetchConnectionInfo方法

fetchConnectionInfo方法中对ExecutionMode的类型进行匹配。

LOCAL模式

REMOTE模式

远程模式仅获取host和port提供调用。

YARN模式

如果yarnConfig不为空调用,否则则使用yarn properties文件的信息,调用

deployNewYarnCluster

fetchDeployedYarnClusterInfo

两个方法最终的都是通过AkkaUtils.getInetSocketAddressFromAkkaURL获取host以及port。

不同的是前面的部署,deployNewYarnCluster通过clusterDescriptor.deploySessionCluster部署集群获取Cluster,而fetchDeployedYarnClusterInfo则是先获取clusterID,调用clusterDescriptor.retrieve()传入clusterId获取Cluster

FlinkILoop

回过头来看下之前提到的repl,其实就是FlinkILoop的实例。在FlinkILoop包含了Local模式的环境信息以及Remote模式的环境信息

LOCAL模式

批量env:ExecutionEnvironment

流式env:StreamExecutionEnvironment

local模式在《

Flink源码解析 | 从Example出发:读懂本地任务执行流程

》讲过,这里就不再赘述

REMOTE模式

批量env:ScalaShellRemoteEnvironment

流式env:ScalaShellRemoteStreamEnvironment

我们在交互式shell脚本运行后,在其命令行中编写代码逻辑,编写完成之后通过env.execute()执行。

代码入口

这里我们拿官网的例子来看。

我们运行脚本进入交互式界面后,其实脚本就已经内置了benv以及senv环境变量,之后编写代码,调用execute方法。

我们来看下ScalaShellRemoteEnvironment以及ScalaShellRemoteStreamEnvironment的内部实现。

ScalaShellRemoteEnvironment继承RemoteEnvironment

通过getExecutor方法获取PlanExecutor执行器对象

创建ProgramPlan

通过executor.executePlan方法执行计划并返回result

ScalaShellRemoteStreamEnvironment继承RemoteStreamEnvironment

先通过RemoteStreamEnvironment.execute()方法获取StreamGraph,再调用其子类ScalaShellRemoteStreamEnvironment的executeRemotely方法

ScalaShellRemoteStreamEnvironment.executeRemotely

获取url对象以及所需添加的jar包的url对象后,调用父类RemoteStreamEnvironment的executeRemotely方法。

RemoteStreamEnvironment.executeRemotely

主要做了两件事

获取ClusterClient,此处的ClusterClient是RestClusterClient

调用RestClusterClient.run()执行并获取JobExecutionResult

后续的流程与先前文章的流程类似,只是最后submitJob是调用的RestClusterClient的submitJob。具体内容我这里就不再深入。自己试着理解下。其他大体都和之前的流程类似。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181210G1D8W200?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券