A look at _maxTransactionActive in Storm Trident spouts

This article looks at how a Storm Trident spout uses _maxTransactionActive.

MasterBatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

​
   TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();
​
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for(String spoutId: _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();
​
        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if(active==null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
​
        
        for(int i=0; i<_spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }
​
   public void nextTuple() {
        sync();
    }
​
    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }
        
        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
​
    private static class TransactionStatus {
        TransactionAttempt attempt;
        AttemptStatus status;
        
        public TransactionStatus(TransactionAttempt attempt) {
            this.attempt = attempt;
            this.status = AttemptStatus.PROCESSING;
        }
​
        @Override
        public String toString() {
            return attempt.toString() + " <" + status.toString() + ">";
        }        
    }
​
    private static enum AttemptStatus {
        PROCESSING,
        PROCESSED,
        COMMITTING
    }
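  • MasterBatchCoordinator sets _maxTransactionActive in its open method from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending). The configuration file default is null, and when the value is null _maxTransactionActive is set to 1.
  • nextTuple calls sync, which limits how many batches are in flight at once: a new batch is emitted only while _activeTx.size() is below _maxTransactionActive, so the next batch has to wait until an entry leaves _activeTx, which happens once its batch has succeeded or failed.
  • _activeTx is a TreeMap keyed by transaction id. Each value is a TransactionStatus, which holds a TransactionAttempt and an AttemptStatus. AttemptStatus has three states: PROCESSING, PROCESSED and COMMITTING.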
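The window behaviour described in the comment at the top of sync can be reproduced with a small model. This is a minimal sketch, not the Storm implementation: BatchWindowSketch and its methods are hypothetical names, the isReady check and the COMMITTING state are left out, and the ack handling (which is not quoted above) is assumed to keep a processed transaction in the window until the current transaction commits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the batch window in MasterBatchCoordinator.
class BatchWindowSketch {
    enum Status { PROCESSING, PROCESSED }

    final TreeMap<Long, Status> activeTx = new TreeMap<>();
    final int maxActive;
    long currTransaction = 1L;

    BatchWindowSketch(Map<String, Object> conf) {
        // Same fallback as open(): a missing topology.max.spout.pending means 1.
        Number active = (Number) conf.get("topology.max.spout.pending");
        this.maxActive = (active == null) ? 1 : active.intValue();
    }

    // Mirrors the emitting half of sync(): returns the txids a batch is emitted for.
    List<Long> sync() {
        List<Long> emitted = new ArrayList<>();
        if (activeTx.size() < maxActive) {
            long curr = currTransaction;
            for (int i = 0; i < maxActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    activeTx.put(curr, Status.PROCESSING);
                    emitted.add(curr);
                }
                curr++;
            }
        }
        return emitted;
    }

    // Assumption: a batch ack only marks the transaction as processed.
    void ackBatch(long txid) {
        activeTx.put(txid, Status.PROCESSED);
    }

    // Assumption: a commit ack removes the current transaction and slides the window.
    void ackCommit() {
        activeTx.remove(currTransaction);
        currTransaction++;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.max.spout.pending", 3);
        BatchWindowSketch window = new BatchWindowSketch(conf);
        System.out.println("first sync emits " + window.sync());
        window.ackBatch(2L);
        System.out.println("after tx 2 is acked: " + window.sync());
        window.ackBatch(1L);
        window.ackCommit();
        System.out.println("after tx 1 commits: " + window.sync());
    }
}
```

With a limit of 3, acking tx 2 alone frees nothing because tx 1 has not committed yet; a slot opens for tx 4 only after tx 1 leaves the window.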

TridentSpoutCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

    RotatingTransactionalState _state;
​
    public void prepare(Map conf, TopologyContext context) {
        _coord = _spout.getCoordinator(_id, conf, context);
        _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
        _state = new RotatingTransactionalState(_underlyingState, META_DIR);
    }
​
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
​
        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
                
    }
  • The execute method of TridentSpoutCoordinator reads and writes the metadata by txid, then emits Values(attempt, meta) to TridentBoltExecutor.
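The per-txid metadata handling can be pictured with an in-memory stand-in. This is only a sketch under assumptions: TxMetaStoreSketch is a hypothetical class, it keeps everything in a TreeMap rather than in the underlying TransactionalState, and "previous state" is taken to mean the metadata of the closest earlier txid.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the per-txid metadata store of the coordinator.
class TxMetaStoreSketch {
    private final TreeMap<Long, Object> metaByTxid = new TreeMap<>();

    // Metadata of the closest earlier transaction, or null when there is none.
    Object getPreviousState(long txid) {
        Map.Entry<Long, Object> previous = metaByTxid.lowerEntry(txid);
        return previous == null ? null : previous.getValue();
    }

    Object getState(long txid) {
        return metaByTxid.get(txid);
    }

    void overrideState(long txid, Object meta) {
        metaByTxid.put(txid, meta);
    }

    // Assumption: entries strictly older than txid are dropped on success.
    void cleanupBefore(long txid) {
        metaByTxid.headMap(txid).clear();
    }

    public static void main(String[] args) {
        TxMetaStoreSketch store = new TxMetaStoreSketch();
        store.overrideState(1L, "offsets 0-9");
        store.overrideState(2L, "offsets 10-19");
        System.out.println("previous of tx 2: " + store.getPreviousState(2L));
        store.cleanupBefore(2L);
        System.out.println("tx 1 after cleanup: " + store.getState(1L));
    }
}
```

Because each txid has its own entry, several transactions can have metadata in flight at once, which is what a _maxTransactionActive above 1 requires.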

TridentBoltExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

​
    RotatingMap<Object, TrackedBatch> _batches;
​
​
    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        
        
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id + 
//                    "\nwith group " + batchGroup
//                    + "\n");
//            
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());
        
        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }
        
        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);
        
        //System.out.println("TRACKED: " + tracked + " " + tuple);
        
        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);                   
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }
​
    public static class TrackedBatch {
        int attemptId;
        BatchInfo info;
        CoordCondition condition;
        int reportedTasks = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        boolean failed = false;
        boolean receivedCommit;
        Tuple delayedAck = null;
        
        public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) {
            this.info = info;
            this.condition = condition;
            this.attemptId = attemptId;
            receivedCommit = condition.commitStream == null;
        }
​
        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this);
        }        
    }
​
    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);                
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
        
    }
​
    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
​
    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }
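  • TridentBoltExecutor stores batch information in a RotatingMap (_batches). The key is the txid (id.getId()) and the value is a TrackedBatch.
  • When bolt.execute(tracked.info, tuple) is called, a BatchInfo is passed in. Its state field holds the return value of bolt.initBatchState(batchGroup, id), which is called only when the TrackedBatch is first created, that is, when _batches has no entry for that txid yet.
  • checkFinish likewise decides based on the TrackedBatch of the batch. finishBatch calls _bolt.finishBatch(tracked.info), passing the BatchInfo along, and failBatch also operates on the TrackedBatch of the batch.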
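The rule that only one attempt is ever tracked per batch can be isolated from the rest of execute. The following is a minimal sketch: AttemptTrackingSketch and Tracked are hypothetical names, a plain HashMap replaces the RotatingMap, and only the received-tuple counter is kept.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of the attempt check at the top of TridentBoltExecutor.execute.
class AttemptTrackingSketch {
    static final class Tracked {
        final int attemptId;
        int receivedTuples = 0;

        Tracked(int attemptId) {
            this.attemptId = attemptId;
        }
    }

    private final Map<Long, Tracked> batches = new HashMap<>();

    // Returns true if the tuple was counted, false if it came from a stale attempt.
    boolean onTuple(long txid, int attemptId) {
        Tracked tracked = batches.get(txid);
        if (tracked != null) {
            if (attemptId > tracked.attemptId) {
                // A newer attempt replaces the old tracking state entirely.
                batches.remove(txid);
                tracked = null;
            } else if (attemptId < tracked.attemptId) {
                // Tuples of an older attempt are ignored.
                return false;
            }
        }
        if (tracked == null) {
            tracked = new Tracked(attemptId);
            batches.put(txid, tracked);
        }
        tracked.receivedTuples++;
        return true;
    }

    Tracked get(long txid) {
        return batches.get(txid);
    }

    public static void main(String[] args) {
        AttemptTrackingSketch tracker = new AttemptTrackingSketch();
        tracker.onTuple(7L, 0);
        tracker.onTuple(7L, 1);
        System.out.println("stale tuple accepted: " + tracker.onTuple(7L, 0));
        System.out.println("tracked attempt: " + tracker.get(7L).attemptId);
    }
}
```

Keeping a single entry per txid is what prevents memory growth when a batch is replayed many times.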

BatchInfo

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/BatchInfo.java

public class BatchInfo {
    public IBatchID batchId;
    public Object state;
    public String batchGroup;
    
    public BatchInfo(String batchGroup, IBatchID batchId, Object state) {
        this.batchGroup = batchGroup;
        this.batchId = batchId;
        this.state = state;
    }
}
  • BatchInfo holds the batchId, the state and the batchGroup.

TridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java

    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }
​
    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since 
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {            
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }
​
    public void finishBatch(BatchInfo batchInfo) {
    }
  • The execute method of TridentSpoutExecutor also uses the txid to keep the information of each batch separate.
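The bookkeeping around _activeBatches can be sketched on its own. This is an illustration under assumptions: ActiveBatchesSketch is a hypothetical class, the attempt is reduced to an int attempt id, and a rejected commit returns false where the original throws FailedException.

```java
import java.util.TreeMap;

// Hypothetical reduction of the _activeBatches handling in TridentSpoutExecutor.
class ActiveBatchesSketch {
    // txid -> attempt id of the batch most recently emitted for that txid
    private final TreeMap<Long, Integer> activeBatches = new TreeMap<>();

    void emitBatch(long txid, int attemptId) {
        activeBatches.put(txid, attemptId);
    }

    // A commit is accepted only for the attempt that was actually emitted.
    boolean commit(long txid, int attemptId) {
        Integer emitted = activeBatches.get(txid);
        if (emitted == null || emitted != attemptId) {
            return false;
        }
        activeBatches.remove(txid);
        return true;
    }

    // On success, everything older than the succeeded txid can be forgotten.
    void success(long txid) {
        activeBatches.headMap(txid).clear();
    }

    int size() {
        return activeBatches.size();
    }

    public static void main(String[] args) {
        ActiveBatchesSketch spout = new ActiveBatchesSketch();
        spout.emitBatch(2L, 0);
        spout.emitBatch(2L, 1);
        System.out.println("commit of old attempt accepted: " + spout.commit(2L, 0));
        System.out.println("commit of current attempt accepted: " + spout.commit(2L, 1));
    }
}
```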

SubtopologyBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }
​
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = _roots.get(sourceStream);
        if(ir==null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }
​
    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }
​
    protected static class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<>();
        RootFactory _factory;
        ProjectionFactory _project;
        String _stream;
        
        public InitialReceiver(String stream, Fields allFields) {
            // TODO: don't want to project for non-batch bolts...???
            // how to distinguish "batch" streams from non-batch streams?
            _stream = stream;
            _factory = new RootFactory(allFields);
            List<String> projected = new ArrayList<>(allFields.toList());
            projected.remove(0);
            _project = new ProjectionFactory(_factory, new Fields(projected));
        }
        
        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple t = _project.create(_factory.create(tuple));
            for(TridentProcessor r: _receivers) {
                r.execute(context, _stream, t);
            }            
        }
        
        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }
        
        public Factory getOutputFactory() {
            return _project;
        }
    }
  • When SubtopologyBolt runs initBatchState, the ProcessorContext it creates carries the batchId, so when several batches run in parallel each one has its own ProcessorContext.
  • The execute method takes the ProcessorContext of the tuple's own batch (batchInfo.state) and passes it on to the execute method of each TridentProcessor.
  • finishBatch does the same: it passes (ProcessorContext) batchInfo.state to TridentProcessor.finishBatch.
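To see why interleaved batches do not disturb each other, consider a reduced model with one context per batch. It is a sketch only: PerBatchContextSketch and Context are hypothetical stand-ins for SubtopologyBolt and ProcessorContext, and the only "processor" is a counter kept in state slot 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-batch contexts, loosely following SubtopologyBolt.
class PerBatchContextSketch {
    // Stand-in for ProcessorContext: a batch id plus one state slot per node.
    static final class Context {
        final long batchId;
        final Object[] state;

        Context(long batchId, int nodes) {
            this.batchId = batchId;
            this.state = new Object[nodes];
        }
    }

    private final Map<Long, Context> contexts = new HashMap<>();

    // Plays the role of initBatchState: created once, when the batch is first seen.
    Context contextFor(long batchId) {
        return contexts.computeIfAbsent(batchId, id -> {
            Context created = new Context(id, 1);
            created.state[0] = 0; // running tuple count in slot 0
            return created;
        });
    }

    // Each tuple only touches the context of its own batch.
    void execute(long batchId) {
        Context context = contextFor(batchId);
        context.state[0] = (Integer) context.state[0] + 1;
    }

    // Returns the count for the batch and forgets its context.
    int finishBatch(long batchId) {
        return (Integer) contexts.remove(batchId).state[0];
    }

    public static void main(String[] args) {
        PerBatchContextSketch bolt = new PerBatchContextSketch();
        bolt.execute(1L);
        bolt.execute(2L);
        bolt.execute(1L);
        System.out.println("batch 1 count: " + bolt.finishBatch(1L));
        System.out.println("batch 2 count: " + bolt.finishBatch(2L));
    }
}
```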

AggregateProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AggregateProcessor.java

    public void startBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
    }    
​
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext);
        _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
    }
    
    public void finishBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
    }
  • The startBatch, execute and finishBatch methods of AggregateProcessor all use the state of the ProcessorContext, and the ProcessorContext passed in from SubtopologyBolt is already specific to one batch.
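The init, aggregate and complete lifecycle over a state slot can be shown without Trident. This is a sketch under assumptions: SimpleAggregator and Step are hypothetical simplifications (no collector, long values instead of tuples), and the Object array stands for the state array of one batch's ProcessorContext.

```java
// Hypothetical reduction of AggregateProcessor: each step owns one slot of the batch state.
class AggregateSlotSketch {
    // Simplified aggregator contract; Trident's real interface also takes a collector.
    interface SimpleAggregator {
        long[] init();
        void aggregate(long[] state, long value);
        long complete(long[] state);
    }

    static final SimpleAggregator SUM = new SimpleAggregator() {
        public long[] init() { return new long[] {0L}; }
        public void aggregate(long[] state, long value) { state[0] += value; }
        public long complete(long[] state) { return state[0]; }
    };

    static final SimpleAggregator COUNT = new SimpleAggregator() {
        public long[] init() { return new long[] {0L}; }
        public void aggregate(long[] state, long value) { state[0]++; }
        public long complete(long[] state) { return state[0]; }
    };

    static final class Step {
        private final SimpleAggregator agg;
        private final int stateIndex;

        Step(SimpleAggregator agg, int stateIndex) {
            this.agg = agg;
            this.stateIndex = stateIndex;
        }

        void startBatch(Object[] batchState) {
            batchState[stateIndex] = agg.init();
        }

        void execute(Object[] batchState, long value) {
            agg.aggregate((long[]) batchState[stateIndex], value);
        }

        long finishBatch(Object[] batchState) {
            return agg.complete((long[]) batchState[stateIndex]);
        }
    }

    public static void main(String[] args) {
        Object[] batchState = new Object[2];
        Step sum = new Step(SUM, 0);
        Step count = new Step(COUNT, 1);
        sum.startBatch(batchState);
        count.startBatch(batchState);
        for (long value : new long[] {5L, 7L}) {
            sum.execute(batchState, value);
            count.execute(batchState, value);
        }
        System.out.println("sum=" + sum.finishBatch(batchState)
                + " count=" + count.finishBatch(batchState));
    }
}
```

The processor object itself holds no per-batch data; everything lives in the slot selected by its state index, so one processor instance can serve several batches at once.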

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java

    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if(parents.size()!=1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }
    
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }
​
    public void startBatch(ProcessorContext processorContext) {
    }
​
    public void finishBatch(ProcessorContext processorContext) {
    }
  • EachProcessor sets the ProcessorContext on the collector and then passes that collector to function.execute. The collector here is an AppendCollector.

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java

public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;
    
    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }
                
    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }
​
    @Override
    public void emit(List<Object> values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }
​
    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    } 
    
    public Factory getOutputFactory() {
        return _factory;
    }
}
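  • When _function.execute emits through the AppendCollector, the AppendCollector hands the tuples to each TupleReceiver. The context it passes along is the ProcessorContext set by EachProcessor, that is, the ProcessorContext of the batch the tuple belongs to. A TupleReceiver's execute method may read or write the ProcessorContext, and this access is also per batch; AggregateProcessor, for example, stores its aggregation result in the processorContext.state of its own batch.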
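The hand-off from an each step to its receivers can be sketched as a small chain. All names here are hypothetical (AppendChainSketch, Appender, Receiver, doubleEach); tuples are plain lists, the batch context is a bare Object array, and the "function" simply appends the doubled first field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of EachProcessor -> AppendCollector -> TupleReceiver.
class AppendChainSketch {
    interface Receiver {
        void execute(Object[] batchState, List<Object> tuple);
    }

    static final class Appender {
        private final List<Receiver> receivers = new ArrayList<>();
        private Object[] context;
        private List<Object> input;

        void addReceiver(Receiver receiver) {
            receivers.add(receiver);
        }

        // Called before the function runs, once per input tuple.
        void setContext(Object[] batchState, List<Object> tuple) {
            this.context = batchState;
            this.input = tuple;
        }

        // Appends the new values to the input tuple and forwards it with the same context.
        void emit(List<Object> values) {
            List<Object> out = new ArrayList<>(input);
            out.addAll(values);
            for (Receiver receiver : receivers) {
                receiver.execute(context, out);
            }
        }
    }

    // Plays the role of EachProcessor.execute with a doubling function.
    static void doubleEach(Appender collector, Object[] batchState, List<Object> tuple) {
        collector.setContext(batchState, tuple);
        long doubled = (Long) tuple.get(0) * 2;
        List<Object> values = new ArrayList<>();
        values.add(doubled);
        collector.emit(values);
    }

    static List<Object> tuple(Object... fields) {
        return new ArrayList<>(Arrays.asList(fields));
    }

    public static void main(String[] args) {
        Appender collector = new Appender();
        // The receiver sums the appended field into slot 0 of the batch it was handed.
        collector.addReceiver((state, t) -> {
            state[0] = (Long) state[0] + (Long) t.get(1);
        });
        Object[] batchA = {0L};
        Object[] batchB = {0L};
        doubleEach(collector, batchA, tuple(3L));
        doubleEach(collector, batchB, tuple(10L));
        doubleEach(collector, batchA, tuple(4L));
        System.out.println("batch A: " + batchA[0] + ", batch B: " + batchB[0]);
    }
}
```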

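Summary

  • Storm Trident uses [id, count] tuples to tell the downstream TridentBoltExecutor to finish a batch. When TridentBoltExecutor receives an [id, count] tuple, it first checks whether tracked.reportedTasks equals cond.expectedTaskReports (this collects the reports of all upstream tasks when the upstream TridentBoltExecutor has a parallelism greater than 1). Once they are equal, it checks whether tracked.receivedTuples equals tracked.expectedTupleCount, and only if they match does it call finishBatch.
  • The _maxTransactionActive parameter of a Storm Trident spout is set from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending). The configuration file default is null, and when the value is null _maxTransactionActive is 1.
  • MasterBatchCoordinator limits the number of batches in flight: a further batch is emitted only after a batch in _activeTx has succeeded or failed. When several transactions are active in parallel, the downstream TridentBoltExecutor can still tell the batches apart, so they do not interfere with each other. For example, the ProcessorContext that SubtopologyBolt creates in initBatchState carries the batchId, so parallel batches have separate ProcessorContext instances. Some of the TridentProcessor implementations called by SubtopologyBolt store results in the ProcessorContext; AggregateProcessor, for instance, keeps its aggregation result in the processorContext.state of its own batch.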
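The [id, count] check from the first point can be reduced to a few counters. This is a sketch under assumptions: BatchFinishSketch and Outcome are hypothetical names, the commit stream is ignored (as if receivedCommit were always true), and acking and failing of tuples are replaced by a returned outcome.

```java
// Hypothetical reduction of the coordination check in TridentBoltExecutor.checkFinish.
class BatchFinishSketch {
    enum Outcome { WAITING, FINISHED, FAILED }

    private final int expectedTaskReports;
    private int reportedTasks = 0;
    private int expectedTupleCount = 0;
    private int receivedTuples = 0;

    BatchFinishSketch(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    // A regular data tuple of the batch arrived.
    void onDataTuple() {
        receivedTuples++;
    }

    // An upstream task reports how many tuples it sent to this task for the batch.
    Outcome onCoordTuple(int count) {
        reportedTasks++;
        expectedTupleCount += count;
        if (reportedTasks == expectedTaskReports) {
            // All upstream tasks have reported: the counts must match exactly.
            return receivedTuples == expectedTupleCount ? Outcome.FINISHED : Outcome.FAILED;
        }
        return Outcome.WAITING;
    }

    public static void main(String[] args) {
        BatchFinishSketch batch = new BatchFinishSketch(2);
        batch.onDataTuple();
        batch.onDataTuple();
        batch.onDataTuple();
        System.out.println("first report: " + batch.onCoordTuple(2));
        System.out.println("second report: " + batch.onCoordTuple(1));
    }
}
```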

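Original-content notice: this article is published on the Tencent Cloud+ Community with the author's authorization and may not be reproduced without permission.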
