聊聊storm TridentTopology的构建

本文主要研究一下storm TridentTopology的构建

实例

    @Test
    public void testDebugTopologyBuild(){
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
                new Values("nickt1", 4),
                new Values("nickt2", 7),
                new Values("nickt3", 8),
                new Values("nickt4", 9),
                new Values("nickt5", 7),
                new Values("nickt6", 11),
                new Values("nickt7", 5)
        );
        spout.setCycle(false);
        TridentTopology topology = new TridentTopology();
        Stream stream1 = topology.newStream("spout1",spout)
                .each(new Fields("user", "score"), new BaseFunction() {
                    @Override
                    public void execute(TridentTuple tuple, TridentCollector collector) {
                        System.out.println("tuple:"+tuple);
                    }
                },new Fields());
​
        topology.build();
    }
  • 后面的分析为了简单起见,很多是依据这个实例来

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream newStream(String txId, IRichSpout spout) {
        return newStream(txId, new RichSpoutBatchExecutor(spout));
    }
    
    public Stream newStream(String txId, IPartitionedTridentSpout spout) {
        return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
    }
    
    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }
​
    public Stream newStream(String txId, ITridentDataSource dataSource) {
        if (dataSource instanceof IBatchSpout) {
            return newStream(txId, (IBatchSpout) dataSource);
        } else if (dataSource instanceof ITridentSpout) {
            return newStream(txId, (ITridentSpout) dataSource);
        } else if (dataSource instanceof IPartitionedTridentSpout) {
            return newStream(txId, (IPartitionedTridentSpout) dataSource);
        } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
            return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
        } else {
            throw new UnsupportedOperationException("Unsupported stream");
        }
    }
​
    public Stream newStream(String txId, IBatchSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
    
    public Stream newStream(String txId, ITridentSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
​
    protected Stream addNode(Node n) {
        registerNode(n);
        return new Stream(this, n.name, n);
    }
​
    protected void registerNode(Node n) {
        _graph.addVertex(n);
        if(n.stateInfo!=null) {
            String id = n.stateInfo.id;
            if(!_colocate.containsKey(id)) {
                _colocate.put(id, new ArrayList());
            }
            _colocate.get(id).add(n);
        }
    }
  • newStream的第一个参数是txId,第二个参数是ITridentDataSource
  • ITridentDataSource分为好几个类型,分别有IBatchSpout、ITridentSpout、IPartitionedTridentSpout、IOpaquePartitionedTridentSpout
  • 最后都是创建SpoutNode,然后registerNode添加到graph(`如果node的stateInfo不为null,还会添加到colocate,不过SpoutNode该值为null`),注意SpoutNode的SpoutType为SpoutNode.SpoutType.BATCH

Node

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/Node.java

public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
    private static final AtomicInteger INDEX = new AtomicInteger(0);
    private String nodeId;
​
    public String name = null;
    public Fields allOutputFields;
    public String streamId;
    public Integer parallelismHint = null;
    public NodeStateInfo stateInfo = null;
    public int creationIndex;
​
    public Node(String streamId, String name, Fields allOutputFields) {
        this.nodeId = UUID.randomUUID().toString();
        this.allOutputFields = allOutputFields;
        this.streamId = streamId;
        this.name = name;
        this.creationIndex = INDEX.incrementAndGet();
    }
​
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        return nodeId.equals(((Node) o).nodeId);
    }
​
    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }
​
    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
    }
​
    public String shortString() {
        return "nodeId: " + nodeId + ", allOutputFields: " + allOutputFields;
    }
}
  • Node继承了DefaultResourceDeclarer,而它实现了resources相关的接口:ResourceDeclarer以及ITridentResource
  • Node有几个子类,分别是SpoutNode、ProcessorNode、PartitionNode
  • SpoutNode就是spout信息的节点描述,ProcessorNode一般是trident的each、map、aggregrate、reduce、project等操作的节点描述,PartitionNode就是partition相关的节点描述

