前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink的ParameterTool

聊聊flink的ParameterTool

原创
作者头像
code4it
发布2019-02-15 09:11:57
4K0
发布2019-02-15 09:11:57
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下flink的ParameterTool

实例

fromPropertiesFile

代码语言:javascript
复制
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
​
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
​
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
  • 使用ParameterTool.fromPropertiesFile从.properties文件创建ParameterTool

fromArgs

代码语言:javascript
复制
public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..
}
  • 使用ParameterTool.fromArgs从命令行创建ParameterTool(比如--input hdfs:///mydata --elements 42)

fromSystemProperties

代码语言:javascript
复制
ParameterTool parameter = ParameterTool.fromSystemProperties();
  • 使用ParameterTool.fromSystemProperties从system properties创建ParameterTool(比如-Dinput=hdfs:///mydata)

获取参数值

代码语言:javascript
复制
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.
  • 可以使用ParameterTool的get、getRequired、getLong等方法获取参数值

设置为global

代码语言:javascript
复制
env.getConfig().setGlobalJobParameters(parameters);
​
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
​
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    ParameterTool parameters = (ParameterTool)
        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    parameters.getRequired("input");
    // ... do more ...
}
  • 使用env.getConfig().setGlobalJobParameters将ParameterTool的访问范围设置为global

GlobalJobParameters

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/ExecutionConfig.java

代码语言:javascript
复制
    public static class GlobalJobParameters implements Serializable {
        private static final long serialVersionUID = 1L;
​
        /**
         * Convert UserConfig into a {@code Map<String, String>} representation.
         * This can be used by the runtime, for example for presenting the user config in the web frontend.
         *
         * @return Key/Value representation of the UserConfig
         */
        public Map<String, String> toMap() {
            return Collections.emptyMap();
        }
    }
  • GlobalJobParameters里头有一个toMap方法,返回Collections.emptyMap()

ParameterTool

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/utils/ParameterTool.java

代码语言:javascript
复制
@Public
public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable {
    private static final long serialVersionUID = 1L;
​
    protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
    protected static final String DEFAULT_UNDEFINED = "<undefined>";
​
    //......
​
    // ------------------ ParameterUtil  ------------------------
    protected final Map<String, String> data;
​
    // data which is only used on the client and does not need to be transmitted
    protected transient Map<String, String> defaultData;
    protected transient Set<String> unrequestedParameters;
​
    private ParameterTool(Map<String, String> data) {
        this.data = Collections.unmodifiableMap(new HashMap<>(data));
​
        this.defaultData = new ConcurrentHashMap<>(data.size());
​
        this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
​
        unrequestedParameters.addAll(data.keySet());
    }
​
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ParameterTool that = (ParameterTool) o;
        return Objects.equals(data, that.data) &&
            Objects.equals(defaultData, that.defaultData) &&
            Objects.equals(unrequestedParameters, that.unrequestedParameters);
    }
​
    @Override
    public int hashCode() {
        return Objects.hash(data, defaultData, unrequestedParameters);
    }
​
    @Override
    public Map<String, String> toMap() {
        return data;
    }
​
    //......
​
        /**
     * Returns {@link ParameterTool} for the given arguments. The arguments are keys followed by values.
     * Keys have to start with '-' or '--'
     *
     * <p><strong>Example arguments:</strong>
     * --key1 value1 --key2 value2 -key3 value3
     *
     * @param args Input array arguments
     * @return A {@link ParameterTool}
     */
    public static ParameterTool fromArgs(String[] args) {
        final Map<String, String> map = new HashMap<>(args.length / 2);
​
        int i = 0;
        while (i < args.length) {
            final String key;
​
            if (args[i].startsWith("--")) {
                key = args[i].substring(2);
            } else if (args[i].startsWith("-")) {
                key = args[i].substring(1);
            } else {
                throw new IllegalArgumentException(
                    String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
                        Arrays.toString(args), args[i]));
            }
​
            if (key.isEmpty()) {
                throw new IllegalArgumentException(
                    "The input " + Arrays.toString(args) + " contains an empty argument");
            }
​
            i += 1; // try to find the value
​
            if (i >= args.length) {
                map.put(key, NO_VALUE_KEY);
            } else if (NumberUtils.isNumber(args[i])) {
                map.put(key, args[i]);
                i += 1;
            } else if (args[i].startsWith("--") || args[i].startsWith("-")) {
                // the argument cannot be a negative number because we checked earlier
                // -> the next argument is a parameter name
                map.put(key, NO_VALUE_KEY);
            } else {
                map.put(key, args[i]);
                i += 1;
            }
        }
​
        return fromMap(map);
    }
