前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink Scala Shell:使用交互式编程环境学习和调试Flink

Flink Scala Shell:使用交互式编程环境学习和调试Flink

作者头像
PP鲁
发布2019-12-26 14:38:26
2.1K0
发布2019-12-26 14:38:26
举报
文章被收录于专栏:皮皮鲁的AI星球皮皮鲁的AI星球

交互式编程环境:REPL

当前最著名的交互式编程环境莫属Jupyter Notebook了,程序员可以启动一个交互的Session,在这Session中编写代码、执行程序、获取结果,所见即所得。

交互式编程的优势包括:

  • 快速启动一个程序:程序员不需要“编译、打包、执行”这一套复杂过程,只需要开启一个交互Session,敲下代码,直接得到结果,非常适合验证一段代码的结果。
  • 直接获得程序反馈:使用print,可以在交互环境中直接得到程序结果,无需将输出导出到文件或其他位置。

程序员敲入几行程序命令,环境可以立刻给出反馈,因此这种交互式环境非常适合调试程序,尤其对于初学者来说非常友好。交互式编程环境通常被称为REPL(Read-Eval-Print Loop),这种方式跟Linux的命令行非常相似,因此又被称为Shell。支持REPL的语言有Python、R、Matlab、Scala以及Java 9。

我之前经常使用Spark的交互式环境spark-shell,Flink基于Java和Scala,其实也是支持交互式编程的,这里推荐新人使用REPL交互式环境来上手和学习Flink。

注意,Flink的交互式编程环境只支持Scala语言,程序员可以基于Scala语言调用DataStream/DataSet API、Table API & SQL,不支持Java。另外,Flink提供了Python版本的REPL环境,不过目前Flink(1.9)的Python API只支持Table API调用。本文主要展示Scala的REPL的使用方法。

启动REPL环境

下载Flink

