
A look at Storm's CheckpointSpout

code4it
Published 2018-11-02 09:52:08
Collected in the column: 码匠的流水账

This article takes a look at Storm's CheckpointSpout.

TopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

    public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for (String boltId : _bolts.keySet()) {
            IRichBolt bolt = _bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try {
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for (String spoutId : _spouts.keySet()) {
            IRichSpout spout = _spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try {
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }

        StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                        boltSpecs,
                                                        new HashMap<>());

        stormTopology.set_worker_hooks(_workerHooks);

        if (!_componentToSharedMemory.isEmpty()) {
            stormTopology.set_component_to_shared_memory(_componentToSharedMemory);
            stormTopology.set_shared_memory(_sharedMemory);
        }

        return Utils.addVersions(stormTopology);
    }

    /**
     * If the topology has at least one stateful bolt add a {@link CheckpointSpout} component to the topology.
     */
    private void maybeAddCheckpointSpout() {
        if (hasStatefulBolt) {
            setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
        }
    }

    private void maybeAddCheckpointInputs(ComponentCommon common) {
        if (hasStatefulBolt) {
            addCheckPointInputs(common);
        }
    }

    /**
     * If the topology has at least one stateful bolt all the non-stateful bolts are wrapped in {@link CheckpointTupleForwarder} so that the
     * checkpoint tuples can flow through the topology.
     */
    private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
        if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
            bolt = new CheckpointTupleForwarder(bolt);
        }
        return bolt;
    }

    /**
     * For bolts that has incoming streams from spouts (the root bolts), add checkpoint stream from checkpoint spout to its input. For other
     * bolts, add checkpoint stream from the previous bolt to its input.
     */
    private void addCheckPointInputs(ComponentCommon component) {
        Set<GlobalStreamId> checkPointInputs = new HashSet<>();
        for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
            String sourceId = inputStream.get_componentId();
            if (_spouts.containsKey(sourceId)) {
                checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
            } else {
                checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
            }
        }
        for (GlobalStreamId streamId : checkPointInputs) {
            component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
        }
    }
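  • When createTopology is called, TopologyBuilder invokes maybeAddCheckpointSpout; if hasStatefulBolt is true, it automatically creates and adds a CheckpointSpout with a parallelism of 1.
  • If hasStatefulBolt is true, every bolt that is not a StatefulBoltExecutor is wrapped in a CheckpointTupleForwarder.
  • If hasStatefulBolt is true, addCheckPointInputs is also called to configure each bolt's inputs: a bolt fed by a spout subscribes to the checkpoint stream of the CheckpointSpout, and any other bolt subscribes to the checkpoint stream of its upstream bolt, in both cases with an all grouping.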
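
The input wiring described above can be sketched without any Storm dependency. This is only a simplified model of addCheckPointInputs: the class name CheckpointWiring, the method checkpointInputs, and the "component/stream" string format are assumptions made for illustration, while the two constants are copied from the listing.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Storm-free model of the wiring rule in TopologyBuilder.addCheckPointInputs.
// CheckpointWiring and the "component/stream" strings are hypothetical.
final class CheckpointWiring {
    static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";

    // Returns the checkpoint streams a bolt subscribes to, given the ids of the
    // components it already reads from and the ids of all spouts in the topology.
    static Set<String> checkpointInputs(List<String> inputSourceIds, Set<String> spoutIds) {
        Set<String> result = new LinkedHashSet<>();
        for (String sourceId : inputSourceIds) {
            // Root bolts (fed by a spout) listen to the checkpoint spout;
            // other bolts listen to the checkpoint stream of their upstream bolt.
            String source = spoutIds.contains(sourceId) ? CHECKPOINT_COMPONENT_ID : sourceId;
            result.add(source + "/" + CHECKPOINT_STREAM_ID);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> spouts = Collections.singleton("wordSpout");
        // A root bolt reading only from a spout.
        System.out.println(checkpointInputs(Arrays.asList("wordSpout"), spouts));
        // A bolt reading from an upstream bolt and from a spout.
        System.out.println(checkpointInputs(Arrays.asList("splitBolt", "wordSpout"), spouts));
    }
}
```

Using a set means that a bolt reading several streams from the same upstream component still gets a single checkpoint subscription for it, which matches the HashSet used in the original method.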

CheckpointSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckpointSpout.java

/**
 * Emits checkpoint tuples which is used to save the state of the {@link org.apache.storm.topology.IStatefulComponent} across the topology.
 * If a topology contains Stateful bolts, Checkpoint spouts are automatically added to the topology. There is only one Checkpoint task per
 * topology. Checkpoint spout stores its internal state in a {@link KeyValueState}.
 *
 * @see CheckPointState
 */
public class CheckpointSpout extends BaseRichSpout {
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    public static final String CHECKPOINT_FIELD_ACTION = "action";
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
    private static final String TX_STATE_KEY = "__state";
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private long lastCheckpointTs;
    private int checkpointInterval;
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    // package access for unit test
    void open(TopologyContext context, SpoutOutputCollector collector,
              int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        lastCheckpointTs = 0;
        recoveryStepInProgress = false;
        checkpointStepInProgress = false;
        recovering = true;
    }

    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
        }
        resetProgress();
    }

    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        if (topoConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
            interval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
        }
        // ensure checkpoint interval is not less than a sane low value.
        interval = Math.max(100, interval);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    private boolean shouldCheckpoint() {
        return !recovering && !checkpointStepInProgress &&
               (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
    }

    private boolean checkpointIntervalElapsed() {
        return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
    }

    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (curTxState.getState() == COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    private void emit(long txid, Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    //......
}
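  • CheckpointSpout reads the checkpoint interval from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms). The default in defaults.yaml is 1000; if the key is absent the code falls back to 100, which is also the minimum allowed value.
  • nextTuple first checks shouldRecover; if recovery is needed it calls handleRecovery and then startProgress. Otherwise, if a checkpoint is due, it performs the checkpoint and calls startProgress. If neither applies, it sleeps for sleepInterval before checking again.
  • shouldCheckpoint returns true only when the spout is not recovering and no checkpoint step is in progress, and either the current state is not COMMITTED or more than checkpointInterval has elapsed since the last checkpoint. doCheckpoint then emits the next action on CHECKPOINT_STREAM_ID.
  • After receiving an ack whose msgId matches the current txid, CheckpointSpout performs saveTxState, which calls checkpointState.commit to commit the checkpoint, and then calls resetProgress to reset the in-progress flags.
  • On fail, the spout sets recovering to true if it is not already recovering, so that recovery is triggered, and then calls resetProgress.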
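
To make the interval handling concrete, here is a minimal sketch of the clamping done by loadCheckpointInterval and of the sleepInterval derived from it in open. The class name CheckpointIntervalSketch is hypothetical, and the config key is written as a plain string instead of referencing Storm's Config class so that the example runs on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Storm-free sketch of how the checkpoint interval is resolved.
// CheckpointIntervalSketch is a hypothetical name; only the key string,
// the floor of 100 ms and the division by 10 come from the listing above.
final class CheckpointIntervalSketch {
    static final String INTERVAL_KEY = "topology.state.checkpoint.interval.ms";
    static final int MIN_INTERVAL_MS = 100;

    // Missing key -> 0 -> clamped to 100; values below 100 are also raised to 100.
    // (The original casts the value directly; instanceof is used here to stay safe.)
    static int loadCheckpointInterval(Map<String, Object> conf) {
        int interval = 0;
        Object value = conf.get(INTERVAL_KEY);
        if (value instanceof Number) {
            interval = ((Number) value).intValue();
        }
        return Math.max(MIN_INTERVAL_MS, interval);
    }

    // The spout polls ten times per checkpoint interval when it has nothing to do.
    static int sleepInterval(int checkpointInterval) {
        return checkpointInterval / 10;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(loadCheckpointInterval(conf)); // key absent
        conf.put(INTERVAL_KEY, 1000);
        int interval = loadCheckpointInterval(conf);
        System.out.println(interval + " ms, sleeping " + sleepInterval(interval) + " ms between polls");
    }
}
```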

CheckPointState

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckPointState.java

    /**
     * Get the next state based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next checkpoint state based on this state.
     */
    public CheckPointState nextState(boolean recovering) {
        CheckPointState nextState;
        switch (state) {
            case PREPARING:
                nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
                break;
            case COMMITTING:
                nextState = new CheckPointState(txid, COMMITTED);
                break;
            case COMMITTED:
                nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return nextState;
    }

    /**
     * Get the next action to perform based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next action to perform based on this state
     */
    public Action nextAction(boolean recovering) {
        Action action;
        switch (state) {
            case PREPARING:
                action = recovering ? Action.ROLLBACK : Action.PREPARE;
                break;
            case COMMITTING:
                action = Action.COMMIT;
                break;
            case COMMITTED:
                action = recovering ? Action.INITSTATE : Action.PREPARE;
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return action;
    }
  • CheckPointState provides nextState to move to the next state, and nextAction to return the next action to perform for the current state.
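
The two switch statements are easier to follow when walked through one cycle. The sketch below re-implements them in a standalone class (CheckpointStateSketch is a hypothetical name, not Storm's class) and starts from a COMMITTED state with txid 0, which is an assumption chosen purely for the walk-through.

```java
// Standalone copy of the transition rules shown above, for experimentation only.
final class CheckpointStateSketch {
    enum State { PREPARING, COMMITTING, COMMITTED }
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    final long txid;
    final State state;

    CheckpointStateSketch(long txid, State state) {
        this.txid = txid;
        this.state = state;
    }

    CheckpointStateSketch nextState(boolean recovering) {
        switch (state) {
            case PREPARING:
                // A half-prepared transaction is abandoned during recovery.
                return recovering ? new CheckpointStateSketch(txid - 1, State.COMMITTED)
                                  : new CheckpointStateSketch(txid, State.COMMITTING);
            case COMMITTING:
                return new CheckpointStateSketch(txid, State.COMMITTED);
            case COMMITTED:
                return recovering ? this : new CheckpointStateSketch(txid + 1, State.PREPARING);
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    Action nextAction(boolean recovering) {
        switch (state) {
            case PREPARING:
                return recovering ? Action.ROLLBACK : Action.PREPARE;
            case COMMITTING:
                return Action.COMMIT;
            case COMMITTED:
                return recovering ? Action.INITSTATE : Action.PREPARE;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    @Override
    public String toString() {
        return state + "(txid=" + txid + ")";
    }

    public static void main(String[] args) {
        CheckpointStateSketch s = new CheckpointStateSketch(0, State.COMMITTED);
        for (int i = 0; i < 3; i++) {
            CheckpointStateSketch next = s.nextState(false);
            System.out.println(s + " -> " + next);
            s = next;
        }
    }
}
```

In the normal path a transaction moves PREPARING, COMMITTING, COMMITTED and the txid only increases when a new PREPARING state is created; in the recovery path a PREPARING state falls back to the previous txid, while a COMMITTING state is still driven forward to COMMITTED.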

BaseStatefulBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java

    public void execute(Tuple input) {
        if (CheckpointSpout.isCheckpoint(input)) {
            processCheckpoint(input);
        } else {
            handleTuple(input);
        }
    }

    /**
     * Invokes handleCheckpoint once checkpoint tuple is received on all input checkpoint streams to this component.
     */
    private void processCheckpoint(Tuple input) {
        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
        if (shouldProcessTransaction(action, txid)) {
            LOG.debug("Processing action {}, txid {}", action, txid);
            try {
                if (txid >= lastTxid) {
                    handleCheckpoint(input, action, txid);
                    if (action == ROLLBACK) {
                        lastTxid = txid - 1;
                    } else {
                        lastTxid = txid;
                    }
                } else {
                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
                    collector.ack(input);
                }
            } catch (Throwable th) {
                LOG.error("Got error while processing checkpoint tuple", th);
                collector.fail(input);
                collector.reportError(th);
            }
        } else {
            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
                      "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
            collector.ack(input);
        }
    }

    /**
     * Checks if check points have been received from all tasks across all input streams to this component
     */
    private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
        TransactionRequest request = new TransactionRequest(action, txid);
        Integer count;
        if ((count = transactionRequestCount.get(request)) == null) {
            transactionRequestCount.put(request, 1);
            count = 1;
        } else {
            transactionRequestCount.put(request, ++count);
        }
        if (count == checkPointInputTaskCount) {
            transactionRequestCount.remove(request);
            return true;
        }
        return false;
    }

    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }
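  • The execute method of BaseStatefulBoltExecutor first uses CheckpointSpout.isCheckpoint(input) to check whether the tuple arrived on the checkpoint stream; if so it calls processCheckpoint, otherwise handleTuple.
  • processCheckpoint first calls shouldProcessTransaction to check whether every task on all input checkpoint streams has sent the checkpoint tuple for this action and txid. Until that is the case it only acks the tuple and waits.
  • If txid is greater than or equal to lastTxid, it calls handleCheckpoint, which is implemented by subclasses; tuples from older transactions are ignored and acked.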
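
The counting in shouldProcessTransaction behaves like a barrier: nothing happens until the same (action, txid) pair has been seen once per upstream task. A minimal standalone sketch follows; CheckpointBarrierSketch and its string key are illustrative assumptions, whereas the original keys the map by a TransactionRequest object.

```java
import java.util.HashMap;
import java.util.Map;

// Storm-free sketch of the "wait for all upstream tasks" check.
// CheckpointBarrierSketch is a hypothetical name used only for this example.
final class CheckpointBarrierSketch {
    private final int checkPointInputTaskCount;
    private final Map<String, Integer> transactionRequestCount = new HashMap<>();

    CheckpointBarrierSketch(int checkPointInputTaskCount) {
        this.checkPointInputTaskCount = checkPointInputTaskCount;
    }

    // Returns true only for the tuple that completes the set for (action, txid).
    boolean shouldProcessTransaction(String action, long txid) {
        String request = action + "@" + txid;
        int count = transactionRequestCount.merge(request, 1, Integer::sum);
        if (count == checkPointInputTaskCount) {
            // Forget the request so a later replay starts counting from zero.
            transactionRequestCount.remove(request);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointBarrierSketch barrier = new CheckpointBarrierSketch(3);
        for (int task = 1; task <= 3; task++) {
            System.out.println("task " + task + " -> " + barrier.shouldProcessTransaction("PREPARE", 7));
        }
    }
}
```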

StatefulBoltExecutor.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java

public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
    //......

    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
        if (action == PREPARE) {
            if (boltInitialized) {
                bolt.prePrepare(txid);
                state.prepareCommit(txid);
                preparedTuples.addAll(collector.ackedTuples());
            } else {
                /*
                 * May be the task restarted in the middle and the state needs be initialized.
                 * Fail fast and trigger recovery.
                 */
                LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
                collector.fail(checkpointTuple);
                return;
            }
        } else if (action == COMMIT) {
            bolt.preCommit(txid);
            state.commit(txid);
            ack(preparedTuples);
        } else if (action == ROLLBACK) {
            bolt.preRollback();
            state.rollback();
            fail(preparedTuples);
            fail(collector.ackedTuples());
        } else if (action == INITSTATE) {
            if (!boltInitialized) {
                bolt.initState((T) state);
                boltInitialized = true;
                LOG.debug("{} pending tuples to process", pendingTuples.size());
                for (Tuple tuple : pendingTuples) {
                    doExecute(tuple);
                }
                pendingTuples.clear();
            } else {
                /*
                 * If a worker crashes, the states of all workers are rolled back and an initState message is sent across
                 * the topology so that crashed workers can initialize their state.
                 * The bolts that have their state already initialized need not be re-initialized.
                 */
                LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}",
                          checkpointTuple, action, txid);
            }
        }
        collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.delegate.ack(checkpointTuple);
    }

    //......
}
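  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint.
  • The method handles each action differently. For PREPARE it calls the bolt's prePrepare and the state's prepareCommit; for COMMIT it calls the bolt's preCommit and the state's commit; for ROLLBACK it calls the bolt's preRollback and the state's rollback; for INITSTATE it calls the bolt's initState if the bolt has not been initialized yet. If PREPARE arrives before the bolt is initialized, the checkpoint tuple is failed so that recovery is triggered.
  • After handling the action, it emits the checkpoint tuple downstream and then acks it with collector.delegate.ack(checkpointTuple).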
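
To get a feel for what PREPARE, COMMIT and ROLLBACK mean for the data a bolt holds, here is a toy in-memory state. It is not Storm's KeyValueState and makes no claim about how Storm's state backends work internally: ToyTxState, its snapshot strategy, and the string action names are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy transactional key-value state driven by checkpoint actions.
// Hypothetical model only; Storm's real state implementations differ.
final class ToyTxState {
    private Map<String, Long> working = new HashMap<>();   // live view updated by normal tuples
    private Map<String, Long> prepared = null;             // snapshot taken at PREPARE
    private Map<String, Long> committed = new HashMap<>(); // last committed snapshot

    void put(String key, long value) {
        working.put(key, value);
    }

    Long get(String key) {
        return working.get(key);
    }

    // txid is accepted to mirror the shape of handleCheckpoint but is unused in this toy.
    void handle(String action, long txid) {
        switch (action) {
            case "PREPARE":
                prepared = new HashMap<>(working);
                break;
            case "COMMIT":
                if (prepared != null) {
                    committed = prepared;
                    prepared = null;
                }
                break;
            case "ROLLBACK":
                // Drop everything since the last commit.
                prepared = null;
                working = new HashMap<>(committed);
                break;
            case "INITSTATE":
                // Nothing to load in this in-memory toy.
                break;
            default:
                throw new IllegalArgumentException("Unknown action " + action);
        }
    }

    public static void main(String[] args) {
        ToyTxState state = new ToyTxState();
        state.put("count", 1);
        state.handle("PREPARE", 1);
        state.handle("COMMIT", 1);
        state.put("count", 2);
        state.handle("PREPARE", 2);
        state.handle("ROLLBACK", 2);
        System.out.println("count after rollback = " + state.get("count"));
    }
}
```

The point of the two-step PREPARE then COMMIT is that a failure between the two leaves the last committed snapshot intact, so a ROLLBACK can always return to it.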

CheckpointTupleForwarder.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java

/**
 * Wraps {@link IRichBolt} and forwards checkpoint tuples in a stateful topology.
 * <p>
 * When a storm topology contains one or more {@link IStatefulBolt} all non-stateful bolts are wrapped in {@link CheckpointTupleForwarder}
 * so that the checkpoint tuples can flow through the entire topology DAG.
 * </p>
 */
public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
    //......
    /**
     * Forwards the checkpoint tuple downstream.
     *
     * @param checkpointTuple the checkpoint tuple
     * @param action          the action (prepare, commit, rollback or initstate)
     * @param txid            the transaction id.
     */
    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        collector.emit(CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.ack(checkpointTuple);
    }

    //......
}
  • CheckpointTupleForwarder wraps non-stateful bolts so that checkpoint tuples can flow through the entire topology DAG.

Summary

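  • If the topology contains an IStatefulBolt (which gives a bolt an abstraction for reading and writing state, persists that state through the checkpoint mechanism, and relies on acking to provide at-least-once semantics), TopologyBuilder automatically adds a CheckpointSpout. Bolts that are not StatefulBoltExecutor instances are wrapped in CheckpointTupleForwarder, so that checkpoint tuples travel through the whole topology DAG.
  • In nextTuple, CheckpointSpout first checks whether it needs to recover, then whether it needs to checkpoint, and otherwise sleeps for a while. sleepInterval is checkpointInterval / 10, and checkpointInterval is read from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL with a minimum of 100 and a default of 1000. Note that this value does not mean a checkpoint is started every checkpointInterval: the behavior resembles fixedDelay rather than fixedRate, because a new checkpoint is not started while the current one is still in progress.
  • Both recovery and checkpointing emit tuples on CHECKPOINT_STREAM_ID. BaseStatefulBoltExecutor handles checkpoint tuples in its execute method and passes non-checkpoint tuples to the abstract method handleTuple, implemented by subclasses. The concrete handleCheckpoint is also implemented by subclasses; BaseStatefulBoltExecutor only enforces the preconditions, namely that the checkpoint tuple has been received from every task on all input streams and that txid >= lastTxid.
  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint. It handles the PREPARE, COMMIT, ROLLBACK and INITSTATE actions (similar to a three phase commit protocol), then forwards the checkpoint tuple downstream and acks it.
  • When emitting a checkpoint tuple, CheckpointSpout uses the txid as the msgId, so the tuple is tracked reliably. Once the checkpoint tuple has been acked throughout the topology DAG, the spout receives the ack and calls checkpointState.commit to commit the checkpoint; on fail it resets the related flags and switches to recovery. Normally Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms, default 1000, i.e. 1 second) is smaller than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30 seconds). If checkpointInterval is set too large and a worker crashes in between, the state restored after recovery is stale, which defeats the purpose of checkpointing.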
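
The fixedDelay-like behavior mentioned in the summary follows directly from the condition in shouldCheckpoint. The sketch below restates that condition as a pure function so it can be tried with explicit timestamps; CheckpointScheduleSketch and the parameter names are hypothetical, and the spout's fields are passed in as arguments instead of being read from an instance.

```java
// Pure-function restatement of CheckpointSpout.shouldCheckpoint for experimentation.
// CheckpointScheduleSketch is a hypothetical name; the boolean logic mirrors the listing.
final class CheckpointScheduleSketch {

    static boolean shouldCheckpoint(boolean recovering,
                                    boolean checkpointStepInProgress,
                                    boolean stateIsCommitted,
                                    long nowMs,
                                    long lastCheckpointTsMs,
                                    long checkpointIntervalMs) {
        boolean intervalElapsed = (nowMs - lastCheckpointTsMs) > checkpointIntervalMs;
        // A transaction that is already open (state not COMMITTED) is continued
        // immediately; a new one is only opened after the interval has elapsed.
        return !recovering && !checkpointStepInProgress && (!stateIsCommitted || intervalElapsed);
    }

    public static void main(String[] args) {
        long interval = 1000;
        // Interval long overdue, but the previous step has not been acked yet: no new checkpoint.
        System.out.println(shouldCheckpoint(false, true, true, 5000, 0, interval));
        // Previous checkpoint committed and interval elapsed: start a new one.
        System.out.println(shouldCheckpoint(false, false, true, 5000, 0, interval));
        // PREPARE was acked (state is no longer COMMITTED): go on to COMMIT without waiting.
        System.out.println(shouldCheckpoint(false, false, false, 1200, 1000, interval));
    }
}
```

Because the in-progress flag blocks new steps, a slow topology stretches the gap between checkpoints instead of piling up overlapping ones.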

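Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.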