TridentTopology.build

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public StormTopology build() {
        DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
        
        //......
        
        List<SpoutNode> spoutNodes = new ArrayList<>();
        
        // can be regular nodes (static state) or processor nodes
        Set<Node> boltNodes = new LinkedHashSet<>();
        for(Node n: graph.vertexSet()) {
            if(n instanceof SpoutNode) {
                spoutNodes.add((SpoutNode) n);
            } else if(!(n instanceof PartitionNode)) {
                boltNodes.add(n);
            }
        }
        
        Set<Group> initialGroups = new LinkedHashSet<>();
​
        //......
​
        for(Node n: boltNodes) {
            initialGroups.add(new Group(graph, n));
        }
        
        
        GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
        grouper.mergeFully();
        Collection<Group> mergedGroups = grouper.getAllGroups();
        
        
        
        // add identity partitions between groups
        for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
            if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                
                Group g1 = grouper.nodeGroup(e.source);
                Group g2 = grouper.nodeGroup(e.target);
                // g1 being null means the source is a spout node
                if(g1==null && !(e.source instanceof SpoutNode))
                    throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
                if(g1==null || !g1.equals(g2)) {
                    graph.removeEdge(e);
                    PartitionNode pNode = makeIdentityPartition(e.source);
                    graph.addVertex(pNode);
                    graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                    graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));                    
                }
            }
        }
        
        //......
        
        // add in spouts as groups so we can get parallelisms
        for(Node n: spoutNodes) {
            grouper.addGroup(new Group(graph, n));
        }
        
        grouper.reindex();
        mergedGroups = grouper.getAllGroups();
                
        
        Map<Node, String> batchGroupMap = new HashMap<>();
        List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
        for(int i=0; i<connectedComponents.size(); i++) {
            String groupId = "bg" + i;
            for(Node n: connectedComponents.get(i)) {
                batchGroupMap.put(n, groupId);
            }
        }
        
//        System.out.println("GRAPH:");
//        System.out.println(graph);
        
        Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
​
        TridentTopologyBuilder builder = new TridentTopologyBuilder();
​
        Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
        Map<Group, String> boltIds = genBoltIds(mergedGroups);
​
        for(SpoutNode sn: spoutNodes) {
            Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
​
            Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
            spoutRes.putAll(sn.getResources());
​
            Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
​
            SpoutDeclarer spoutDeclarer = null;
​
            if(sn.type == SpoutNode.SpoutType.DRPC) {
​
                spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
            } else {
                ITridentSpout s;
                if(sn.spout instanceof IBatchSpout) {
                    s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
                } else if(sn.spout instanceof ITridentSpout) {
                    s = (ITridentSpout) sn.spout;
                } else {
                    throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                    // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
                }
                spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
            }
​
            if(onHeap != null) {
                if(offHeap != null) {
                    spoutDeclarer.setMemoryLoad(onHeap, offHeap);
                }
                else {
                    spoutDeclarer.setMemoryLoad(onHeap);
                }
            }
​
            if(cpuLoad != null) {
                spoutDeclarer.setCPULoad(cpuLoad);
            }
        }
