A Look at Storm's maxSpoutPending

This article examines Storm's maxSpoutPending setting.

TOPOLOGY_MAX_SPOUT_PENDING

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

    /**
     * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to
     * spouts or topologies as a whole.
     *
     * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has
     * no effect for unreliable spouts that don't tag their tuples with a message id.
     */
    @isInteger
    @isPositiveNumber
    public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
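  • TOPOLOGY_MAX_SPOUT_PENDING caps the number of tuples that a single spout task has emitted and that are still waiting to be acked or failed. It only affects spouts that emit reliable tuples, i.e. tuples tagged with a msgId.
  • In defaults.yaml, topology.max.spout.pending defaults to null.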
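
To make the constraints on this setting concrete, here is a minimal sketch that uses only the JDK. The class name MaxSpoutPendingConf and the helper isValid are hypothetical; isValid only approximates what the @isInteger and @isPositiveNumber annotations suggest (a whole number greater than zero, with an unset value allowed). It is not Storm's actual validator.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxSpoutPendingConf {
    // Same key string as Config.TOPOLOGY_MAX_SPOUT_PENDING.
    static final String KEY = "topology.max.spout.pending";

    // Hypothetical check in the spirit of @isInteger and @isPositiveNumber:
    // null (unset) is accepted, anything else must be a whole number > 0.
    static boolean isValid(Object value) {
        if (value == null) {
            return true;
        }
        if (!(value instanceof Number)) {
            return false;
        }
        double d = ((Number) value).doubleValue();
        return d == Math.rint(d) && d > 0 && d <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        // Unset, which matches the null default in defaults.yaml.
        System.out.println(KEY + " unset, valid: " + isValid(conf.get(KEY)));
        // An explicit per-task limit.
        conf.put(KEY, 1000);
        System.out.println(KEY + " = 1000, valid: " + isValid(conf.get(KEY)));
    }
}
```

In a real topology the value would normally be set on Storm's Config object rather than on a bare map; the map is used here only to keep the sketch free of dependencies.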

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
        this.threadId = Thread.currentThread().getId();
        executorTransfer.initLocalRecvQueues();
        while (!stormActive.get()) {
            Utils.sleep(100);
        }

        LOG.info("Opening spout {}:{}", componentId, taskIds);
        this.idToTask = idToTask;
        this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
        //......
    }

    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);
        return new Callable<Long>() {
            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
            int recvqCheckSkips = 0;
            int swIdleCount = 0; // counter for spout wait strategy
            int bpIdleCount = 0; // counter for back pressure wait strategy
            int rmspCount = 0;

            @Override
            public Long call() throws Exception {
                int receiveCount = 0;
                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
                    recvqCheckSkips = 0;
                }
                long currCount = emittedCount.get();
                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
                boolean isActive = stormActive.get();

                if (!isActive) {
                    inactiveExecute();
                    return 0L;
                }

                if (!lastActive.get()) {
                    lastActive.set(true);
                    activateSpouts();
                }
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                boolean noEmits = true;
                long emptyStretch = 0;

                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                        spouts.get(j).nextTuple();
                    }
                    noEmits = (currCount == emittedCount.get());
                    if (noEmits) {
                        emptyEmitStreak.increment();
                    } else {
                        emptyStretch = emptyEmitStreak.get();
                        emptyEmitStreak.set(0);
                    }
                }
                if (reachedMaxSpoutPending) {
                    if (rmspCount == 0) {
                        LOG.debug("Reached max spout pending");
                    }
                    rmspCount++;
                } else {
                    if (rmspCount > 0) {
                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                    }
                    rmspCount = 0;
                }

                if (receiveCount > 1) {
                    // continue without idling
                    return 0L;
                }
                if (!pendingEmits.isEmpty()) { // then facing backpressure
                    backPressureWaitStrategy();
                    return 0L;
                }
                bpIdleCount = 0;
                if (noEmits) {
                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                    return 0L;
                }
                swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long start = Time.currentTimeMillis();
                if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
            }

            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
                emptyEmitStreak.increment();
                long start = Time.currentTimeMillis();
                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                if (reachedMaxSpoutPending) {
                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
                } else {
                    if (emptyStretch > 0) {
                        LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                    }
                }
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }
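  • init reads Config.TOPOLOGY_MAX_SPOUT_PENDING from topoConf, falls back to 0 when it is not set, and multiplies it by the number of tasks in this executor (idToTask.size()); the result is maxSpoutPending.
  • Inside call, maxSpoutPending drives the reachedMaxSpoutPending flag, which is true only when maxSpoutPending is non-zero and pending.size() >= maxSpoutPending, so a value of 0 means no limit. nextTuple is invoked only when !reachedMaxSpoutPending && pendingEmitsIsEmpty holds.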
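
The gating logic can be sketched in isolation with plain JDK classes. This is a simplified model, not the SpoutExecutor itself: the class SpoutGateSketch and its methods tick and ack are hypothetical, a Deque stands in for the pending map, and each permitted tick is assumed to emit exactly one tracked tuple.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SpoutGateSketch {
    final int maxSpoutPending;                       // 0 means "no limit"
    final Deque<Long> pending = new ArrayDeque<>();  // emitted, not yet acked or failed
    long nextId = 0;

    // configured is the per-task setting (null when unset); taskCount is the
    // number of tasks in the executor, mirroring idToTask.size().
    SpoutGateSketch(Integer configured, int taskCount) {
        int perTask = (configured == null) ? 0 : configured;
        this.maxSpoutPending = perTask * taskCount;
    }

    // One loop iteration; returns true if the stand-in for nextTuple ran.
    boolean tick(boolean pendingEmitsIsEmpty) {
        boolean reachedMaxSpoutPending =
            (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
        if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
            pending.add(nextId++); // assumed: nextTuple emits one tuple with a msgId
            return true;
        }
        return false;
    }

    // An ack (or fail) removes one tuple from the pending set.
    void ack() {
        pending.poll();
    }

    public static void main(String[] args) {
        SpoutGateSketch gate = new SpoutGateSketch(2, 2); // limit = 2 * 2 tasks
        int emitted = 0;
        for (int i = 0; i < 10; i++) {
            if (gate.tick(true)) {
                emitted++;
            }
        }
        System.out.println("limit=" + gate.maxSpoutPending + " emitted=" + emitted);
    }
}
```

Once the pending set is full, further ticks are skipped until an ack frees a slot; with an unset value the limit is 0 and the gate never closes.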

MasterBatchCoordinator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for (String spoutId : _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if (active == null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        for (int i = 0; i < _spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }

        if (_active) {
            if (_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for (int i = 0; i < _maxTransactionActive; i++) {
                    if (!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if (attemptId == null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for (TransactionalState state : _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }

                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt,
                                  newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
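  • MasterBatchCoordinator's open method reads Config.TOPOLOGY_MAX_SPOUT_PENDING from conf into _maxTransactionActive, defaulting to 1 when the value is null.
  • In sync, a new batch is emitted on BATCH_STREAM_ID only while the coordinator is active, _activeTx.size() < _maxTransactionActive, and isReady returns true for that transaction.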
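
The window behaviour described in the comment at the top of sync can be reproduced with a small JDK-only model. The class BatchWindowSketch is hypothetical and deliberately simplified: it leaves out isReady, the _active flag, attempt ids and the separate commit stream, and it collapses "commit emitted and acked" into a single commitCurrent step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class BatchWindowSketch {
    enum Status { PROCESSING, PROCESSED }

    final int maxTransactionActive;
    final TreeMap<Long, Status> activeTx = new TreeMap<>();
    long currTransaction = 1;

    // A null setting falls back to 1, as in MasterBatchCoordinator.open.
    BatchWindowSketch(Integer maxSpoutPending) {
        this.maxTransactionActive = (maxSpoutPending == null) ? 1 : maxSpoutPending;
    }

    // Batch-emitting half of sync(): returns the tx ids for which a batch starts.
    List<Long> sync() {
        List<Long> emitted = new ArrayList<>();
        if (activeTx.size() < maxTransactionActive) {
            long curr = currTransaction;
            for (int i = 0; i < maxTransactionActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    activeTx.put(curr, Status.PROCESSING);
                    emitted.add(curr);
                }
                curr++;
            }
        }
        return emitted;
    }

    // The batch for tx was fully processed downstream.
    void markProcessed(long tx) {
        activeTx.put(tx, Status.PROCESSED);
    }

    // Commits happen in order: only the oldest transaction can leave the window.
    boolean commitCurrent() {
        if (activeTx.get(currTransaction) == Status.PROCESSED) {
            activeTx.remove(currTransaction);
            currTransaction++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BatchWindowSketch w = new BatchWindowSketch(3);
        System.out.println("started: " + w.sync());
        w.markProcessed(2);
        System.out.println("tx 2 done, committed: " + w.commitCurrent());
        System.out.println("started: " + w.sync());
    }
}
```

With a limit of 3, finishing tx 2 before tx 1 neither commits anything nor opens tx 4, because tx 2 still occupies a slot until tx 1 commits first.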

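Summary

  • Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending) defaults to null and only affects spouts that emit reliable messages, i.e. tuples with a msgId.
  • For a regular spout it is the maximum number of tuples awaiting ack. Once pending.size() reaches that limit, SpoutExecutor stops calling the spout's nextTuple until tuples are acked or failed. When the setting is unset the limit is 0, which means no limit.
  • For a Trident spout it is the number of batches processed concurrently, defaulting to 1 when unset. A new batch can start only after an active one has succeeded or failed.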

doc

  • Trident Spouts
  • A Look at Storm's IWaitStrategy (聊聊storm的IWaitStrategy)

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-10-30