​
    /**
     * Returns {@link ParameterTool} for the given {@link Properties} file.
     *
     * @param path Path to the properties file
     * @return A {@link ParameterTool}
     * @throws IOException If the file does not exist
     * @see Properties
     */
    public static ParameterTool fromPropertiesFile(String path) throws IOException {
        File propertiesFile = new File(path);
        return fromPropertiesFile(propertiesFile);
    }
​
    /**
     * Returns {@link ParameterTool} for the given {@link Properties} file.
     *
     * @param file File object to the properties file
     * @return A {@link ParameterTool}
     * @throws IOException If the file does not exist
     * @see Properties
     */
    public static ParameterTool fromPropertiesFile(File file) throws IOException {
        if (!file.exists()) {
            throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist");
        }
        try (FileInputStream fis = new FileInputStream(file)) {
            return fromPropertiesFile(fis);
        }
    }
​
    /**
     * Returns {@link ParameterTool} for the given InputStream from {@link Properties} file.
     *
     * @param inputStream InputStream from the properties file
     * @return A {@link ParameterTool}
     * @throws IOException If the file does not exist
     * @see Properties
     */
    public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
        Properties props = new Properties();
        props.load(inputStream);
        return fromMap((Map) props);
    }
​
    /**
     * Returns {@link ParameterTool} for the given map.
     *
     * @param map A map of arguments. Both Key and Value have to be Strings
     * @return A {@link ParameterTool}
     */
    public static ParameterTool fromMap(Map<String, String> map) {
        Preconditions.checkNotNull(map, "Unable to initialize from empty map");
        return new ParameterTool(map);
    }
​
    /**
     * Returns {@link ParameterTool} from the system properties.
     * Example on how to pass system properties:
     * -Dkey1=value1 -Dkey2=value2
     *
     * @return A {@link ParameterTool}
     */
    public static ParameterTool fromSystemProperties() {
        return fromMap((Map) System.getProperties());
    }
​
    //......
​
    /**
     * Returns the String value for the given key.
     * If the key does not exist it will return null.
     */
    public String get(String key) {
        addToDefaults(key, null);
        unrequestedParameters.remove(key);
        return data.get(key);
    }
​
    /**
     * Returns the String value for the given key.
     * If the key does not exist it will throw a {@link RuntimeException}.
     */
    public String getRequired(String key) {
        addToDefaults(key, null);
        String value = get(key);
        if (value == null) {
            throw new RuntimeException("No data for required key '" + key + "'");
        }
        return value;
    }
​
    /**
     * Returns the String value for the given key.
     * If the key does not exist it will return the given default value.
     */
    public String get(String key, String defaultValue) {
        addToDefaults(key, defaultValue);
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return value;
        }
    }
​
    /**
     * Check if value is set.
     */
    public boolean has(String value) {
        addToDefaults(value, null);
        unrequestedParameters.remove(value);
        return data.containsKey(value);
    }
​
    // -------------- Integer
​
    /**
     * Returns the Integer value for the given key.
     * The method fails if the key does not exist or the value is not an Integer.
     */
    public int getInt(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Integer.parseInt(value);
    }
