聊聊flink KeyedStream的KeySelector

本文主要研究一下flink KeyedStream的KeySelector

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
​
    /**
     * The key selector that can get the key by which the stream if partitioned from the elements.
     */
    private final KeySelector<T, KEY> keySelector;
​
    /** The type of the key by which the stream is partitioned. */
    private final TypeInformation<KEY> keyType;
​
    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
        this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }
​
    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(
            dataStream,
            new PartitionTransformation<>(
                dataStream.getTransformation(),
                new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
    }
​
    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
     * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
     *
     * @param stream
     *            Base stream of data
     * @param partitionTransformation
     *            Function that determines how the keys are distributed to downstream operator(s)
     * @param keySelector
     *            Function to extract keys from the base stream
     * @param keyType
     *            Defines the type of the extracted keys
     */
    @Internal
    KeyedStream(
        DataStream<T> stream,
        PartitionTransformation<T> partitionTransformation,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {
​
        super(stream.getExecutionEnvironment(), partitionTransformation);
        this.keySelector = clean(keySelector);
        this.keyType = validateKeyType(keyType);
    }
​
    //......
}
  • 这里可以看到KeyedStream的不同构造器中都需要一个KeySelector类型的参数

KeySelector

flink-core-1.7.0-sources.jar!/org/apache/flink/api/java/functions/KeySelector.java

@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
​
    /**
     * User-defined function that deterministically extracts the key from an object.
     *
     * <p>For example for a class:
     * <pre>
     *  public class Word {
     *      String word;
     *      int count;
     *  }
     * </pre>
     * The key extractor could return the word as
     * a key to group all Word objects by the String they contain.
     *
     * <p>The code would look like this
     * <pre>
     *  public String getKey(Word w) {
     *      return w.word;
     *  }
     * </pre>
     *
     * @param value The object to get the key from.
     * @return The extracted key.
     *
     * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
     *                   and trigger recovery or cancellation of the program.
     */
    KEY getKey(IN value) throws Exception;
}
  • KeySelector接口继承了Function接口,定义了getKey方法,用于从IN类型中提取出KEY

DataStream.keyBy

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * It creates a new {@link KeyedStream} that uses the provided key for partitioning
     * its operator states.
     *
     * @param key
     *            The KeySelector to be used for extracting the key for partitioning
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream<>(this, clean(key));
    }
​
    /**
     * It creates a new {@link KeyedStream} that uses the provided key with explicit type information
     * for partitioning its operator states.
     *
     * @param key The KeySelector to be used for extracting the key for partitioning.
     * @param keyType The type information describing the key type.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(keyType);
        return new KeyedStream<>(this, clean(key), keyType);
    }
​
    /**
     * Partitions the operator state of a {@link DataStream} by the given key positions.
     *
     * @param fields
     *            The position of the fields on which the {@link DataStream}
     *            will be grouped.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public KeyedStream<T, Tuple> keyBy(int... fields) {
        if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
            return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
        } else {
            return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
        }
    }
​
    /**
     * Partitions the operator state of a {@link DataStream} using field expressions.
     * A field expression is either the name of a public field or a getter method with parentheses
     * of the {@link DataStream}'s underlying type. A dot can be used to drill
     * down into objects, as in {@code "field1.getInnerField2()" }.
     *
     * @param fields
     *            One or more field expressions on which the state of the {@link DataStream} operators will be
     *            partitioned.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     **/
    public KeyedStream<T, Tuple> keyBy(String... fields) {
        return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    }
​
    private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
        return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
                getType(), getExecutionConfig())));
    }
  • DataStream的keyBy方法用于将DataStream转换为KeyedStream,该方法有不同的重载
  • 一个是支持变长int数组,这个通常用于简单tuple类型,int为tuple的小标,从0开始,如果是多个int,表示是组合key,比如keyBy(0,1)表示要用tuple的第一个和第二个字段作为key;
  • 一个是支持变长String数组,这个通常用于复杂tuple类型及POJO类型,对于POJO,String用于指定字段名,也支持对象/tuple嵌套属性,比如user.zip,对于对象类型的tuple,f0表示该tuple的第一个字段
  • 一个是支持KeySelector,通过Key Selector Function可以自由指定key,比如从对象提取然后做些处理
  • keyBy(int... fields)及keyBy(String... fields)里头均有调用到私有的keyBy(Keys<T> keys)方法,由于KeyedStream的构造器都需要KeySelector参数,所以该方法最后也是通过KeySelectorUtil.getSelectorForKeys将Keys转换为KeySelector对象

Keys.ExpressionKeys

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/operators/Keys.java

    /**
     * Represents (nested) field access through string and integer-based keys
     */
    public static class ExpressionKeys<T> extends Keys<T> {
        
        public static final String SELECT_ALL_CHAR = "*";
        public static final String SELECT_ALL_CHAR_SCALA = "_";
        private static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
                + "\\" + SELECT_ALL_CHAR + "|"
                + "\\" + SELECT_ALL_CHAR_SCALA +")$");
​
        // Flattened fields representing keys fields
        private List<FlatFieldDescriptor> keyFields;
        private TypeInformation<?>[] originalKeyTypes;
​
        //......
​
        /**
         * Create String-based (nested) field expression keys on a composite type.
         */
        public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
            checkNotNull(keyExpressions, "Field expression cannot be null.");
​
            this.keyFields = new ArrayList<>(keyExpressions.length);