​
        for(Group g: mergedGroups) {
            if(!isSpoutGroup(g)) {
                Integer p = parallelisms.get(g);
                Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
                Map<String, Number> groupRes = g.getResources(_resourceDefaults);
​
                Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
                Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
                Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
​
                BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                                                 committerBatches(g, batchGroupMap), streamToGroup);
​
                if(onHeap != null) {
                    if(offHeap != null) {
                        d.setMemoryLoad(onHeap, offHeap);
                    }
                    else {
                        d.setMemoryLoad(onHeap);
                    }
                }
​
                if(cpuLoad != null) {
                    d.setCPULoad(cpuLoad);
                }
​
                Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                for(PartitionNode n: inputs) {
                    Node parent = TridentUtils.getParent(graph, n);
                    String componentId = parent instanceof SpoutNode ?
                            spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                    d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
                }
            }
        }
        HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
        combinedMasterCoordResources.putAll(_masterCoordResources);
        return builder.buildTopology(combinedMasterCoordResources);
    }
  • 这里创建了TridentTopologyBuilder,然后对于spoutNodes,调用TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup)方法,添加spout
  • 对于IBatchSpout类型的spout,通过BatchSpoutExecutor包装为ITridentSpout
  • 这里的streamName为streamId,通过UniqueIdGen.getUniqueStreamId生成,以s开头,之后是_streamCounter的计数,比如1,合起来就是s1;txStateId为用户传入的txId;batchGroup以bg开头,之后是connectedComponents的元素的index,比如0,合起来就是bg0;parallelism参数就是用户构建topology时设置的
  • 设置完spout之后,就是设置spout的相关资源配置,比如memoryLoad、cpuLoad;之后设置bolt,这里使用的是SubtopologyBolt,然后设置bolt相关的资源配置
  • 最后调用TridentTopologyBuilder.buildTopology

TridentTopologyBuilder.setSpout

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    Map<GlobalStreamId, String> _batchIds = new HashMap();
    Map<String, TransactionalSpoutComponent> _spouts = new HashMap();
​
    public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
        Map<String, String> batchGroups = new HashMap();
        batchGroups.put(streamName, batchGroup);
        markBatchGroups(id, batchGroups);
​
        TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
        _spouts.put(id, c);
        return new SpoutDeclarerImpl(c);
    }
​
    private void markBatchGroups(String component, Map<String, String> batchGroups) {
        for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
            _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
        }
    }
  • 这里调用了markBatchGroups,将新的component添加到batchIds中,同时也添加到spouts中

TridentTopologyBuilder.setBolt

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    Map<GlobalStreamId, String> _batchIds = new HashMap();
    Map<String, Component> _bolts = new HashMap();
​
    // map from stream name to batch id
    public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
        markBatchGroups(id, batchGroups);
        Component c = new Component(bolt, parallelism, committerBatches);
        _bolts.put(id, c);
        return new BoltDeclarerImpl(c);
        
    }
​
    private void markBatchGroups(String component, Map<String, String> batchGroups) {
        for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
            _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
        }
    }
  • 这里调用了markBatchGroups将新的component添加到batchIds中,同时也添加到bolts中;对于trident来说,就是一系列的ProcessorNode(可能也会有PartitionNode)

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

   public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
​
        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);
​
                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
                
                
                BoltDeclarer scd =
                      builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                
                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }
        
        //......
​
        Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
​
        for(String batch: batchesToCommitIds.keySet()) {
            List<String> commitIds = batchesToCommitIds.get(batch);
            SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
​
            if(onHeap != null) {
                if(offHeap != null) {
                    masterCoord.setMemoryLoad(onHeap, offHeap);
                }
                else {
                    masterCoord.setMemoryLoad(onHeap);
                }
            }
​
            if(cpuLoad != null) {
                masterCoord.setCPULoad(cpuLoad);
            }
        }
                
        for(String id: _bolts.keySet()) {
            Component c = _bolts.get(id);
            
            Map<String, CoordSpec> specs = new HashMap<>();
            
            for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
                String batch = batchIdsForBolts.get(s);
                if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
                CoordSpec spec = specs.get(batch);
                CoordType ct;
                if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
                    ct = CoordType.single();
                } else {
                    ct = CoordType.all();
                }
                spec.coords.put(s.get_componentId(), ct);
            }
            
            for(String b: c.committerBatches) {
                specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            
            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
            for(Map<String, Object> conf: c.componentConfs) {
                d.addConfigurations(conf);
            }
            
            for(InputDeclaration inputDecl: c.declarations) {
               inputDecl.declare(d);
            }
            
            Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
            for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
                for(String comp: entry.getValue()) {
                    d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
                }
            }
            
            for(String b: c.committerBatches) {
                d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
        }
