
A Look at Flink's PartitionableListState

Original article by code4it, published 2018-12-12 10:29:37
Included in the column 码匠的流水账

This article takes a look at Flink's PartitionableListState.

PartitionableListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

    /**
     * Implementation of operator list state.
     *
     * @param <S> the type of an operator state partition.
     */
    static final class PartitionableListState<S> implements ListState<S> {

        /**
         * Meta information of the state, including state name, assignment mode, and serializer
         */
        private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

        /**
         * The internal list the holds the elements of the state
         */
        private final ArrayList<S> internalList;

        /**
         * A serializer that allows to perform deep copies of internalList
         */
        private final ArrayListSerializer<S> internalListCopySerializer;

        PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this(stateMetaInfo, new ArrayList<S>());
        }

        private PartitionableListState(
                RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
                ArrayList<S> internalList) {

            this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
            this.internalList = Preconditions.checkNotNull(internalList);
            this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
        }

        private PartitionableListState(PartitionableListState<S> toCopy) {

            this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
        }

        public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this.stateMetaInfo = stateMetaInfo;
        }

        public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
            return stateMetaInfo;
        }

        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<>(this);
        }

        @Override
        public void clear() {
            internalList.clear();
        }

        @Override
        public Iterable<S> get() {
            return internalList;
        }

        @Override
        public void add(S value) {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            internalList.add(value);
        }

        @Override
        public String toString() {
            return "PartitionableListState{" +
                    "stateMetaInfo=" + stateMetaInfo +
                    ", internalList=" + internalList +
                    '}';
        }

        public long[] write(FSDataOutputStream out) throws IOException {

            long[] partitionOffsets = new long[internalList.size()];

            DataOutputView dov = new DataOutputViewStreamWrapper(out);

            for (int i = 0; i < internalList.size(); ++i) {
                S element = internalList.get(i);
                partitionOffsets[i] = out.getPos();
                getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
            }

            return partitionOffsets;
        }

        @Override
        public void update(List<S> values) {
            internalList.clear();

            addAll(values);
        }

        @Override
        public void addAll(List<S> values) {
            if (values != null && !values.isEmpty()) {
                internalList.addAll(values);
            }
        }
    }
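  • PartitionableListState is the ListState implementation used by DefaultOperatorStateBackend. It stores the state in an ArrayList (internalList), and its stateMetaInfo is a RegisteredOperatorStateBackendMetaInfo. deepCopy returns an independent copy. It duplicates the meta info through that class's own deepCopy, and it duplicates the list elements through internalListCopySerializer (an ArrayListSerializer). The write method serializes each element of internalList to the FSDataOutputStream. It returns an array (partitionOffsets) holding the stream position at which each element starts.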
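The offset bookkeeping in write can be illustrated without Flink on the classpath. The following is a minimal sketch under a few assumptions:

- A hypothetical ElementSerializer interface stands in for Flink's TypeSerializer.
- A plain java.io.DataOutputStream stands in for FSDataOutputStream.
- DataOutputStream.size() (the number of bytes written through the wrapper so far) plays the role of getPos(). The two only agree when the stream starts out empty.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;

/** Hypothetical stand-in for Flink's TypeSerializer: writes a single element. */
interface ElementSerializer<S> {
    void serialize(S element, DataOutputStream out) throws IOException;
}

/** Sketch of an ArrayList-backed list state whose write() records per-element offsets. */
final class OffsetRecordingListState<S> {
    private final ArrayList<S> internalList = new ArrayList<>();
    private final ElementSerializer<S> serializer;

    OffsetRecordingListState(ElementSerializer<S> serializer) {
        this.serializer = Objects.requireNonNull(serializer);
    }

    void add(S value) {
        // Same rule as PartitionableListState.add: null elements are rejected.
        internalList.add(Objects.requireNonNull(value, "You cannot add null to a ListState."));
    }

    /** Serializes every element and returns the position at which each one starts. */
    long[] write(DataOutputStream out) throws IOException {
        long[] partitionOffsets = new long[internalList.size()];
        for (int i = 0; i < internalList.size(); i++) {
            // size() counts the bytes written through this wrapper, playing the role of getPos()
            partitionOffsets[i] = out.size();
            serializer.serialize(internalList.get(i), out);
        }
        return partitionOffsets;
    }
}
```

Take a serializer such as (s, out) -> out.writeUTF(s). Writing "a", "bb" and "ccc" yields offsets 0, 3 and 7, because writeUTF emits a 2-byte length prefix before the characters. Recording where each element starts is what makes it possible to locate individual elements again when the state is read back.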

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java

/**
 * {@link State} interface for partitioned list state in Operations.
 * The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <T> Type of values that this list state keeps.
 */
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * to the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
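  • ListState is the State interface for partitioned list state in operations. It extends MergingState, binding the OUT type parameter to Iterable<T>, and declares two methods:
    • update replaces the whole state. A null or empty argument clears it.
    • addAll appends incrementally. A null or empty argument leaves the state unchanged; otherwise the given values are appended.
    The Javadoc describes access from functions on a KeyedStream, but the same interface also serves as operator state, as PartitionableListState shows.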
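The type hierarchy and the two update paths can be made concrete in a self-contained sketch. The *Sketch interfaces are hypothetical local mirrors of State, AppendingState, MergingState and ListState, with the checked Exceptions dropped. ArrayListStateSketch copies the method bodies of PartitionableListState shown earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirrors of Flink's interface chain; checked exceptions are omitted.
interface StateSketch {
    void clear();
}

interface AppendingStateSketch<IN, OUT> extends StateSketch {
    OUT get();
    void add(IN value);
}

// Like MergingState, this adds no methods of its own.
interface MergingStateSketch<IN, OUT> extends AppendingStateSketch<IN, OUT> {
}

// OUT is bound to Iterable<T>, as in ListState.
interface ListStateSketch<T> extends MergingStateSketch<T, Iterable<T>> {
    void update(List<T> values);
    void addAll(List<T> values);
}

/** Method bodies follow PartitionableListState. */
final class ArrayListStateSketch<T> implements ListStateSketch<T> {
    private final ArrayList<T> internalList = new ArrayList<>();

    @Override
    public void clear() {
        internalList.clear();
    }

    @Override
    public Iterable<T> get() {
        // Returns the backing list itself: empty, never null, once cleared.
        return internalList;
    }

    @Override
    public void add(T value) {
        if (value == null) {
            throw new NullPointerException("You cannot add null to a ListState.");
        }
        internalList.add(value);
    }

    @Override
    public void update(List<T> values) {
        // Full replacement: a null or empty argument leaves the state empty.
        internalList.clear();
        addAll(values);
    }

    @Override
    public void addAll(List<T> values) {
        // Incremental append: a null or empty argument is a no-op.
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }
}
```

For example, call update(Arrays.asList(1, 2)) and then addAll(null); get() still yields 1, 2. A subsequent update(null) leaves an empty list. The ListState Javadoc says the state value "will be null" in that case. This ArrayList-backed version, like PartitionableListState, returns an empty list instead.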

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java

/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
  • MergingState merely extends AppendingState and declares no methods of its own. The interface name signals that the state supports merging.

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java

/**
 * Base interface for partitioned state that supports adding elements and inspecting the current
 * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;

}
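  • AppendingState is the base interface for partitioned state that supports appending. It extends State and declares two methods:
    • get returns the current value of the state, or null if the state is empty.
    • add appends a value to the state.
    PartitionableListState departs from two of the Javadoc's notes. Its add rejects null with a NullPointerException instead of ignoring it, and its get returns the (possibly empty) internalList rather than null.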
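The AppendingState Javadoc also allows elements to be "aggregated into one value" instead of buffered in a list. Below is a minimal sketch of that variant. It uses hypothetical local interfaces in place of Flink's, with checked exceptions dropped, and it follows the Javadoc literally: get() returns null while the state is empty, and add(null) is ignored.

```java
// Hypothetical local mirrors of State and AppendingState; checked exceptions are omitted.
interface SimpleState {
    void clear();
}

interface SimpleAppendingState<IN, OUT> extends SimpleState {
    OUT get();
    void add(IN value);
}

/** Aggregates appended values into a single running sum instead of keeping a list. */
final class SumAppendingState implements SimpleAppendingState<Long, Long> {
    private Long sum; // null represents "empty", as the NOTE TO IMPLEMENTERS requires

    @Override
    public Long get() {
        return sum;
    }

    @Override
    public void add(Long value) {
        if (value == null) {
            return; // per the Javadoc, null leaves the state unchanged
        }
        sum = (sum == null) ? value : sum + value;
    }

    @Override
    public void clear() {
        sum = null;
    }
}
```

Adding 3, null and 4 gives 7, and clear() puts the state back to null. In Flink itself, ReducingState and AggregatingState are the built-in interfaces in this aggregated family.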

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
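  • State is the interface that every kind of partitioned state must implement. It declares only clear, which removes the state's value. For keyed state, that is the value mapped under the current key.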
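The phrase "the value mapped under the current key" is easiest to see with a toy keyed state. This is a hypothetical sketch; the class and method names are invented for illustration and are not Flink APIs. Here the caller sets the current key explicitly, whereas Flink's runtime supplies it automatically from the current element. Operator state has no current key, which is why PartitionableListState.clear() simply empties its whole internalList.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy keyed state: every read, write and clear is scoped to the current key. */
final class KeyScopedValueState<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    /** Flink's runtime does this for each element; in this sketch the caller must. */
    void setCurrentKey(K key) {
        this.currentKey = Objects.requireNonNull(key);
    }

    V value() {
        return valuesByKey.get(currentKey);
    }

    void update(V value) {
        valuesByKey.put(currentKey, value);
    }

    /** Mirrors the State.clear() contract: only the current key's value is removed. */
    void clear() {
        valuesByKey.remove(currentKey);
    }
}
```

Suppose values are stored under keys "a" and "b", and clear() is then called while "b" is current. The value under "a" survives.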

RegisteredOperatorStateBackendMetaInfo

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java

/**
 * Compound meta information for a registered state in an operator state backend.
 * This contains the state name, assignment mode, and state partition serializer.
 *
 * @param <S> Type of the state.
 */
public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {

    /**
     * The mode how elements in this state are assigned to tasks during restore
     */
    @Nonnull
    private final OperatorStateHandle.Mode assignmentMode;

    /**
     * The type serializer for the elements in the state list
     */
    @Nonnull
    private final TypeSerializer<S> partitionStateSerializer;

    public RegisteredOperatorStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull TypeSerializer<S> partitionStateSerializer,
            @Nonnull OperatorStateHandle.Mode assignmentMode) {
        super(name);
        this.partitionStateSerializer = partitionStateSerializer;
        this.assignmentMode = assignmentMode;
    }

    private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
        this(
            Preconditions.checkNotNull(copy).name,
            copy.partitionStateSerializer.duplicate(),
            copy.assignmentMode);
    }

    @SuppressWarnings("unchecked")
    public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
        this(
            snapshot.getName(),
            (TypeSerializer<S>) Preconditions.checkNotNull(
                snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
            OperatorStateHandle.Mode.valueOf(
                snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
        Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
    }

    /**
     * Creates a deep copy of the itself.
     */
    @Nonnull
    public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
        return new RegisteredOperatorStateBackendMetaInfo<>(this);
    }

    @Nonnull
    @Override
    public StateMetaInfoSnapshot snapshot() {
        return computeSnapshot();
    }

    //......

    @Nonnull
    private StateMetaInfoSnapshot computeSnapshot() {
        Map<String, String> optionsMap = Collections.singletonMap(
            StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
            assignmentMode.toString());
        String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
        Map<String, TypeSerializer<?>> serializerMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
        Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());

        return new StateMetaInfoSnapshot(
            name,
            StateMetaInfoSnapshot.BackendStateType.OPERATOR,
            optionsMap,
            serializerConfigSnapshotsMap,
            serializerMap);
    }
}
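  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot. computeSnapshot builds the three maps a StateMetaInfoSnapshot needs:
    • optionsMap holds the assignment mode under the OPERATOR_STATE_DISTRIBUTION_MODE key.
    • serializerMap holds a duplicate of partitionStateSerializer under the VALUE_SERIALIZER key.
    • serializerConfigSnapshotsMap holds that serializer's snapshotConfiguration, also under the VALUE_SERIALIZER key.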
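The options half of computeSnapshot reduces to a toString()/valueOf() round trip of the assignment mode. The StateMetaInfoSnapshot constructor performs the reverse step when it reads the mode back. Below is a sketch of just that part, with the serializer maps left out. It relies on two stand-ins:

- a hypothetical DistributionMode enum for OperatorStateHandle.Mode, whose constants are assumed to be SPLIT_DISTRIBUTE, UNION and BROADCAST;
- a plain string key for CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString().

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for OperatorStateHandle.Mode. */
enum DistributionMode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

/** Sketch of how the assignment mode travels through a snapshot's options map. */
final class AssignmentModeSnapshotSketch {
    // Stand-in for CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString()
    static final String DISTRIBUTION_MODE_KEY = "OPERATOR_STATE_DISTRIBUTION_MODE";

    /** Like computeSnapshot(): an immutable single-entry map holding the mode's name. */
    static Map<String, String> snapshotOptions(DistributionMode mode) {
        return Collections.singletonMap(DISTRIBUTION_MODE_KEY, mode.toString());
    }

    /** Like the restoring constructor: parse the stored name back with valueOf. */
    static DistributionMode restoreMode(Map<String, String> options) {
        return DistributionMode.valueOf(options.get(DISTRIBUTION_MODE_KEY));
    }
}
```

The round trip works because an enum's toString() returns the constant name unless it is overridden. A mode that overrode toString() would make valueOf fail on restore.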

Summary

  • Apart from broadcast state, Flink's managed operator state only supports list-style state (ListState). DefaultOperatorStateBackend implements ListState with PartitionableListState. That class stores the state in an ArrayList (internalList) and keeps its stateMetaInfo as a RegisteredOperatorStateBackendMetaInfo.
  • The interface chain runs as follows:
    • PartitionableListState implements ListState (update, addAll).
    • ListState extends MergingState with OUT bound to Iterable<T>.
    • MergingState declares no methods and extends AppendingState.
    • AppendingState extends State and declares get and add.
    • State declares clear.
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements snapshot through computeSnapshot. computeSnapshot builds the optionsMap, serializerConfigSnapshotsMap, and serializerMap needed for a StateMetaInfoSnapshot.


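Original content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, contact cloudcommunity@tencent.com to request removal.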