​
    /**
     * Returns the Integer value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not an Integer.
     */
    public int getInt(String key, int defaultValue) {
        addToDefaults(key, Integer.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        }
        return Integer.parseInt(value);
    }
​
    // -------------- LONG
​
    /**
     * Returns the Long value for the given key.
     * The method fails if the key does not exist.
     */
    public long getLong(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Long.parseLong(value);
    }
​
    /**
     * Returns the Long value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Long.
     */
    public long getLong(String key, long defaultValue) {
        addToDefaults(key, Long.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        }
        return Long.parseLong(value);
    }
​
    // -------------- FLOAT
​
    /**
     * Returns the Float value for the given key.
     * The method fails if the key does not exist.
     */
    public float getFloat(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Float.valueOf(value);
    }
​
    /**
     * Returns the Float value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Float.
     */
    public float getFloat(String key, float defaultValue) {
        addToDefaults(key, Float.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Float.valueOf(value);
        }
    }
​
    // -------------- DOUBLE
​
    /**
     * Returns the Double value for the given key.
     * The method fails if the key does not exist.
     */
    public double getDouble(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Double.valueOf(value);
    }
​
    /**
     * Returns the Double value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Double.
     */
    public double getDouble(String key, double defaultValue) {
        addToDefaults(key, Double.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Double.valueOf(value);
        }
    }
​
    // -------------- BOOLEAN
​
    /**
     * Returns the Boolean value for the given key.
     * The method fails if the key does not exist.
     */
    public boolean getBoolean(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Boolean.valueOf(value);
    }
​
    /**
     * Returns the Boolean value for the given key. If the key does not exists it will return the default value given.
     * The method returns whether the string of the value is "true" ignoring cases.
     */
    public boolean getBoolean(String key, boolean defaultValue) {
        addToDefaults(key, Boolean.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Boolean.valueOf(value);
        }
    }
​
    // -------------- SHORT
​
    /**
     * Returns the Short value for the given key.
     * The method fails if the key does not exist.
     */
    public short getShort(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Short.valueOf(value);
    }
​
    /**
     * Returns the Short value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Short.
     */
    public short getShort(String key, short defaultValue) {
        addToDefaults(key, Short.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Short.valueOf(value);
        }
    }
​
    // -------------- BYTE
​
    /**
     * Returns the Byte value for the given key.
     * The method fails if the key does not exist.
     */
    public byte getByte(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Byte.valueOf(value);
    }
​
    /**
     * Returns the Byte value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Byte.
     */
    public byte getByte(String key, byte defaultValue) {
        addToDefaults(key, Byte.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Byte.valueOf(value);
        }
    }
​
    //......
}
  • ParameterTool里头有data、defaultData、unrequestedParameters等属性,toMap方法返回的是data属性
  • ParameterTool提供了fromPropertiesFile、fromArgs、fromSystemProperties、fromMap静态方法用于创建ParameterTool
  • ParameterTool提供了get、getRequired、getInt、getLong、getFloat、getDouble、getBoolean、getShort、getByte等方法,每种类型的get均提供了一个支持defaultValue的方法

小结

  • ParameterTool提供了fromPropertiesFile、fromArgs、fromSystemProperties、fromMap静态方法用于创建ParameterTool
  • ParameterTool提供了get、getRequired、getInt、getLong、getFloat、getDouble、getBoolean、getShort、getByte等方法,每种类型的get均提供了一个支持defaultValue的方法
  • ParameterTool继承了ExecutionConfig.GlobalJobParameters,其toMap方法返回的是data属性;使用env.getConfig().setGlobalJobParameters可以将ParameterTool的访问范围设置为global

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实例
    • fromPropertiesFile
      • fromArgs
        • fromSystemProperties
          • 获取参数值
            • 设置为global
            • GlobalJobParameters
            • ParameterTool
            • 小结
            • doc
            相关产品与服务
            大数据
            全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档