​
        return builder.createTopology();
    }
  • buildTopology对于非IRichSpout的的spout会在topology中创建TridentSpoutCoordinator这个bolt,它globalGrouping了MasterBatchCoordinator.BATCH_STREAM_ID($batch)、MasterBatchCoordinator.SUCCESS_STREAM_ID($success)这两个stream;同时还创建了TridentBoltExecutor这个bolt,它allGrouping了MasterBatchCoordinator.BATCH_STREAM_ID($batch)、MasterBatchCoordinator.SUCCESS_STREAM_ID($success),对于spout是ICommitterTridentSpout类型的,还allGrouping了MasterBatchCoordinator.COMMIT_STREAM_ID($commit);注意这里将非IRichSpout的spout转换为bolt
  • 之后对batchesToCommitIds中的每个batch创建MasterBatchCoordinator这个spout,正好前前面的TridentSpoutCoordinator以及TridentBoltExecutor衔接起来
  • 对于bolt来说(包装了ProcessorNode的SubtopologyBolt),这里设置了TridentBoltExecutor这个bolt,它directGrouping了TridentBoltExecutor.COORD_STREAM($coord-),同时还allGrouping了MasterBatchCoordinator.COMMIT_STREAM_ID($commit)

TridentTopologyBuilder.createTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for(String boltId: _bolts.keySet()) {
            IRichBolt bolt = _bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try{
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            }catch(RuntimeException wrapperCause){
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
                    throw new IllegalStateException(
                        "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for(String spoutId: _spouts.keySet()) {
            IRichSpout spout = _spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try{
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            }catch(RuntimeException wrapperCause){
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())){
                    throw new IllegalStateException(
                        "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }
​
        StormTopology stormTopology = new StormTopology(spoutSpecs,
                boltSpecs,
                new HashMap<String, StateSpoutSpec>());
​
        stormTopology.set_worker_hooks(_workerHooks);
​
        return Utils.addVersions(stormTopology);
    }
​
    /**
     * If the topology has at least one stateful bolt
     * add a {@link CheckpointSpout} component to the topology.
     */
    private void maybeAddCheckpointSpout() {
        if (hasStatefulBolt) {
            setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
        }
    }
  • createTopology的时候,判断如果有stateful的bolt,则会添加CheckpointSpout这个spout;同时对每个bolt判断如果是statefulBolt且不是StatefulBoltExecutor,那么会添加CheckpointTupleForwarder
  • 经过buildTopology的一系列设置,到了createTopology这里,已经有了3个bolt,一个是包装了ProcessNode的TridentBoltExecutor,一个是TridentSpoutCoordinator,还有一个是包装了原始spout的TridentBoltExecutor
  • spout这里只有一个就是MasterBatchCoordinator,在buildTopology的时候,对于非IRichSpout的的spout,会被转化为TridentSpoutCoordinator这个bolt

拓扑结构

  • 以前面的实例来讲,经过TridentTopologyBuilder的createTopology,最后的拓扑结构为一个spout为MasterBatchCoordinator($mastercoord-bg0),3个bolt分别为TridentSpoutCoordinator($spoutcoord-spout-spout1)、包装了非IRichSpout的的spout的TridentBoltExecutor(spout-spout1)、包装了ProcessorNode的TridentBoltExecutor(b-0);一共涉及到了几个stream,分别为MasterBatchCoordinator.SUCCESS_STREAM_ID($success)、MasterBatchCoordinator.COMMIT_STREAM_ID($commit)、MasterBatchCoordinator.BATCH_STREAM_ID($batch)、TridentBoltExecutor.COORD_STREAM($coord-bg0)、s1、s2
  • $mastercoord-bg0它declare了$success$commit$batch这三个stream,outputFields均为tx这个字段
  • $spoutcoord-spout-spout1它接收了$mastercoord-bg0$success$batch这两个stream,同时declare了$batch这个stream,outputFields为[tx,metadata]
  • spout-spout1,它allGrouping接收$mastercoord-bg0$success,以及$spoutcoord-spout-spout1$batch这两个stream的数据;同时会往$coord-bg0发送[id,count]数据,以及stream(s1)发送数据tuple
  • b-0它接收了spout-spout1$coord-bg0以及s1这两个stream的数据,之后往stream(s2)发送数据(output_fields:[$batchId, user, score]),同时也会往stream($coord-bg0)发送[id, count]数据

小结

  • TridentTopologyBuilder在buildTopology的时候,对于非IRichSpout的的spout,会被转化为TridentBoltExecutor这个bolt,同时会新增一个TridentSpoutCoordinator这个bolt;ProcessorNode则会被包装为TridentBoltExecutor这个bolt;TridentTopology为了方便管理将用户设定的spout包装为bolt,然后创建MasterBatchCoordinator作为真正的spout
  • TridentBoltExecutor.COORD_STREAM($coord-)这个stream用来在component之间传递[id, count]数据,用于保障tuple在每个component能够完整传输,即spout和bolt都会往该stream发送[id, count]数据
  • MasterBatchCoordinator、TridentSpoutCoordinator、包装原始spout的TridentBoltExecutor(spout-spout1)它们之间的关系如下:master会给spout-spout1发送suceess数据(tuple\指令),给coordinator发送suceess、batch数据(tuple\指令);coordinator会给spout-spout1发送batch数据(tuple\指令)

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我是攻城师

Fastjson解析嵌套Map例子

3685
来自专栏小樱的经验随笔

POJ 2492 A Bug's Life

A Bug's Life Time Limit: 10000MS Memory Limit: 65536K Total Submissions:...

28610
来自专栏SeanCheney的专栏

《Pandas Cookbook》第09章 合并Pandas对象

1501
来自专栏calmound

HDU 3368 Reversi

http://acm.hdu.edu.cn/showproblem.php?pid=3368 题意:模拟黑白棋,下一步黑手最大可以转化多少个白旗 分析:暴力  ...

3277
来自专栏LhWorld哥陪你聊算法

Hadoop源码篇--Reduce篇

Reduce文件会从Mapper任务中拉取很多小文件,小文件内部有序,但是整体是没序的,Reduce会合并小文件,然后套个归并算法,变成一个整体有序的文件。

2181
来自专栏码匠的流水账

聊聊storm的WindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

752
来自专栏葡萄城控件技术团队

深入浅出OOP(五): C#访问修饰符(Public/Private/Protected/Internal/Sealed/Constants)

访问修饰符(或者叫访问控制符)是面向对象语言的特性之一,用于对类、类成员函数、类成员变量进行访问控制。同时,访问控制符也是语法保留关键字,用于封装组件。 Pub...

3099
来自专栏函数式编程语言及工具

SDP(9):MongoDB-Scala - data access and modeling

    MongoDB是一种文件型数据库,对数据格式没有硬性要求,所以可以实现灵活多变的数据存储和读取。MongoDB又是一种分布式数据库,与传统关系数据库不同...

3894
来自专栏数据结构与算法

P2580 于是他错误的点名开始了

题目背景 XS中学化学竞赛组教练是一个酷爱炉石的人。 他会一边搓炉石一边点名以至于有一天他连续点到了某个同学两次,然后正好被路过的校长发现了然后就是一顿欧拉欧拉...

2937
来自专栏Albert陈凯

2018-04-17 Java的Collection集合类3分钟搞掂Set集合前言

3分钟搞掂Set集合 前言 声明,本文用的是jdk1.8 现在这篇主要讲Set集合的三个子类: HashSet集合 A:底层数据结构是哈希表(是一个元素为链...

2977

扫码关注云+社区