前往Flink Downloads页面(https://flink.apache.org/downloads.html)下载编译好的Flink程序,这里根据你的Scala版本、是否需要搭载Hadoop环境等需求来选择适合的版本,没有特殊需求的选择最近版本的Flink即可。下载的tgz包解压缩后,在bin目录下有一些Flink提供的基础工具。

注意,Flink目前对类Unix系统(Linux、macOS)比较友好,尽管Flink可以在Windows默认命令行上运行,但只限于一些基础命令,比如不支持REPL。Windows用户需要使用Cygwin、Windows Subsystem for Linux等方式来模拟Unix环境。

启动REPL

在命令行里进入Flink解压缩之后的目录,在本地启动一个Flink REPL交互式环境。

代码语言:javascript
复制
bin/start-scala-shell.sh local

启动后,命令行中会反馈一些注意信息:

验证一下Scala的Hello World:

代码语言:javascript
复制
scala >  打印(“ Hello World!”)
 世界您好!

Scala Shell的使用

使用正确的运行环境

Flink Shell已经支持批处理和流处理两种模式。如上图所示,Flink在这个交互环境中默认提供运行环境Execution Environment,其中批处理为benv、流处理为senv。Flink是一个运行在集群上的大数据系统,需要通过运行环境这个入口与集群交互,因此运行环境是Flink程序必不可少的重要元素。

下面演示使用Scala Shell来运行一个最基础的map算子:

代码语言:javascript
复制
scala >  val  dataStream:DataStream [ Int ] =  senv。fromElements(1,2,- 3,0,5,- 9,8)
dataStream:组织。阿帕奇。flink。流。api。斯卡拉(Scala)。DataStream [ Int ] =  org。阿帕奇。flink。流。api。斯卡拉(Scala)。数据流@ 3013e1e8
scala >  val  lambda  =  dataStream。map(输入 =>  输入 *  2)。列印()
lambda:组织。阿帕奇。flink。流。api。数据流。DataStreamSink [ Int ] =  org。阿帕奇。flink。流。api。数据流。DataStreamSink @ 4ff00844
斯卡拉>  SENV。执行(“基本地图转换”)
2
4
- 6
0
10
- 18
16
res0:组织。阿帕奇。flink。api。普通的。JobExecutionResult  =  org。阿帕奇。flink。api。普通的。JobExecutionResult @ 7f59f4e4

我创建了一个数字列表DataStream,然后使用map对每个元素乘以2,并打印出来。

注意,在流处理模式下,print不会自动触发,必须调用execute才能触发执行前面的程序。

代码拷贝

我们经常遇到的一个使用场景是从网上看到一些代码片段,需要拷贝过来验证正确性。在Scala Shell中,可以使用:paste命令进入拷贝模式,复制粘贴之后,再使用Control + D按键组合退出粘贴模式。

代码语言:javascript
复制

斯卡拉>:粘贴
//进入粘贴模式(按Ctrl-D完成)
val  textStreaming  =  senv。fromElements(
  “成为或不成为-这是一个问题:-”,
  “心中难免会受苦”,
  “吊索和离谱财富的箭”,
  “或采取行动抵御麻烦之海,”)
^ D
//退出粘贴模式,现在正在解释。
textStreaming:组织。阿帕奇。flink。流。api。斯卡拉(Scala)。DataStream [ String ] =  org。阿帕奇。flink。流。api。斯卡拉(Scala)。数据流@ 62e8ef9f

使用其他依赖

如果程序依赖了其他包,可以在启动Flink Scala Shell时,加上参数-a <path/to/jar>--addclasspath <path/to/jar>

例如,我想使用Gson来解析json数据:

代码语言:javascript
复制
bin / start-scala-shell.sh本地-a /Users/luweizheng/.m2/repository/com/google/code/gson/gson/2.8.5/gson-2.8.5.jar

这样我就能在交互式环境中使用这个包下的各种类和方法了。

绝大多数情况下,我们可能要依赖多个不同的包,这时候需要使用maven-shade-plugin工具将所依赖包合并到一起,打成一个超级包(uber-jar),超级包内包含了这个程序所有必备的依赖。

使用Flink

Flink Scala Shell也支持扩展模式,包括独立的Flink集成和与其他应用程序共享的纱线实现。

远程链接

使用remote模式,指定JobManager的机器名(IP)和端口号:

代码语言:javascript
复制
bin / start-scala-shell.sh远程<主机名> <端口号>
纱线

使用这个命令可以在Yarn上部署一个新的Flink集群,并使用其他参数来配置集群信息,比如`-n 2将申请2个TaskManager,其他详细使用方法可以参见下面完整使用手册。

代码语言:javascript
复制

bin / start-scala-shell.sh yarn -n  2

完整使用方法

代码语言:javascript
复制
Flink Scala壳
用法:start-scala-shell.sh [本地|远程|纱线] [选项] <args> ...
命令:本地[选项]
使用本地Flink集群启动Flink Scala Shell
  -a <路径/到/罐子> | --addclasspath <路径/到/罐子>
        指定在 Flink中使用的其他jar
命令:远程[选项] <主机> <端口>
启动Flink Scala Shell连接到远程集群
  <主机>
        远程主机名作为字符串
  <端口>
        远程端口为整数
  -a <路径/到/罐子> | --addclasspath <路径/到/罐子>
        指定在 Flink中使用的其他jar
命令:yarn [options]
启动Flink Scala外壳连接到纱线簇
  -n arg | -容器 arg
        要分配的YARN容器数(= TaskManagers数)
  -jm arg | --jobManagerMemory arg
        存储器为具有可选的单元JobManager容器(默认值:MB)
  -nm <值> | --name <值>
为 YARN上的应用程序        设置自定义名称
  -qu <arg> | --queue <arg>
        指定YARN队列
  -s <arg> | --slots <arg>
        每个TaskManager的插槽数
  -tm <arg> | --taskManagerMemory <arg>
        每个TaskManager容器的内存,带可选单位(默认值:MB)
  -a <路径/到/罐子> | --addclasspath <路径/到/罐子>
        指定在 Flink中使用的其他jar
  --configDir <值>
        配置目录。
  -h |  - 救命
        打印此用法文本
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 皮皮鲁的AI星球 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 交互式编程环境:REPL
  • 启动REPL环境
    • 下载Flink
      • 启动REPL
      • Scala Shell的使用
        • 使用正确的运行环境
          • 代码拷贝
            • 使用其他依赖
              • 使用Flink
                • 远程链接
                • 纱线
            • 完整使用方法
            相关产品与服务
            大数据
            全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档