A look at Flink's BoltWrapper

code4it
Published 2018-11-25 11:03:00
Included in the column: 码匠的流水账

This article takes a look at Flink's BoltWrapper.

BoltWrapper

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltWrapper.java

/**
 * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
 * It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
 * process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type {@code OUT}
 * (see {@link AbstractStormCollector} for supported types).<br/>
 * <br/>
 * <strong>Works for single input streams only! See {@link MergedInputsBoltWrapper} for multi-input stream
 * Bolts.</strong>
 */
public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
​
    @Override
    public void open() throws Exception {
        super.open();
​
        this.flinkCollector = new TimestampedCollector<>(this.output);
​
        GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
        StormConfig stormConfig = new StormConfig();
​
        if (config != null) {
            if (config instanceof StormConfig) {
                stormConfig = (StormConfig) config;
            } else {
                stormConfig.putAll(config.toMap());
            }
        }
​
        this.topologyContext = WrapperSetupHelper.createTopologyContext(
                getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
​
        final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
                this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector));
​
        if (this.stormTopology != null) {
            Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();
​
            for (GlobalStreamId inputStream : inputs.keySet()) {
                for (Integer tid : this.topologyContext.getComponentTasks(inputStream
                        .get_componentId())) {
                    this.inputComponentIds.put(tid, inputStream.get_componentId());
                    this.inputStreamIds.put(tid, inputStream.get_streamId());
                    this.inputSchemas.put(tid,
                            this.topologyContext.getComponentOutputFields(inputStream));
                }
            }
        }
​
        this.bolt.prepare(stormConfig, this.topologyContext, stormCollector);
    }
​
    @Override
    public void dispose() throws Exception {
        super.dispose();
        this.bolt.cleanup();
    }
​
    @Override
    public void processElement(final StreamRecord<IN> element) throws Exception {
        this.flinkCollector.setTimestamp(element);
​
        IN value = element.getValue();
​
        if (this.stormTopology != null) {
            Tuple tuple = (Tuple) value;
            Integer producerTaskId = tuple.getField(tuple.getArity() - 1);
​
            this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId),
                    producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds
                    .get(producerTaskId), MessageId.makeUnanchored()));
​
        } else {
            this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
                    MessageId.makeUnanchored()));
        }
    }
​
​
}
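  • Flink uses BoltWrapper to wrap a Storm IRichBolt. BoltWrapper extends AbstractStreamOperator and implements the OneInputStreamOperator interface.
  • OneInputStreamOperator extends StreamOperator and additionally declares three methods: processElement, processWatermark, and processLatencyMarker.
  • AbstractStreamOperator implements StreamOperator, but it also already provides implementations of processWatermark and processLatencyMarker.
  • BoltWrapper therefore mainly has to implement processElement from OneInputStreamOperator, and it overrides the open and dispose methods defined by StreamOperator.
  • The key point of open is that it calls the bolt's prepare method and passes in an OutputCollector that wraps a BoltCollector. The BoltCollector forwards the data emitted by the bolt to Flink, and it does so through Flink's TimestampedCollector.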
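
The lifecycle mapping described above can be sketched without any Flink or Storm dependency. This is only a minimal illustration: MiniBolt, MiniBoltWrapper, and UpperCaseBolt are hypothetical stand-ins invented for this example, not classes from either project, and a plain List plays the role of the Flink output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Storm's IRichBolt lifecycle.
interface MiniBolt {
    void prepare(Consumer<String> collector);
    void execute(String tuple);
    void cleanup();
}

// Hypothetical stand-in for BoltWrapper: maps operator callbacks onto bolt callbacks.
class MiniBoltWrapper {
    private final MiniBolt bolt;
    private final List<String> flinkOutput; // plays the role of the Flink output

    MiniBoltWrapper(MiniBolt bolt, List<String> flinkOutput) {
        this.bolt = bolt;
        this.flinkOutput = flinkOutput;
    }

    // open(): give the bolt a collector that writes to the Flink side
    void open() {
        bolt.prepare(flinkOutput::add);
    }

    // processElement(): hand each input element to the bolt
    void processElement(String value) {
        bolt.execute(value);
    }

    // dispose(): let the bolt release its resources
    void dispose() {
        bolt.cleanup();
    }
}

// A sample bolt (hypothetical) that upper-cases every tuple it receives.
class UpperCaseBolt implements MiniBolt {
    private Consumer<String> collector;

    @Override
    public void prepare(Consumer<String> collector) {
        this.collector = collector;
    }

    @Override
    public void execute(String tuple) {
        collector.accept(tuple.toUpperCase());
    }

    @Override
    public void cleanup() {
        collector = null;
    }
}

class WrapperLifecycleDemo {
    static List<String> run(List<String> inputs) {
        List<String> out = new ArrayList<>();
        MiniBoltWrapper wrapper = new MiniBoltWrapper(new UpperCaseBolt(), out);
        wrapper.open();
        for (String in : inputs) {
            wrapper.processElement(in);
        }
        wrapper.dispose();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b"))); // prints [A, B]
    }
}
```

The real BoltWrapper additionally builds a TopologyContext and converts each input into a StormTuple before calling execute; the sketch leaves both out to keep the open / processElement / dispose correspondence visible.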

BoltCollector

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltCollector.java

/**
 * A {@link BoltCollector} is used by {@link BoltWrapper} to provided an Storm compatible
 * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
 * and emits them via the provide {@link Output} object.
 */
class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
​
    /** The Flink output Collector. */
    private final Collector<OUT> flinkOutput;
​
    /**
     * Instantiates a new {@link BoltCollector} that emits Flink tuples to the given Flink output object. If the
     * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
     * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
     *
     * @param numberOfAttributes
     *            The number of attributes of the emitted tuples per output stream.
     * @param taskId
     *            The ID of the producer task (negative value for unknown).
     * @param flinkOutput
     *            The Flink output object to be used.
     * @throws UnsupportedOperationException
     *             if the specified number of attributes is greater than 25
     */
    BoltCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
            final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
        super(numberOfAttributes, taskId);
        assert (flinkOutput != null);
        this.flinkOutput = flinkOutput;
    }
​
    @Override
    protected List<Integer> doEmit(final OUT flinkTuple) {
        this.flinkOutput.collect(flinkTuple);
        // TODO
        return null;
    }
​
    @Override
    public void reportError(final Throwable error) {
        // not sure, if Flink can support this
    }
​
    @Override
    public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
        return this.tansformAndEmit(streamId, tuple);
    }
​
    @Override
    public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
        throw new UnsupportedOperationException("Direct emit is not supported by Flink");
    }
​
    @Override
    public void ack(final Tuple input) {}
​
    @Override
    public void fail(final Tuple input) {}
​
    @Override
    public void resetTimeout(Tuple var1) {}
​
}
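  • BoltCollector implements Storm's IOutputCollector interface, but ack, fail, resetTimeout, and reportError are all empty, and emitDirect is not supported (it throws UnsupportedOperationException).
  • doEmit calls flinkOutput.collect(flinkTuple).
  • emit calls tansformAndEmit(streamId, tuple) (the method name is spelled this way in the source), which is implemented in the parent class AbstractStormCollector.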
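
As a rough illustration of this adapter shape, the sketch below implements a trimmed-down collector interface in which only emit does real work. MiniOutputCollector and MiniBoltCollector are hypothetical names made up for this example; the real IOutputCollector has more parameters (anchors, for instance), and the real emit converts the tuple rather than forwarding it unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, trimmed-down version of Storm's IOutputCollector.
interface MiniOutputCollector {
    void emit(String streamId, List<Object> tuple);
    void emitDirect(int taskId, String streamId, List<Object> tuple);
    void ack(Object input);
    void fail(Object input);
}

// Hypothetical stand-in for BoltCollector: only emit does real work.
class MiniBoltCollector implements MiniOutputCollector {
    private final List<Object> flinkOutput; // plays the role of the Flink Collector

    MiniBoltCollector(List<Object> flinkOutput) {
        this.flinkOutput = flinkOutput;
    }

    @Override
    public void emit(String streamId, List<Object> tuple) {
        // Simplification: forward the tuple as-is instead of converting it
        flinkOutput.add(tuple);
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
        throw new UnsupportedOperationException("Direct emit is not supported");
    }

    @Override
    public void ack(Object input) {
        // intentionally empty
    }

    @Override
    public void fail(Object input) {
        // intentionally empty
    }
}

class CollectorAdapterDemo {
    // Returns true if the collector rejects emitDirect.
    static boolean directEmitRejected(MiniOutputCollector collector) {
        try {
            collector.emitDirect(0, "default", Arrays.<Object>asList("x"));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Object> out = new ArrayList<>();
        MiniOutputCollector collector = new MiniBoltCollector(out);
        collector.emit("default", Arrays.<Object>asList("hello", 1));
        collector.ack("ignored");
        System.out.println(out.size());                    // prints 1
        System.out.println(directEmitRejected(collector)); // prints true
    }
}
```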

TimestampedCollector.collect

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/TimestampedCollector.java

/**
 * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
 * Before giving the {@link TimestampedCollector} to a user function you must set
 * the timestamp that should be attached to emitted elements. Most operators
 * would set the timestamp of the incoming
 * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
 *
 * @param <T> The type of the elements that can be emitted.
 */
@Internal
public class TimestampedCollector<T> implements Collector<T> {
​
    private final Output<StreamRecord<T>> output;
​
    private final StreamRecord<T> reuse;
​
    /**
     * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
     */
    public TimestampedCollector(Output<StreamRecord<T>> output) {
        this.output = output;
        this.reuse = new StreamRecord<T>(null);
    }
​
    @Override
    public void collect(T record) {
        output.collect(reuse.replace(record));
    }
​
    public void setTimestamp(StreamRecord<?> timestampBase) {
        if (timestampBase.hasTimestamp()) {
            reuse.setTimestamp(timestampBase.getTimestamp());
        } else {
            reuse.eraseTimestamp();
        }
    }
​
    public void setAbsoluteTimestamp(long timestamp) {
        reuse.setTimestamp(timestamp);
    }
​
    public void eraseTimestamp() {
        reuse.eraseTimestamp();
    }
​
    @Override
    public void close() {
        output.close();
    }
}
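  • TimestampedCollector implements Flink's Collector interface and adds the setTimestamp, setAbsoluteTimestamp, and eraseTimestamp methods.
  • It works with a StreamRecord object, which has three fields (value, timestamp, and hasTimestamp) and so associates a value with a timestamp.
  • The collect method emits the object returned by StreamRecord.replace. replace only swaps the value reference and leaves the timestamp untouched, so every emitted element carries whatever timestamp was last set on the collector (in BoltWrapper.processElement, that of the incoming element).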
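
The effect of reusing one record and swapping only its value can be shown with a small stand-alone sketch. MiniRecord and MiniTimestampedCollector are hypothetical simplifications written for this example; they mirror the shape of StreamRecord and TimestampedCollector but are not the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified StreamRecord: a value plus an optional timestamp.
class MiniRecord<T> {
    T value;
    long timestamp;
    boolean hasTimestamp;

    MiniRecord(T value) {
        this.value = value;
    }

    // Swaps the value only; timestamp and hasTimestamp stay as they were.
    MiniRecord<T> replace(T newValue) {
        this.value = newValue;
        return this;
    }

    void setTimestamp(long ts) {
        this.timestamp = ts;
        this.hasTimestamp = true;
    }

    void eraseTimestamp() {
        this.hasTimestamp = false;
    }
}

// Hypothetical stand-in for TimestampedCollector; a List<String> plays the role of the Output.
class MiniTimestampedCollector<T> {
    private final MiniRecord<T> reuse = new MiniRecord<>(null);
    private final List<String> output;

    MiniTimestampedCollector(List<String> output) {
        this.output = output;
    }

    // Copies the timestamp of the incoming record, or erases it if there is none.
    void setTimestamp(MiniRecord<?> base) {
        if (base.hasTimestamp) {
            reuse.setTimestamp(base.timestamp);
        } else {
            reuse.eraseTimestamp();
        }
    }

    void collect(T value) {
        MiniRecord<T> r = reuse.replace(value);
        String ts = r.hasTimestamp ? Long.toString(r.timestamp) : "none";
        output.add(r.value + "@" + ts);
    }
}

class TimestampReuseDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        MiniTimestampedCollector<String> collector = new MiniTimestampedCollector<>(out);

        MiniRecord<String> in = new MiniRecord<>("in");
        in.setTimestamp(42L);
        collector.setTimestamp(in);
        collector.collect("x"); // both outputs inherit timestamp 42
        collector.collect("y");

        collector.setTimestamp(new MiniRecord<>("no-ts"));
        collector.collect("z"); // the input had no timestamp, so neither does the output
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [x@42, y@42, z@none]
    }
}
```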

AbstractStormCollector.tansformAndEmit

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java

    /**
     * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
     * to the specified output stream.
     *
     * @param The
     *            The output stream id.
     * @param tuple
     *            The Storm tuple to be emitted.
     * @return the return value of {@link #doEmit(Object)}
     */
    @SuppressWarnings("unchecked")
    protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
        List<Integer> taskIds;
​
        int numAtt = this.numberOfAttributes.get(streamId);
        int taskIdIdx = numAtt;
        if (this.taskId >= 0 && numAtt < 0) {
            numAtt = 1;
            taskIdIdx = 0;
        }
        if (numAtt >= 0) {
            assert (tuple.size() == numAtt);
            Tuple out = this.outputTuple.get(streamId);
            for (int i = 0; i < numAtt; ++i) {
                out.setField(tuple.get(i), i);
            }
            if (this.taskId >= 0) {
                out.setField(this.taskId, taskIdIdx);
            }
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = out;
​
                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) out);
            }
​
        } else {
            assert (tuple.size() == 1);
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = tuple.get(0);
​
                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) tuple.get(0));
            }
        }
        this.tupleEmitted = true;
​
        return taskIds;
    }
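  • AbstractStormCollector.tansformAndEmit mainly deals with the split case, that is, a bolt that declares more than one output stream. In every branch the data is finally emitted through doEmit, which the subclass BoltCollector implements.
  • If split is true, doEmit receives splitTuple, a SplitStreamType that records both the streamId and the value.
  • If split is false, doEmit receives the Flink tuple itself (or the raw value when the output is a raw type). This corresponds to the value field of SplitStreamType, without the streamId information.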
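
The split versus non-split branches can be sketched as follows. This is a hedged simplification that only covers the raw, single-field output case; MiniSplitStreamType and MiniStormCollector are hypothetical names, and the rule that split mode is switched on when more than one stream is declared is an assumption taken from the description above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for SplitStreamType: a value tagged with its stream id.
class MiniSplitStreamType {
    final String streamId;
    final Object value;

    MiniSplitStreamType(String streamId, Object value) {
        this.streamId = streamId;
        this.value = value;
    }
}

// Hypothetical stand-in for the split handling in tansformAndEmit (raw output only).
class MiniStormCollector {
    private final boolean split;

    // Assumption: split mode is on when the bolt declared more than one stream.
    MiniStormCollector(int declaredStreams) {
        this.split = declaredStreams > 1;
    }

    // Returns the object that would be handed to doEmit.
    Object transform(String streamId, List<Object> tuple) {
        Object value = tuple.get(0);
        if (split) {
            // The quoted source reuses one splitTuple instance; a new object is created here for clarity
            return new MiniSplitStreamType(streamId, value);
        }
        return value;
    }
}

class SplitEmitDemo {
    public static void main(String[] args) {
        List<Object> tuple = Arrays.<Object>asList("hello");

        Object single = new MiniStormCollector(1).transform("default", tuple);
        Object multi = new MiniStormCollector(2).transform("s1", tuple);

        System.out.println(single);                                 // prints hello
        System.out.println(((MiniSplitStreamType) multi).streamId); // prints s1
    }
}
```

Downstream code can then use the streamId carried by the wrapper object to route each value to the right logical stream, which is the information a plain tuple lacks.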

Task.run

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......
​
    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
​
            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------
​
            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;
​
            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }
​
            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
​
            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);
​
            // run the invokable
            invokable.invoke();
​
            //......
    }
}
  • Task.run calls invokable.invoke(); in this scenario the invokable is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
​
        //......
​
    @Override
    public final void invoke() throws Exception {
​
        boolean disposed = false;
        try {
            //......
​
            // let the task do its work
            isRunning = true;
            run();
​
            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }
​
            LOG.debug("Finished task {}", getName());
​
            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;
​
            //......
        }
    }
}
  • StreamTask.invoke calls the run method of its subclass; here the subclass is OneInputStreamTask.

OneInputStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
​
        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
  • This run method simply keeps calling inputProcessor.processInput() in a loop; the inputProcessor here is a StreamInputProcessor.

StreamInputProcessor.processInput

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }
​
        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
​
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }
​
                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();
​
                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }
​
            //......
        }
    }
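  • processInput first obtains the next record through currentRecordDeserializer.getNextRecord(deserializationDelegate). Watermarks, stream statuses, and latency markers are handled separately; for an ordinary record it calls streamOperator.processElement(record) while holding the lock. The streamOperator here is the BoltWrapper.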
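
The control flow of this loop, including the way OneInputStreamTask.run drives it, can be sketched in plain Java. MiniKind, MiniElement, and MiniInputProcessor are hypothetical types invented for this example; deserialization, buffers, and stream status handling are left out, and an exhausted iterator stands in for the end of the input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical element kinds mirroring watermark, latency marker, and record.
enum MiniKind { WATERMARK, LATENCY_MARKER, RECORD }

class MiniElement {
    final MiniKind kind;
    final String value;

    MiniElement(MiniKind kind, String value) {
        this.kind = kind;
        this.value = value;
    }
}

// Hypothetical stand-in for StreamInputProcessor; it logs the calls it would make on the operator.
class MiniInputProcessor {
    private final Iterator<MiniElement> input;
    private final Object lock = new Object();
    final List<String> calls = new ArrayList<>();

    MiniInputProcessor(List<MiniElement> elements) {
        this.input = elements.iterator();
    }

    // Returns true after handing one record to the operator, false once the input is exhausted.
    boolean processInput() {
        while (input.hasNext()) {
            MiniElement element = input.next();
            if (element.kind == MiniKind.WATERMARK) {
                calls.add("watermark");
            } else if (element.kind == MiniKind.LATENCY_MARKER) {
                synchronized (lock) {
                    calls.add("processLatencyMarker");
                }
            } else {
                synchronized (lock) {
                    calls.add("processElement(" + element.value + ")");
                }
                return true;
            }
        }
        return false;
    }
}

class InputLoopDemo {
    static List<String> run() {
        MiniInputProcessor processor = new MiniInputProcessor(Arrays.asList(
                new MiniElement(MiniKind.WATERMARK, null),
                new MiniElement(MiniKind.RECORD, "a"),
                new MiniElement(MiniKind.LATENCY_MARKER, null),
                new MiniElement(MiniKind.RECORD, "b")));

        // Same shape as OneInputStreamTask.run(): all the work happens in processInput
        while (processor.processInput()) {
            // nothing to do here
        }
        return processor.calls;
    }

    public static void main(String[] args) {
        // prints [watermark, processElement(a), processLatencyMarker, processElement(b)]
        System.out.println(run());
    }
}
```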

Summary

  • Flink uses BoltWrapper to wrap a Storm IRichBolt. It implements the processElement method of OneInputStreamOperator and calls bolt.execute inside it. In its implementation of StreamOperator's open method it calls the bolt's prepare method, passing in an OutputCollector that wraps a BoltCollector. The BoltCollector forwards the data emitted during bolt.execute to Flink through Flink's TimestampedCollector.
  • BoltCollector.emit calls AbstractStormCollector.tansformAndEmit, which in turn emits through BoltCollector.doEmit. When the bolt declares multiple streams, doEmit receives the tuple wrapped in a SplitStreamType; when there is only one stream, doEmit receives the plain tuple.
  • Flink's Task.run calls StreamTask.invoke, which calls the run method of its subclass (here OneInputStreamTask). OneInputStreamTask.run keeps calling inputProcessor.processInput() in a loop, where inputProcessor is a StreamInputProcessor. processInput obtains the next record through currentRecordDeserializer.getNextRecord(deserializationDelegate) and, for ordinary records, calls streamOperator.processElement(record). The streamOperator here is the BoltWrapper, whose processElement calls the Storm bolt's execute method to run the bolt logic and emits the results through the BoltCollector.

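Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to request removal.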
