A look at the reduce operation of Flink's KeyedStream

Original
code4it
Published 2018-12-29 12:29:29

This article is part of the column 码匠的流水账

This article examines the reduce operation of Flink's KeyedStream, from the user-facing API down to the operator that executes it.

Example

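```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class KeyedStreamReduceTest {

    // Sample input (assumed): the original test uses a WORDS constant whose contents are not shown
    private static final String[] WORDS = {
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer"
    };

    @Test
    public void testWordCount() throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataStream<String> text = env.fromElements(WORDS);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                                // value1: previous result for this word, value2: the newly arrived (word,1)
                                System.out.println("value1:" + value1.f1 + ";value2:" + value2.f1);
                                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                            }
                        });

        // emit result to stdout
        counts.print();

        // execute program
        env.execute("Streaming WordCount");
    }

    // Splits a line into lowercase words and emits (word, 1) for each, like the Tokenizer in Flink's WordCount example
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
```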
  • Here a reduce operation is applied to the KeyedStream using a custom ReduceFunction whose reduce method accumulates the count for each word.
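To make the example's console output concrete, here is a plain-Java trace (no Flink dependency) of what the job would produce for the single input line "To be, or not to be". It assumes parallelism 1, ignores the subtask prefixes that print() adds, and keeps the reduce() printouts separate from the emitted tuples instead of interleaving them. It reuses the Tokenizer's lowercase/`\W+` split and, following the StreamGroupedReduce code examined later, assumes the first element of each key is emitted without calling reduce. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountReduceTrace {

    // What one run produces: the lines printed inside reduce() and the tuples sent to print().
    static final class Trace {
        final List<String> reduceCalls = new ArrayList<>();
        final List<String> emitted = new ArrayList<>();
    }

    // Plain-Java model of the example job for one input line, assuming parallelism 1.
    static Trace run(String line) {
        Trace trace = new Trace();
        Map<String, Integer> lastResult = new HashMap<>(); // previous reduce result per word

        // same normalization as the Tokenizer: lowercase, split on non-word characters
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue;
            }
            Integer previous = lastResult.get(word);
            int count;
            if (previous == null) {
                count = 1; // first (word,1) of this key is emitted as-is, reduce() is not called
            } else {
                // value1 = previous result, value2 = the new (word,1) tuple
                trace.reduceCalls.add("value1:" + previous + ";value2:1");
                count = previous + 1;
            }
            lastResult.put(word, count);
            trace.emitted.add("(" + word + "," + count + ")");
        }
        return trace;
    }

    public static void main(String[] args) {
        Trace trace = run("To be, or not to be");
        System.out.println(trace.reduceCalls); // [value1:1;value2:1, value1:1;value2:1]
        System.out.println(trace.emitted);     // [(to,1), (be,1), (or,1), (not,1), (to,2), (be,2)]
    }
}
```

Note that a reduce on a KeyedStream emits an updated result for every incoming element, so the printed stream contains running counts such as (to,1) followed later by (to,2), not only a final total per word.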

KeyedStream.reduce

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

```java
@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    //......

    /**
     * Applies a reduce transformation on the grouped data stream grouped on by
     * the given key position. The {@link ReduceFunction} will receive input
     * values based on the key value. Only input values with the same key will
     * go to the same reducer.
     *
     * @param reducer
     *            The {@link ReduceFunction} that will be called for every
     *            element of the input values with the same key.
     * @return The transformed DataStream.
     */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
        return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
                clean(reducer), getType().createSerializer(getExecutionConfig())));
    }

    @Override
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName,
            TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);

        // inject the key selector and key type
        OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
        transform.setStateKeySelector(keySelector);
        transform.setStateKeyType(keyType);

        return returnStream;
    }

    //......
}
```
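  • KeyedStream's reduce method calls transform, passing a StreamGroupedReduce as the OneInputStreamOperator. KeyedStream overrides transform so that, after delegating to DataStream's transform, it injects the key selector and key type into the resulting OneInputTransformation.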
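The override matters because a keyed operator's transformation must carry the key selector that the runtime later uses to scope state. Below is a toy model of that override pattern; Transformation, Stream and KeyedStream are hypothetical, heavily simplified stand-ins rather than Flink classes, and reduce() here omits the ReduceFunction argument.

```java
import java.util.function.Function;

public class KeyedTransformSketch {

    // Hypothetical stand-in for OneInputTransformation: keeps the operator name and,
    // for keyed streams only, the key selector used to scope operator state.
    static final class Transformation<T> {
        final String operatorName;
        Function<T, ?> stateKeySelector; // stays null for non-keyed streams

        Transformation(String operatorName) {
            this.operatorName = operatorName;
        }
    }

    // Stand-in for DataStream: transform() only creates the transformation.
    static class Stream<T> {
        Transformation<T> transform(String operatorName) {
            return new Transformation<>(operatorName);
        }
    }

    // Stand-in for KeyedStream: overrides transform() and injects its key selector,
    // mirroring transform.setStateKeySelector(keySelector) in the quoted code.
    static class KeyedStream<T, K> extends Stream<T> {
        private final Function<T, K> keySelector;

        KeyedStream(Function<T, K> keySelector) {
            this.keySelector = keySelector;
        }

        @Override
        Transformation<T> transform(String operatorName) {
            Transformation<T> t = super.transform(operatorName);
            t.stateKeySelector = keySelector;
            return t;
        }

        // reduce() goes through the overridden transform(), so its result is always keyed
        Transformation<T> reduce() {
            return transform("Keyed Reduce");
        }
    }

    public static void main(String[] args) {
        KeyedStream<String, Character> byFirstLetter = new KeyedStream<>(s -> s.charAt(0));
        Transformation<String> keyed = byFirstLetter.reduce();
        System.out.println(keyed.operatorName + " keyed=" + (keyed.stateKeySelector != null)); // Keyed Reduce keyed=true

        Transformation<String> plain = new Stream<String>().transform("Map");
        System.out.println(plain.operatorName + " keyed=" + (plain.stateKeySelector != null)); // Map keyed=false
    }
}
```

The design choice being illustrated is that the base transform stays unaware of keys; only the keyed subclass adds them, so every operator created through a KeyedStream is keyed without each operator having to remember to do it.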

ReduceFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/ReduceFunction.java

```java
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {

    /**
     * The core method of ReduceFunction, combining two values into one value of the same type.
     * The reduce function is consecutively applied to all values of a group until only a single value remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}
```
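  • ReduceFunction defines a single reduce method that combines two values of the same type into one value of that type. In a keyed reduce, the first argument is the result of the previous reduce and the second is the current element.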
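The contract can be illustrated without Flink: a local interface with the same shape as ReduceFunction, applied consecutively over a group so that the first argument is always the accumulated result and the second is the next element. The nested interface and the reduceAll helper are illustrative stand-ins, not Flink API.

```java
import java.util.List;

public class ReduceFunctionSketch {

    // Local stand-in with the same shape as Flink's ReduceFunction<T>; not the Flink type itself.
    @FunctionalInterface
    interface ReduceFunction<T> {
        T reduce(T value1, T value2) throws Exception;
    }

    // Applies the function consecutively until a single value remains:
    // value1 is the result accumulated so far, value2 is the next element of the group.
    static <T> T reduceAll(List<T> group, ReduceFunction<T> fn) throws Exception {
        if (group.isEmpty()) {
            throw new IllegalArgumentException("cannot reduce an empty group");
        }
        T acc = group.get(0);
        for (int i = 1; i < group.size(); i++) {
            acc = fn.reduce(acc, group.get(i));
        }
        return acc;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(reduceAll(List.of(1, 1, 1), Integer::sum)); // 3
        System.out.println(reduceAll(List.of(4, 9, 2), Math::max));    // 9
    }
}
```

Because the function must return the same type it receives, operations such as sum or max fit directly, whereas something like an average would need an accumulator type (for example a (sum, count) pair) as T.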

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

```java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......
    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        //......

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
        }
        catch (Throwable t) {
            //......
        }
        finally {
            //......
        }
    }
}
```
  • Task's run method calls invokable.invoke(). Here the invokable is a OneInputStreamTask; since OneInputStreamTask extends StreamTask, the invoke() that actually runs is the one defined in StreamTask.
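Before invoking anything, Task.run performs a guarded switch from DEPLOYING to RUNNING; if another thread has already moved the task elsewhere (for example to CANCELED), the invokable is never called. The sketch below models that guard with AtomicReference.compareAndSet. The enum is a subset of Flink's ExecutionState, and the class is a hypothetical simplification: the real Task throws CancelTaskException on a failed switch and reports state changes to the TaskManager, both omitted here.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TaskLifecycleSketch {

    // Subset of the states a task moves through.
    enum ExecutionState { DEPLOYING, RUNNING, FINISHED, CANCELED, FAILED }

    // Stand-in for AbstractInvokable: the code the task runs (e.g. a OneInputStreamTask).
    interface Invokable {
        void invoke() throws Exception;
    }

    private final AtomicReference<ExecutionState> state =
            new AtomicReference<>(ExecutionState.DEPLOYING);

    ExecutionState getState() {
        return state.get();
    }

    // Atomic transition: succeeds only if the task is still in 'from'.
    boolean transitionState(ExecutionState from, ExecutionState to) {
        return state.compareAndSet(from, to);
    }

    // Simplified run(): switch DEPLOYING -> RUNNING, then hand control to invokable.invoke().
    void run(Invokable invokable) {
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            return; // canceled/failed in the meantime (the real code throws CancelTaskException)
        }
        try {
            invokable.invoke();
            transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);
        } catch (Exception e) {
            transitionState(ExecutionState.RUNNING, ExecutionState.FAILED);
        }
    }

    public static void main(String[] args) {
        TaskLifecycleSketch task = new TaskLifecycleSketch();
        task.run(() -> System.out.println("invoke() called"));
        System.out.println(task.getState()); // FINISHED

        TaskLifecycleSketch canceled = new TaskLifecycleSketch();
        canceled.transitionState(ExecutionState.DEPLOYING, ExecutionState.CANCELED);
        canceled.run(() -> System.out.println("never printed"));
        System.out.println(canceled.getState()); // CANCELED
    }
}
```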

StreamTask.invoke

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

```java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    //......

    protected abstract void run() throws Exception;

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool();

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            //......
        }
    }
}
```
  • StreamTask's invoke method calls run, an abstract method implemented by subclasses; here that is OneInputStreamTask's run method.
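StreamTask.invoke is a template method: it is final, fixes the order of the lifecycle steps, and leaves init() and run() to subclasses. A stripped-down sketch of that shape follows; the step names are illustrative labels, and checked exceptions, the cancellation checks and cleanup are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamTaskTemplateSketch {

    // Simplified StreamTask: invoke() is final and fixes the lifecycle order;
    // subclasses only supply init() and run().
    abstract static class StreamTask {
        final List<String> steps = new ArrayList<>();
        private final Object lock = new Object();

        protected abstract void init();

        protected abstract void run();

        public final void invoke() {
            steps.add("create operator chain");
            init(); // task-specific initialization
            synchronized (lock) {
                // in Flink, state initialization and open() run under the checkpoint lock
                // so a timer registered meanwhile cannot fire before open() completes
                steps.add("initializeState");
                steps.add("openAllOperators");
            }
            run(); // the subclass's main loop
            steps.add("finished");
        }
    }

    // Stand-in for OneInputStreamTask: fills in the two abstract hooks.
    static final class OneInputStreamTask extends StreamTask {
        @Override
        protected void init() {
            steps.add("init: create input processor");
        }

        @Override
        protected void run() {
            steps.add("run: process input");
        }
    }

    public static void main(String[] args) {
        StreamTask task = new OneInputStreamTask();
        task.invoke();
        task.steps.forEach(System.out::println);
    }
}
```

Making invoke() final is what guarantees every task type goes through the same initialization and locking sequence; a subclass can change what happens inside init() and run(), but not when they happen.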

OneInputStreamTask.run

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

```java
@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {

    private StreamInputProcessor<IN> inputProcessor;

    private volatile boolean running = true;

    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

    /**
     * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
     *
     * @param env The task environment for this task.
     */
    public OneInputStreamTask(Environment env) {
        super(env);
    }

    /**
     * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
     *
     * <p>This constructor accepts a special {@link ProcessingTimeService}. By default (and if
     * null is passes for the time provider) a {@link SystemProcessingTimeService DefaultTimerService}
     * will be used.
     *
     * @param env The task environment for this task.
     * @param timeProvider Optionally, a specific time provider to use.
     */
    @VisibleForTesting
    public OneInputStreamTask(
            Environment env,
            @Nullable ProcessingTimeService timeProvider) {
        super(env, timeProvider);
    }

    @Override
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();

        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        int numberOfInputs = configuration.getNumberOfInputs();

        if (numberOfInputs > 0) {
            InputGate[] inputGates = getEnvironment().getAllInputGates();

            inputProcessor = new StreamInputProcessor<>(
                    inputGates,
                    inSerializer,
                    this,
                    configuration.getCheckpointMode(),
                    getCheckpointLock(),
                    getEnvironment().getIOManager(),
                    getEnvironment().getTaskManagerInfo().getConfiguration(),
                    getStreamStatusMaintainer(),
                    this.headOperator,
                    getEnvironment().getMetricGroup().getIOMetricGroup(),
                    inputWatermarkGauge);
        }
        headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
        // wrap watermark gauge since registered metrics must be unique
        getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
    }

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }

    @Override
    protected void cleanup() throws Exception {
        if (inputProcessor != null) {
            inputProcessor.cleanup();
        }
    }

    @Override
    protected void cancelTask() {
        running = false;
    }
}
```
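  • OneInputStreamTask's run method calls inputProcessor.processInput() in a loop until it returns false or the task is canceled (cancelTask sets running to false); inputProcessor here is a StreamInputProcessor.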
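The loop ends in one of two ways: processInput() returns false once the input is exhausted, or cancelTask() clears the volatile running flag, possibly from another thread. A plain-Java sketch of that pattern, with a queue standing in for the network input; InputProcessor and OneInputTask are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class InputLoopSketch {

    // Stand-in for StreamInputProcessor: each call handles one record and
    // returns false once the input is exhausted.
    static final class InputProcessor {
        private final Deque<String> input;
        int processed;

        InputProcessor(Deque<String> input) {
            this.input = input;
        }

        boolean processInput() {
            if (input.isEmpty()) {
                return false;
            }
            input.poll(); // "process" one record
            processed++;
            return true;
        }
    }

    // Stand-in for OneInputStreamTask: same loop shape as the quoted run() method.
    static final class OneInputTask {
        private final InputProcessor inputProcessor;
        private volatile boolean running = true; // cleared by cancelTask(), possibly from another thread

        OneInputTask(InputProcessor inputProcessor) {
            this.inputProcessor = inputProcessor;
        }

        void run() {
            final InputProcessor inputProcessor = this.inputProcessor; // local copy, as in the original
            while (running && inputProcessor.processInput()) {
                // all the work happens in processInput()
            }
        }

        void cancelTask() {
            running = false;
        }
    }

    public static void main(String[] args) {
        InputProcessor processor = new InputProcessor(new ArrayDeque<>(Arrays.asList("a", "b", "c")));
        new OneInputTask(processor).run();
        System.out.println(processor.processed); // 3
    }
}
```

The volatile modifier is what makes a write from the canceling thread visible to the task thread; because processInput() returns after each record, the flag is re-checked between records.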

StreamInputProcessor.processInput

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

```java
@Internal
public class StreamInputProcessor<IN> {

    //......

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            //......
        }
    }

    //......
}
```
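  • StreamInputProcessor's processInput method keeps reading the next record in a while (true) loop and handles each StreamElement according to its type: watermarks, stream status changes and latency markers are processed and the loop continues, while an ordinary data record is passed to streamOperator.processElement (after setKeyContextElement1 sets the current key) and the method returns true. Here streamOperator is StreamGroupedReduce.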
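The dispatch logic can be modeled with a few marker classes standing in for StreamElement subtypes. This is a simplified sketch: stream status handling, deserialization, buffer recycling and the checkpoint lock are omitted, and all class names here are local stand-ins. Each call returns true as soon as one data record has been handed on, which is what lets the outer loop in OneInputStreamTask.run() re-check its running flag between records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ElementDispatchSketch {

    // Minimal stand-ins for StreamElement and three of its subtypes (stream status omitted).
    interface StreamElement {}

    static final class Watermark implements StreamElement {
        final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    static final class LatencyMarker implements StreamElement {}

    static final class StreamRecord implements StreamElement {
        final String value;

        StreamRecord(String value) {
            this.value = value;
        }
    }

    private final Iterator<StreamElement> input;
    final List<String> handled = new ArrayList<>();

    ElementDispatchSketch(List<StreamElement> elements) {
        this.input = elements.iterator();
    }

    // Same control flow as processInput(): control elements are handled and the loop goes on;
    // a data record is handed to the operator and the call returns true; no more input returns false.
    boolean processInput() {
        while (input.hasNext()) {
            StreamElement element = input.next();
            if (element instanceof Watermark) {
                handled.add("watermark " + ((Watermark) element).timestamp);
            } else if (element instanceof LatencyMarker) {
                handled.add("latency marker");
            } else {
                // in Flink: setKeyContextElement1(record), then processElement(record)
                handled.add("record " + ((StreamRecord) element).value);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<StreamElement> elements = new ArrayList<>();
        elements.add(new Watermark(5));
        elements.add(new StreamRecord("a"));
        elements.add(new LatencyMarker());
        elements.add(new StreamRecord("b"));

        ElementDispatchSketch processor = new ElementDispatchSketch(elements);
        while (processor.processInput()) {
            // one data record per call, like OneInputStreamTask.run()
        }
        System.out.println(processor.handled); // [watermark 5, record a, latency marker, record b]
    }
}
```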

StreamGroupedReduce.processElement

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java

```java
/**
 * A {@link StreamOperator} for executing a {@link ReduceFunction} on a
 * {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
 */

@Internal
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_op_state";

    private transient ValueState<IN> values;

    private TypeSerializer<IN> serializer;

    public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
        super(reducer);
        this.serializer = serializer;
    }

    @Override
    public void open() throws Exception {
        super.open();
        ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
        values = getPartitionedState(stateId);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();

        if (currentValue != null) {
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            output.collect(element.replace(reduced));
        } else {
            values.update(value);
            output.collect(element.replace(value));
        }
    }
}
```
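  • StreamGroupedReduce stores the reduce result in a ValueState. Its processElement method calls reduce on userFunction, the user-defined ReduceFunction, passing the ValueState's value (the result of the previous reduce) as the first argument and the current element's value as the second. After userFunction.reduce returns, the result is written back to the ValueState with update and emitted downstream via output.collect. For the first element of a key the ValueState is still empty, so the element is stored and emitted unchanged without calling reduce.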
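A plain-Java model of processElement follows, in which a HashMap keyed by each record's key stands in for keyed ValueState and a BinaryOperator plays the ReduceFunction role. It only illustrates the control flow and is not Flink code: in Flink the state lives in the configured state backend, is scoped implicitly by the key set via setKeyContextElement1 rather than looked up explicitly, and is included in checkpoints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of StreamGroupedReduce.processElement (not Flink code).
public class GroupedReduceSketch<K, IN> {

    private final Function<IN, K> keySelector;          // what keyBy(...) provides
    private final BinaryOperator<IN> userFunction;      // plays the ReduceFunction role
    private final Map<K, IN> values = new HashMap<>();  // one "ValueState" slot per key
    final List<IN> output = new ArrayList<>();          // stands in for output.collect(...)

    GroupedReduceSketch(Function<IN, K> keySelector, BinaryOperator<IN> userFunction) {
        this.keySelector = keySelector;
        this.userFunction = userFunction;
    }

    void processElement(IN value) {
        // Flink sets the current key before processElement, so values.value() sees only this key's slot
        K currentKey = keySelector.apply(value);
        IN currentValue = values.get(currentKey);

        if (currentValue != null) {
            IN reduced = userFunction.apply(currentValue, value); // (previous result, new element)
            values.put(currentKey, reduced);
            output.add(reduced);
        } else {
            // first element for this key: stored and emitted unchanged, reduce is not called
            values.put(currentKey, value);
            output.add(value);
        }
    }

    public static void main(String[] args) {
        GroupedReduceSketch<String, Map.Entry<String, Integer>> op =
                new GroupedReduceSketch<String, Map.Entry<String, Integer>>(
                        Map.Entry::getKey,
                        (a, b) -> Map.entry(a.getKey(), a.getValue() + b.getValue()));
        for (String word : new String[] {"a", "b", "a", "a"}) {
            op.processElement(Map.entry(word, 1));
        }
        System.out.println(op.output); // [a=1, b=1, a=2, a=3]
    }
}
```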

Summary

  • KeyedStream's reduce method calls transform, and the OneInputStreamOperator it constructs is StreamGroupedReduce. reduce takes a ReduceFunction, whose reduce method combines two values of the same type into a single value of that type.
  • Task's run method calls invokable.invoke(). The invokable here is a OneInputStreamTask, which extends StreamTask, so the invoke() that actually runs is StreamTask's. StreamTask's invoke method calls the abstract run method, implemented here by OneInputStreamTask. OneInputStreamTask's run method calls inputProcessor.processInput() in a loop, where inputProcessor is a StreamInputProcessor. StreamInputProcessor's processInput method keeps reading the next record in a while (true) loop and handles each StreamElement according to its type; an ordinary data record is passed to streamOperator.processElement, and streamOperator here is StreamGroupedReduce.
  • StreamGroupedReduce's processElement method calls userFunction.reduce with the ValueState's value (the result of the previous reduce) as the first argument and the current element's value as the second. The result is then written back to the ValueState with update.

doc

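Original statement: this article is published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.

In case of infringement, please contact cloudcommunity@tencent.com for removal.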
