A look at Storm's maxSpoutPending

code4it
Published 2018-12-06 16:29:37
This article is part of the column 码匠的流水账

This article takes a look at Storm's maxSpoutPending.

TOPOLOGY_MAX_SPOUT_PENDING

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

    /**
     * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to
     * spouts or topologies as a whole.
     *
     * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has
     * no effect for unreliable spouts that don't tag their tuples with a message id.
     */
    @isInteger
    @isPositiveNumber
    public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
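  • TOPOLOGY_MAX_SPOUT_PENDING sets the maximum number of tuples that a single spout task may have emitted and still be waiting on for an ack or fail. It only takes effect for spouts that emit reliable tuples (those tagged with a msgId).
  • In defaults.yaml, topology.max.spout.pending defaults to null.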
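
To make the definition of a "pending" tuple concrete, here is a minimal sketch in plain Java. PendingTracker and its methods are hypothetical names made up for illustration; this is not Storm's bookkeeping code. It only shows the rule stated in the doc comment: a tuple counts as pending from the moment it is emitted with a message id until it is acked or failed, and tuples without a message id are never counted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a spout task's pending-tuple bookkeeping (not a Storm class). */
final class PendingTracker {
    // msgId -> tuple payload, for every tuple that is still waiting for an ack or fail
    private final Map<Object, String> pending = new HashMap<>();

    /** Emits a tuple; only tuples with a non-null message id become pending. */
    void emit(String tuple, Object msgId) {
        if (msgId != null) {
            pending.put(msgId, tuple);
        }
    }

    /** An ack ends the pending state of the tuple with this message id. */
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    /** A fail also ends the pending state; replaying is up to the spout. */
    void fail(Object msgId) {
        pending.remove(msgId);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Since unreliable emits never enter the pending map in this sketch, a limit on the pending count cannot throttle them, which is the reason the setting has no effect on spouts that do not use message ids.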

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
        this.threadId = Thread.currentThread().getId();
        executorTransfer.initLocalRecvQueues();
        while (!stormActive.get()) {
            Utils.sleep(100);
        }

        LOG.info("Opening spout {}:{}", componentId, taskIds);
        this.idToTask = idToTask;
        this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
        //......
    }

    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);
        return new Callable<Long>() {
            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
            int recvqCheckSkips = 0;
            int swIdleCount = 0; // counter for spout wait strategy
            int bpIdleCount = 0; // counter for back pressure wait strategy
            int rmspCount = 0;

            @Override
            public Long call() throws Exception {
                int receiveCount = 0;
                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
                    recvqCheckSkips = 0;
                }
                long currCount = emittedCount.get();
                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
                boolean isActive = stormActive.get();

                if (!isActive) {
                    inactiveExecute();
                    return 0L;
                }

                if (!lastActive.get()) {
                    lastActive.set(true);
                    activateSpouts();
                }
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                boolean noEmits = true;
                long emptyStretch = 0;

                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                        spouts.get(j).nextTuple();
                    }
                    noEmits = (currCount == emittedCount.get());
                    if (noEmits) {
                        emptyEmitStreak.increment();
                    } else {
                        emptyStretch = emptyEmitStreak.get();
                        emptyEmitStreak.set(0);
                    }
                }
                if (reachedMaxSpoutPending) {
                    if (rmspCount == 0) {
                        LOG.debug("Reached max spout pending");
                    }
                    rmspCount++;
                } else {
                    if (rmspCount > 0) {
                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                    }
                    rmspCount = 0;
                }

                if (receiveCount > 1) {
                    // continue without idling
                    return 0L;
                }
                if (!pendingEmits.isEmpty()) { // then facing backpressure
                    backPressureWaitStrategy();
                    return 0L;
                }
                bpIdleCount = 0;
                if (noEmits) {
                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                    return 0L;
                }
                swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long start = Time.currentTimeMillis();
                if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
            }

            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
                emptyEmitStreak.increment();
                long start = Time.currentTimeMillis();
                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                if (reachedMaxSpoutPending) {
                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
                } else {
                    if (emptyStretch > 0) {
                        LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                    }
                }
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }
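  • init reads Config.TOPOLOGY_MAX_SPOUT_PENDING from topoConf, falling back to 0 when it is not set, and multiplies it by the number of tasks (idToTask.size()) to get maxSpoutPending.
  • In call, maxSpoutPending drives the reachedMaxSpoutPending flag, and a value of 0 disables the check. nextTuple is invoked only when !reachedMaxSpoutPending && pendingEmitsIsEmpty.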
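
The gate described above can be isolated in a few lines. The following is a sketch only, assuming a simplified model: MaxSpoutPendingGate, limit and mayCallNextTuple are hypothetical names, while the two conditions are copied from the SpoutExecutor excerpt (the != 0 guard and the >= comparison).

```java
/** Hypothetical sketch of the max-spout-pending gate; not Storm code. */
final class MaxSpoutPendingGate {
    private final int maxSpoutPending;

    /**
     * @param perTaskLimit value of topology.max.spout.pending, or null when unset
     * @param taskCount    number of spout tasks run by this executor
     */
    MaxSpoutPendingGate(Integer perTaskLimit, int taskCount) {
        int perTask = (perTaskLimit == null) ? 0 : perTaskLimit; // unset falls back to 0
        this.maxSpoutPending = perTask * taskCount;
    }

    int limit() {
        return maxSpoutPending;
    }

    /** Same condition as in SpoutExecutor: a limit of 0 means the check is off. */
    boolean reachedMaxSpoutPending(int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    /** nextTuple runs only below the limit and with no emits left to flush. */
    boolean mayCallNextTuple(int pendingSize, boolean pendingEmitsIsEmpty) {
        return !reachedMaxSpoutPending(pendingSize) && pendingEmitsIsEmpty;
    }
}
```

With a per-task value of 100 and three tasks the executor-wide limit is 300, so the gate closes when the 300th tuple becomes pending. With the setting left at null the limit is 0 and the gate never closes because of pending tuples.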

MasterBatchCoordinator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for (String spoutId : _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if (active == null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        for (int i = 0; i < _spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }

        if (_active) {
            if (_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for (int i = 0; i < _maxTransactionActive; i++) {
                    if (!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if (attemptId == null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for (TransactionalState state : _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }

                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt,
                                  newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
  • MasterBatchCoordinator.open reads Config.TOPOLOGY_MAX_SPOUT_PENDING from conf into _maxTransactionActive, defaulting to 1 when the value is null.
  • sync emits a new batch on BATCH_STREAM_ID only when the coordinator is active and _activeTx.size() < _maxTransactionActive.
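
The windowing behaviour, including the case from the comment in sync (max_spout_pending = 3, tx 2 acked before tx 1), can be reproduced with a small simulation. This is a simplified sketch, not the real coordinator: BatchWindowSketch is a hypothetical class, it leaves out isReady, attempt ids, the throttler and the _active flag, and its ack method is an assumption about ack handling that does not appear in the excerpt above (a batch ack marks the transaction PROCESSED, a commit ack retires it and advances the current transaction).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, simplified model of the active-transaction window. */
final class BatchWindowSketch {
    enum Status { PROCESSING, PROCESSED, COMMITTING }

    private final TreeMap<Long, Status> activeTx = new TreeMap<>();
    private final int maxTransactionActive;
    private long currTransaction = 1L;

    BatchWindowSketch(Integer maxSpoutPending) {
        // Same default as in open(): null means one active transaction
        this.maxTransactionActive = (maxSpoutPending == null) ? 1 : maxSpoutPending;
    }

    /** One sync pass; returns the transaction ids that got a new batch. */
    List<Long> sync() {
        List<Long> emitted = new ArrayList<>();
        // Only the oldest transaction may move on to the commit phase
        if (activeTx.get(currTransaction) == Status.PROCESSED) {
            activeTx.put(currTransaction, Status.COMMITTING);
        }
        if (activeTx.size() < maxTransactionActive) {
            long curr = currTransaction;
            for (int i = 0; i < maxTransactionActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    activeTx.put(curr, Status.PROCESSING);
                    emitted.add(curr);
                }
                curr++;
            }
        }
        return emitted;
    }

    /** Assumed ack handling: batch ack, then commit ack (see lead-in). */
    void ack(long txId) {
        Status s = activeTx.get(txId);
        if (s == Status.PROCESSING) {
            activeTx.put(txId, Status.PROCESSED);
        } else if (s == Status.COMMITTING) {
            activeTx.remove(txId);
            currTransaction = txId + 1;
        }
    }

    int activeCount() {
        return activeTx.size();
    }
}
```

In this model, acking tx 2 while tx 1 is still processing frees nothing: tx 2 stays in the window until tx 1 has been committed, so no batch for tx 4 is emitted in the meantime.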

Summary

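  • Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending) defaults to null and only affects spouts that emit reliable tuples (tagged with a msgId).
  • For a regular spout, it caps the number of tuples waiting for an ack. Once the pending count reaches the cap, SpoutExecutor stops calling the spout's nextTuple.
  • For a Trident spout, it is the number of batches that can be processed at the same time. The next batch can start only after an active batch has succeeded or failed and left the window.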

doc

  • Trident Spouts
  • A look at Storm's IWaitStrategy
Originally published on 2018-10-30; shared from the 码匠的流水账 WeChat official account.
