A look at Storm's OpaquePartitionedTridentSpoutExecutor

This article examines Storm's OpaquePartitionedTridentSpoutExecutor.

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }
  • TridentTopology.newStream wraps any spout of type IOpaquePartitionedTridentSpout in an OpaquePartitionedTridentSpoutExecutor; KafkaTridentSpoutOpaque is one class that implements the IOpaquePartitionedTridentSpout interface.

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
​
        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);
​
                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }
​
        //......
​
        return builder.createTopology();
    }
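  • TridentTopologyBuilder.buildTopology wraps the IOpaquePartitionedTridentSpout (by this point an OpaquePartitionedTridentSpoutExecutor) in a TridentSpoutExecutor, and then wraps that in a TridentBoltExecutor so that it runs as a bolt. Because OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout, this bolt also subscribes to MasterBatchCoordinator.COMMIT_STREAM_ID.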

OpaquePartitionedTridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
    protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);
​
    IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
    
    //......
​
    public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
        _spout = spout;
    }
    
    @Override
    public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }
​
    @Override
    public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new Emitter(txStateId, conf, context);
    }
​
    @Override
    public Fields getOutputFields() {
        return _spout.getOutputFields();
    }
​
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }
    
}
  • OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout; its getCoordinator returns an ITridentSpout.BatchCoordinator and its getEmitter returns an ICommitterTridentSpout.Emitter.

ITridentSpout.BatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

    public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
        IOpaquePartitionedTridentSpout.Coordinator _coordinator;
​
        public Coordinator(Map conf, TopologyContext context) {
            _coordinator = _spout.getCoordinator(conf, context);
        }
        
        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata);
            return _coordinator.getPartitionsForBatch();
        }
​
​
        @Override
        public void close() {
            LOG.debug("Closing");
            _coordinator.close();
            LOG.debug("Closed");
        }
​
        @Override
        public void success(long txid) {
            LOG.debug("Success [txid = {}]", txid);
        }
​
        @Override
        public boolean isReady(long txid) {
            boolean ready = _coordinator.isReady(txid);
            LOG.debug("[isReady = {}], [txid = {}]", ready, txid);
            return ready;
        }
    }
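  • Coordinator wraps the spout's _coordinator, whose type is IOpaquePartitionedTridentSpout.Coordinator. It delegates isReady and close, returns _coordinator.getPartitionsForBatch() from initializeTransaction, and otherwise only adds debug logging.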

ICommitterTridentSpout.Emitter

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

   public class Emitter implements ICommitterTridentSpout.Emitter {        
        IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
        TransactionalState _state;
        TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
        Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
        int _index;
        int _numTasks;
​
        public Emitter(String txStateId, Map conf, TopologyContext context) {
            _emitter = _spout.getEmitter(conf, context);
            _index = context.getThisTaskIndex();
            _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            _state = TransactionalState.newUserState(conf, txStateId);
            LOG.debug("Created {}", this);
        }
​
        Object _savedCoordinatorMeta = null;
        boolean _changedMeta = false;
​
        @Override
        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
            LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);
​
            if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                _partitionStates.clear();
                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
                for (ISpoutPartition partition : taskPartitions) {
                    _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
                }
​
                // refresh all partitions for backwards compatibility with old spout
                _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                _savedCoordinatorMeta = coordinatorMeta;
                _changedMeta = true;
            }
            Map<String, Object> metas = new HashMap<>();
            _cachedMetas.put(tx.getTransactionId(), metas);
​
            Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
            Map<String, Object> prevCached;
            if(entry!=null) {
                prevCached = entry.getValue();
            } else {
                prevCached = new HashMap<>();
            }
            
            for(Entry<String, EmitterPartitionState> e: _partitionStates.entrySet()) {
                String id = e.getKey();
                EmitterPartitionState s = e.getValue();
                s.rotatingState.removeState(tx.getTransactionId());
                Object lastMeta = prevCached.get(id);
                if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
                Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                metas.put(id, meta);
            }
            LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);
        }
​
        @Override
        public void success(TransactionAttempt tx) {
            for(EmitterPartitionState state: _partitionStates.values()) {
                state.rotatingState.cleanupBefore(tx.getTransactionId());
            }
            LOG.debug("Success transaction {}. [{}]", tx, this);
        }
​
        @Override
        public void commit(TransactionAttempt attempt) {
            LOG.debug("Committing transaction {}. [{}]", attempt, this);
            // this code here handles a case where a previous commit failed, and the partitions
            // changed since the last commit. This clears out any state for the removed partitions
            // for this txid.
            // we make sure only a single task ever does this. we're also guaranteed that
            // it's impossible for there to be another writer to the directory for that partition
            // because only a single commit can be happening at once. this is because in order for 
            // another attempt of the batch to commit, the batch phase must have succeeded in between.
            // hence, all tasks for the prior commit must have finished committing (whether successfully or not)
            if(_changedMeta && _index==0) {
                Set<String> validIds = new HashSet<>();
                for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
                    validIds.add(p.getId());
                }
                for(String existingPartition: _state.list("")) {
                    if(!validIds.contains(existingPartition)) {
                        RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition);
                        s.removeState(attempt.getTransactionId());
                    }
                }
                _changedMeta = false;
            }
            
            Long txid = attempt.getTransactionId();
            Map<String, Object> metas = _cachedMetas.remove(txid);
            for(Entry<String, Object> entry: metas.entrySet()) {
                _partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
            }
            LOG.debug("Exiting commit method for transaction {}. [{}]", attempt, this);
        }
​
        @Override
        public void close() {
            LOG.debug("Closing");
            _emitter.close();
            LOG.debug("Closed");
        }
​
        @Override
        public String toString() {
            return "Emitter{" +
                    ", _state=" + _state +
                    ", _cachedMetas=" + _cachedMetas +
                    ", _partitionStates=" + _partitionStates +
                    ", _index=" + _index +
                    ", _numTasks=" + _numTasks +
                    ", _savedCoordinatorMeta=" + _savedCoordinatorMeta +
                    ", _changedMeta=" + _changedMeta +
                    '}';
        }
    }
​
    static class EmitterPartitionState {
        public RotatingTransactionalState rotatingState;
        public ISpoutPartition partition;
        
        public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
            rotatingState = s;
            partition = p;
        }
    }
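  • This class wraps the spout's IOpaquePartitionedTridentSpout.Emitter; _partitionStates maps each partition id to an EmitterPartitionState, which pairs the partition with its RotatingTransactionalState.
  • emitBatch first rebuilds _partitionStates if the coordinator metadata has changed, then looks up prevCached (the metadata cached for the closest earlier txid), and finally calls _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta) for each partition. When prevCached has no entry for a partition, lastMeta falls back to s.rotatingState.getLastState().
  • success calls state.rotatingState.cleanupBefore(tx.getTransactionId()) to discard the state recorded before that txid; commit removes the metadata cached for the txid from _cachedMetas and writes it into each partition's rotatingState via overrideState.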
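
The bookkeeping described above (cache per txid, strict lowerEntry lookup, fallback to committed state, write on commit) can be sketched in isolation. This is a minimal stand-alone model and not Storm code: the class CachedMetaSketch and its method names are hypothetical, and a plain Map stands in for RotatingTransactionalState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone model of the Emitter's metadata bookkeeping; not Storm code.
final class CachedMetaSketch {
    // txid -> (partition id -> metadata produced by that batch), like _cachedMetas
    private final TreeMap<Long, Map<String, Object>> cachedMetas = new TreeMap<>();
    // partition id -> last committed metadata; a stand-in for RotatingTransactionalState
    private final Map<String, Object> committed = new HashMap<>();

    // Same lookup order as emitBatch: closest earlier cached batch first, then committed state.
    Object lastMetaFor(long txid, String partitionId) {
        Map.Entry<Long, Map<String, Object>> prev = cachedMetas.lowerEntry(txid);
        Object lastMeta = (prev == null) ? null : prev.getValue().get(partitionId);
        return (lastMeta != null) ? lastMeta : committed.get(partitionId);
    }

    // Caches the metadata produced by one batch; a replay of the same txid overwrites it.
    void emit(long txid, Map<String, Object> metasForBatch) {
        cachedMetas.put(txid, new HashMap<>(metasForBatch));
    }

    // Moves the cached metadata for txid into the committed state.
    // The null guard is an addition of this sketch.
    void commit(long txid) {
        Map<String, Object> metas = cachedMetas.remove(txid);
        if (metas != null) {
            committed.putAll(metas);
        }
    }
}
```

Because lowerEntry is strict, a replay of txid N is handed the metadata of the batch before it rather than the metadata of its own earlier attempt. In this model that is what allows a replayed batch to start from the same position while emitting different tuples.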

KafkaTridentSpoutOpaque

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java

public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
    private static final long serialVersionUID = -8003272486566259640L;
​
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
​
    private final KafkaTridentSpoutManager<K, V> kafkaManager;
​
    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
        this(new KafkaTridentSpoutManager<>(conf));
    }
    
    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }
​
    @Override
    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
            Map conf, TopologyContext context) {
        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
    }
​
    @Override
    public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
    }
​
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
​
    @Override
    public Fields getOutputFields() {
        final Fields outputFields = kafkaManager.getFields();
        LOG.debug("OutputFields = {}", outputFields);
        return outputFields;
    }
​
    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager + '}';
    }
}
  • KafkaTridentSpoutOpaque's getCoordinator returns a KafkaTridentSpoutOpaqueCoordinator, and its getEmitter returns a KafkaTridentSpoutEmitter.

KafkaTridentSpoutOpaqueCoordinator

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java

public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
        Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
​
    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
    private final KafkaTridentSpoutManager<K,V> kafkaManager;
​
    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }
​
    @Override
    public boolean isReady(long txid) {
        LOG.debug("isReady = true");
        return true;    // the "old" trident kafka spout always returns true, like this
    }
​
    @Override
    public List<Map<String, Object>> getPartitionsForBatch() {
        final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
        LOG.debug("TopicPartitions for batch {}", topicPartitions);
        List<Map<String, Object>> tps = new ArrayList<>();
        for(TopicPartition tp : topicPartitions) {
            tps.add(tpSerializer.toMap(tp));
        }
        return tps;
    }
​
    @Override
    public void close() {
        LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
    }
​
    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
  • isReady always returns true here; getPartitionsForBatch converts each entry of kafkaManager.getTopicPartitions() into a map.
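
The map conversion mentioned above can be illustrated with a small round trip. This is a sketch only: the real conversion is done by TopicPartitionSerializer, whose source is not shown in this article, so the class TopicPartitionMapSketch, its method names, and the key names "topic" and "partition" are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; the real conversion lives in TopicPartitionSerializer.
final class TopicPartitionMapSketch {
    // Key names are assumptions chosen for this sketch.
    static final String TOPIC_KEY = "topic";
    static final String PARTITION_KEY = "partition";

    // Flattens a topic-partition into a map that holds only plain types.
    static Map<String, Object> toMap(String topic, int partition) {
        Map<String, Object> map = new HashMap<>();
        map.put(TOPIC_KEY, topic);
        map.put(PARTITION_KEY, partition);
        return map;
    }

    static String topicOf(Map<String, Object> map) {
        return (String) map.get(TOPIC_KEY);
    }

    static int partitionOf(Map<String, Object> map) {
        // Read as Number rather than Integer: a serialization round trip
        // may hand the value back as a Long.
        return ((Number) map.get(PARTITION_KEY)).intValue();
    }
}
```

For example, TopicPartitionMapSketch.toMap("events", 3) yields a map from which topicOf and partitionOf recover "events" and 3.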

KafkaTridentSpoutEmitter

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java

public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
        List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition,
        Map<String, Object>>,
        Serializable {
​
    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
​
    // Kafka
    private final KafkaConsumer<K, V> kafkaConsumer;
​
    // Bookkeeping
    private final KafkaTridentSpoutManager<K, V> kafkaManager;
    // set of topic-partitions for which first poll has already occurred, and the first polled txid
    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 
​
    // Declare some KafkaTridentSpoutManager references for convenience
    private final long pollTimeoutMs;
    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final Timer refreshSubscriptionTimer;
    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
​
    private TopologyContext topologyContext;
​
    /**
     * Create a new Kafka spout emitter.
     * @param kafkaManager The Kafka consumer manager to use
     * @param topologyContext The topology context
     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
        this.kafkaManager = kafkaManager;
        this.topologyContext = topologyContext;
        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
​
        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        LOG.debug("Created {}", this.toString());
    }
​
    /**
     * Creates instance of this class with default 500 millisecond refresh subscription timer
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
        this(kafkaManager, topologyContext, new Timer(500,
                kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
    }
​
    //......
​
    @Override
    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
​
        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, lastBatch, collector);
​
        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
​
        if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                            "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
                            "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
                    kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
        } else {
            try {
                // pause other topic-partitions to only poll from current topic-partition
                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
​
                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
​
                // poll
                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
                }
​
                final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
                LOG.debug("Polled [{}] records from Kafka.", records.count());
​
                if (!records.isEmpty()) {
                    emitTuples(collector, records);
                    // build new metadata
                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
                }
            } finally {
                kafkaConsumer.resume(pausedTopicPartitions);
                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
            }
            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                    "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
        }
​
        return currentBatch == null ? null : currentBatch.toMap();
    }
​
    private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            final List<Object> tuple = translator.apply(record);
            collector.emit(tuple);
            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
        }
    }
​
    @Override
    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
                "No action taken by this method for topic partitions {}", partitionResponsibilities);
    }
​
    /**
     * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
     * for this task must be assigned to the Kafka consumer running on this task.
     *
     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
     * @return ordered list of topic partitions for this task
     */
    @Override
    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
        List<TopicPartition> allTopicPartitions = new ArrayList<>();
        for(Map<String, Object> map : allPartitionInfo) {
            allTopicPartitions.add(tpSerializer.fromMap(map));
        }
        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
        return allPartitions;
    }
​
    @Override
    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
        List<Map<String, Object>> allPartitionInfo) {
        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
        final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
        return taskTps;
    }
​
    @Override
    public void close() {
        kafkaConsumer.close();
        LOG.debug("Closed");
    }
​
    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
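  • The interval of refreshSubscriptionTimer comes from kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), which defaults to 2000 ms.
  • Every call to emitPartitionBatch checks refreshSubscriptionTimer.isExpiredResetOnTrue(); once the timer has expired, it calls kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment() to refresh the assignment.
  • emitPartitionBatch takes the partition associated with the batch, pauses fetching from all other partitions, and then, based on firstPollOffsetStrategy and lastBatchMeta, uses the kafkaConsumer seek methods to move to the intended position. If the partition is not part of the consumer's current assignment, the batch is skipped with a warning.
  • It then fetches records with kafkaConsumer.poll(pollTimeoutMs) and passes them to emitTuples, which converts each record with translator and emits it through collector.emit. The paused partitions are resumed in the finally block.
  • refreshPartitions currently only writes a trace log. getOrderedPartitions deserializes allPartitionInfo from its map form and returns the result as KafkaTridentSpoutTopicPartition instances. getPartitionsForTask returns the partitions reported by kafkaConsumer.assignment(), converted to KafkaTridentSpoutTopicPartition.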
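
The expire-then-reset behaviour that the name isExpiredResetOnTrue suggests can be sketched as follows. This is a simplified model and not Storm's Timer class, whose source is not shown in this article: the class RefreshTimerSketch, the injectable millisecond clock, and the assumption that the first expiry occurs only after delay plus period are all choices made for this sketch.

```java
import java.util.function.LongSupplier;

// Hypothetical model of a timer with expire-and-reset semantics; not Storm's Timer.
final class RefreshTimerSketch {
    private final long periodMs;
    private final LongSupplier clockMs; // injectable clock so the behaviour can be tested
    private long startMs;

    RefreshTimerSketch(long delayMs, long periodMs, LongSupplier clockMs) {
        this.periodMs = periodMs;
        this.clockMs = clockMs;
        // Assumption: the initial delay pushes the first period start into the future.
        this.startMs = clockMs.getAsLong() + delayMs;
    }

    // Returns true once a full period has elapsed, and restarts the period in that case.
    boolean isExpiredResetOnTrue() {
        long now = clockMs.getAsLong();
        boolean expired = now - startMs >= periodMs;
        if (expired) {
            startMs = now;
        }
        return expired;
    }
}
```

With a delay of 500 and a period of 2000, this model first reports expiry at 2500 ms and then at most once per 2000 ms after that, so a check made on every emitPartitionBatch call would trigger a refresh only occasionally.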

Summary

  • storm-kafka-client provides KafkaTridentSpoutOpaque as the Kafka spout for Trident (the older one is OpaqueTridentKafkaSpout, in the storm-kafka library); it implements the IOpaquePartitionedTridentSpout interface.
  • TridentTopology.newStream wraps any spout of type IOpaquePartitionedTridentSpout in an OpaquePartitionedTridentSpoutExecutor; TridentTopologyBuilder.buildTopology then wraps that OpaquePartitionedTridentSpoutExecutor in a TridentSpoutExecutor, and wraps the result in a TridentBoltExecutor so that it runs as a bolt.
  • OpaquePartitionedTridentSpoutExecutor's getCoordinator returns an ITridentSpout.BatchCoordinator and its getEmitter returns an ICommitterTridentSpout.Emitter. These wrap the KafkaTridentSpoutOpaqueCoordinator and KafkaTridentSpoutEmitter returned by the original KafkaTridentSpoutOpaque spout: the coordinator wrapper adds debug logging, while the emitter wrapper mainly adds reading and writing of per-partition state through EmitterPartitionState.