​
            if (type instanceof CompositeType){
                CompositeType<T> cType = (CompositeType<T>) type;
                this.originalKeyTypes = new TypeInformation<?>[keyExpressions.length];
​
                // extract the keys on their flat position
                for (int i = 0; i < keyExpressions.length; i++) {
                    String keyExpr = keyExpressions[i];
​
                    if (keyExpr == null) {
                        throw new InvalidProgramException("Expression key may not be null.");
                    }
                    // strip off whitespace
                    keyExpr = keyExpr.trim();
​
                    List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr);
​
                    if (flatFields.size() == 0) {
                        throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType);
                    }
                    // check if all nested fields can be used as keys
                    for (FlatFieldDescriptor field : flatFields) {
                        if (!field.getType().isKeyType()) {
                            throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key.");
                        }
                    }
                    // add flat fields to key fields
                    keyFields.addAll(flatFields);
​
                    String strippedKeyExpr = WILD_CARD_REGEX.matcher(keyExpr).replaceAll("");
                    if (strippedKeyExpr.isEmpty()) {
                        this.originalKeyTypes[i] = type;
                    } else {
                        this.originalKeyTypes[i] = cType.getTypeAt(strippedKeyExpr);
                    }
                }
            }
            else {
                if (!type.isKeyType()) {
                    throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
                }
​
                // check that all key expressions are valid
                for (String keyExpr : keyExpressions) {
                    if (keyExpr == null) {
                        throw new InvalidProgramException("Expression key may not be null.");
                    }
                    // strip off whitespace
                    keyExpr = keyExpr.trim();
                    // check that full type is addressed
                    if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
                        throw new InvalidProgramException(
                            "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
                    }
                    // add full type as key
                    keyFields.add(new FlatFieldDescriptor(0, type));
                }
                this.originalKeyTypes = new TypeInformation[] {type};
            }
        }
​
        //......
    }
  • ExpressionKeys是Keys里头的一个静态类,它继承了Keys对象;keyBy(int... fields)及keyBy(String... fields)里头均有通过new Keys.ExpressionKeys,将fields转换为Keys.ExpressionKeys,最后调用私有的keyBy(Keys<T> keys)方法

KeySelectorUtil.getSelectorForKeys

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/keys/KeySelectorUtil.java

@Internal
public final class KeySelectorUtil {
​
    public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
        if (!(typeInfo instanceof CompositeType)) {
            throw new InvalidTypesException(
                    "This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
        }
​
        CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
​
        int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
        int numKeyFields = logicalKeyPositions.length;
​
        TypeInformation<?>[] typeInfos = keys.getKeyFieldTypes();
        // use ascending order here, the code paths for that are usually a slight bit faster
        boolean[] orders = new boolean[numKeyFields];
        for (int i = 0; i < numKeyFields; i++) {
            orders[i] = true;
        }
​
        TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
        return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
    }
​
    //......
}
  • KeySelectorUtil.getSelectorForKeys方法用于将Keys转换为KeySelector类型

小结

  • KeyedStream的不同构造器中都需要一个KeySelector参数
  • DataStream的keyBy方法有不同的重载,支持变长int数组,变长String数组以及KeySelector类型
  • keyBy(int... fields)及keyBy(String... fields)里头均有通过new Keys.ExpressionKeys,将fields转换为Keys.ExpressionKeys,最后调用私有的keyBy(Keys<T> keys)方法,该方法通过调用KeySelectorUtil.getSelectorForKeys方法将Keys转换为KeySelector类型

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

Flink异步IO第一讲

Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.2版本引入。主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

32940
来自专栏IT大咖说

春哥与你相约在网易,OpenResty Con 2018

OpenResty 大会是由 OpenResty 软件基金会和 OpenResty Inc. 主办的技术盛会,旨在为业界建立一个广泛、有效的交流合作平台。

21430
来自专栏Java学习网

Java开发 中运用动态挂载实现 Bug 的热修复

大多数 JVM 具备 Java 的 HotSwap 特性,大部分开发者认为它仅仅是一个调试工具。利用这一特性,有可能在不重启 Java 进程条件下,改变 Jav...

12330
来自专栏Java与Android技术栈

TensorFlow Lite for Android 初探(附demo)一. TensorFlow Lite二. tflite 格式三. 常用的 Java API四. TensorFlow Lite

我们知道大多数的 AI 是在云端运算的,但是在移动端使用 AI 具有无网络延迟、响应更加及时、数据隐私等特性。

83120
来自专栏刘望舒

带你彻底了解Android Jetpack组件的Paging库

初次接除 paging, 可能会一脸懵逼,感觉出来了很多 API, 不知道从哪里下手。我们先对 paging 的组成部分进行一个了解。

39220
来自专栏比原链

Bytom Java版本离线签名

Gitee地址:https://gitee.com/BytomBlockchain/bytom

16540
来自专栏杨建荣的学习笔记

基于MHA+consul的MySQL高可用设计

基于MHA的高可用方案算是一种相对保守的使用,一方面也是为了兼顾低版本的历史问题。而采用MGR的过程,中间还有一些路和坑要走完,所以在快和慢之间,在数据一致性之...

29320
来自专栏程序员的知识天地

还在付费爱奇艺VIP?神级程序员教你用Python任意下!

我相信如果看电影的都知道,不管是爱奇艺还是腾讯视频还是优酷很多的电影电视都是需要VIP的,但是为了看这么一个电视或者电影开个vip又不是很划算。

52010
来自专栏数据和云

嘉年华专访 | 国际上智能运维研究

张圣林,南开大学助理教授,于2017年7月获清华大学工学博士学位(计算机科学与技术专业)并获得清华大学优秀博士学位论文,导师是刘莹老师和裴丹老师。

57530
来自专栏老九学堂

Java的IO流之字节流,Java中必须要学的内容,你会嘛?快打开学习

IO流用来处理设备之间的数据传输,Java对数据的操作是通过流的方式,用于操作流的类都在IO包中。

13630

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励