前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink中如何解析与传递参数

Flink中如何解析与传递参数

作者头像
smartsi
发布2020-11-03 14:32:14
4.5K0
发布2020-11-03 14:32:14
举报
文章被收录于专栏:SmartSiSmartSi

几乎所有的 Flink 应用程序(包括批处理与流处理程序)都需要依赖外部配置参数。例如,可以用来指定输入和输出源(如路径或者地址),系统参数(并发数,运行时配置)以及应用程序特定参数(通常用在自定义函数中)。

从 0.9 版本开始,Flink 提供了一个叫 ParameterTool 的简单程序,提供一些基础的工具来解决上述问题,当然你也可以不用这里描述的 ParameterTool,你可以使用其他框架,例如,Commons CLIargparse4j 在 Flink 中也是支持的。

1. 解析参数

下面我们看一下如何获取配置并导入 ParameterTool 中。ParameterTool 提供了一系列预定义的静态方法来读取配置信息,ParameterTool 内部存储是一个 Map 数据结构,因此很容易与你自己的配置相集成。

1.1 读取.properties文件参数

下面方法将去读取一个本地 Properties 文件,并返回 key/value 键值对:

代码语言:javascript
复制
String propertiesFile = "/Users/wy/study/dev.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
1.2 读取命令行参数

下面会从命令行中获取像 –input hdfs:///mydata –elements 42 这种形式的参数:

代码语言:javascript
复制
public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
}
1.3 从系统属性中获取参数

当启动一个 JVM 时,你可以将系统属性传递给它:-Dinput=hdfs:///mydata,你还可以用这些系统属性来初始化 ParameterTool:

代码语言:javascript
复制
ParameterTool parameter = ParameterTool.fromSystemProperties();
1.4 使用参数

我们已经将参数放在了 ParameterTool 对象中,那现在我们如何从 ParameterTool 对象中获取参数呢?ParameterTool 提供了一些内置的方法以供我们访问:

代码语言:javascript
复制
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()

我们可以直接在 main() 方法中使用这些方法的返回值。例如,我们可以像如下设置算子的并行度:

代码语言:javascript
复制
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

2. 传递参数

在数据处理的过程中,往往需要给函数传递一些参数,那下面看看有哪些方法可以进行参数的传递?

2.1 使用Configuration

我们可以通过 Configuration 对象为函数传递参数。如下显示将 Configuration 作为配置对象传递给用户定义的函数:

代码语言:javascript
复制
ParameterTool parameters = ParameterTool.fromArgs(args);
Configuration conf = parameters.getConfiguration();
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer())
        .withParameters(conf);

在 Tokenizer 中,我们可以通过 open(Configuration conf) 方法访问传递过来的参数:

代码语言:javascript
复制
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void open(Configuration conf) throws Exception {
      // 获取参数
	    conf.getInteger('xxx', -1);
    }
}

如果我们不需要使用 ParameterTool 来解析参数,我们可以直接定义 Configuration:

代码语言:javascript
复制
Configuration conf = new Configuration();
conf.setString("xxx", "xxx");
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer())
        .withParameters(conf);
2.2 注册全局参数

除了上述方法之外,我们还可以在 ExecutionConfig 中将参数注册为全局作业参数,可以在 JobManager 的 WEB 界面或者用户自定义函数中访问配置值。如下代码所示注册全局参数:

代码语言:javascript
复制
ParameterTool parameters = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

可以在任意 Rich 用户函数中访问参数:

代码语言:javascript
复制
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
	    parameters.getRequired('xxx');
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-10-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 解析参数
    • 1.1 读取.properties文件参数
      • 1.2 读取命令行参数
        • 1.3 从系统属性中获取参数
          • 1.4 使用参数
          • 2. 传递参数
            • 2.1 使用Configuration
              • 2.2 注册全局参数
              相关产品与服务
              大数据
              全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档