A Look at Storm Nimbus's mkAssignments

This article mainly looks into Storm Nimbus's mkAssignments.

Nimbus.mkAssignments

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    void doRebalance(String topoId, StormBase stormBase) throws Exception {
        RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
        StormBase updated = new StormBase();
        updated.set_topology_action_options(null);
        updated.set_component_debug(Collections.emptyMap());

        if (rbo.is_set_num_executors()) {
            updated.set_component_executors(rbo.get_num_executors());
        }

        if (rbo.is_set_num_workers()) {
            updated.set_num_workers(rbo.get_num_workers());
        }
        stormClusterState.updateStorm(topoId, updated);
        updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal()));
        idToExecutors.getAndUpdate(new Dissoc<>(topoId)); // remove the executors cache to let it recompute.
        mkAssignments(topoId);
    }

    private void mkAssignments() throws Exception {
        mkAssignments(null);
    }
  • stormClusterState.updateStorm(topoId, updated) here writes the updated StormBase to zk
  • Both doRebalance and the no-arg mkAssignments delegate to the mkAssignments(String scratchTopoId) method; the Dissoc helper used above is sketched below
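
The idToExecutors.getAndUpdate(new Dissoc<>(topoId)) call above invalidates the cached executor list with a pure-function map update. Below is a minimal sketch of such a helper, assuming it mirrors the private Dissoc class inside Nimbus.java.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.UnaryOperator;

    // Returns a copy of the map without the given key, leaving the original
    // untouched, so it composes with AtomicReference.getAndUpdate for a
    // lock-free cache invalidation.
    class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
        private final K key;

        Dissoc(K key) {
            this.key = key;
        }

        @Override
        public Map<K, V> apply(Map<K, V> current) {
            Map<K, V> result = new HashMap<>(current);
            result.remove(key);
            return result;
        }
    }

With this shape, idToExecutors.getAndUpdate(new Dissoc<>(topoId)) atomically swaps in a copy of the cache without the rebalanced topology's entry, so the executor list is recomputed on the next scheduling round.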

mkAssignments(String scratchTopoId)

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private void mkAssignments(String scratchTopoId) throws Exception {
        try {
            if (!isReadyForMKAssignments()) {
                return;
            }
            // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
            // filter out ones which have a executor timeout
            // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
            // should be in each slot (e.g., 4, 4, 4, 5)
            // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
            // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
            // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around

            IStormClusterState state = stormClusterState;
            //read all the topologies
            Map<String, StormBase> bases;
            Map<String, TopologyDetails> tds = new HashMap<>();
            synchronized (submitLock) {
                // should promote: only fetch storm bases of topologies that need scheduling.
                bases = state.topologyBases();

                for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
                    Entry<String, StormBase> entry = it.next();
                    String id = entry.getKey();
                    try {
                        tds.put(id, readTopologyDetails(id, entry.getValue()));
                    } catch (KeyNotFoundException e) {
                        //A race happened and it is probably not running
                        it.remove();
                    }
                }
            }
            List<String> assignedTopologyIds = state.assignments(null);
            Map<String, Assignment> existingAssignments = new HashMap<>();
            for (String id : assignedTopologyIds) {
                //for the topology which wants rebalance (specified by the scratchTopoId)
                // we exclude its assignment, meaning that all the slots occupied by its assignment
                // will be treated as free slot in the scheduler code.
                if (!id.equals(scratchTopoId)) {
                    Assignment currentAssignment = state.assignmentInfo(id, null);
                    if (!currentAssignment.is_set_owner()) {
                        TopologyDetails td = tds.get(id);
                        if (td != null) {
                            currentAssignment.set_owner(td.getTopologySubmitter());
                            state.setAssignment(id, currentAssignment, td.getConf());
                        }
                    }
                    existingAssignments.put(id, currentAssignment);
                }
            }

            // make the new assignments for topologies
            lockingMkAssignments(existingAssignments, bases, scratchTopoId, assignedTopologyIds, state, tds);
        } catch (Exception e) {
            this.mkAssignmentsErrors.mark();
            throw e;
        }
    }
  • readTopologyDetails reads the TopologyDetails of each topology
  • state.assignments(Runnable callback) and state.assignmentInfo(String stormId, Runnable callback) fetch the existing assignments
  • lockingMkAssignments is then called to compute new assignments for the topologies; how the scratchTopoId path gets triggered is sketched below
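
The scratchTopoId above is non-null only when a topology is being rebalanced: its old assignment is excluded so its slots count as free. For context, a rebalance request is typically issued from a client roughly as below; this is a minimal sketch assuming the standard thrift client API, and the topology name and option values are made up.

    import java.util.Collections;
    import org.apache.storm.generated.RebalanceOptions;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class RebalanceExample {
        public static void main(String[] args) throws Exception {
            RebalanceOptions opts = new RebalanceOptions();
            opts.set_num_workers(4);                                       // makes rbo.is_set_num_workers() true
            opts.set_num_executors(Collections.singletonMap("spout", 8));  // per-component executor counts
            opts.set_wait_secs(30);                                        // grace period before rebalancing
            try (NimbusClient client = NimbusClient.getConfiguredClient(Utils.readStormConfig())) {
                // Nimbus moves the topology into the rebalance state and
                // eventually runs doRebalance -> mkAssignments(topoId)
                client.getClient().rebalance("demo-topo", opts);
            }
        }
    }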

lockingMkAssignments

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                      String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                      Map<String, TopologyDetails> tds) throws Exception {
        Topologies topologies = new Topologies(tds);

        synchronized (schedLock) {
            Map<String, SchedulerAssignment> newSchedulerAssignments =
                    computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);

            Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
                    computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
            Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
                    computeTopoToNodePortToResources(newSchedulerAssignments);
            int nowSecs = Time.currentTimeSecs();
            Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
            //construct the final Assignments by adding start-times etc into it
            Map<String, Assignment> newAssignments = new HashMap<>();
            //......

            //tasks figure out what tasks to talk to by looking at topology at runtime
            // only log/set when there's been a change to the assignment
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                TopologyDetails td = topologies.getById(topoId);
                if (assignment.equals(existingAssignment)) {
                    LOG.debug("Assignment for {} hasn't changed", topoId);
                } else {
                    LOG.info("Setting new assignment for topology id {}: {}", topoId, assignment);
                    state.setAssignment(topoId, assignment, td.getConf());
                }
            }

            //grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
            //because the number of existing assignments is small for every scheduling round,
            //we expect to notify supervisors at almost the same time
            Map<String, String> totalAssignmentsChangedNodes = new HashMap<>();
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
            }
            notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
                    basicSupervisorDetailsMap);

            Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                if (existingAssignment == null) {
                    existingAssignment = new Assignment();
                    existingAssignment.set_executor_node_port(new HashMap<>());
                    existingAssignment.set_executor_start_time_secs(new HashMap<>());
                }
                Set<WorkerSlot> newSlots = newlyAddedSlots(existingAssignment, assignment);
                addedSlots.put(topoId, newSlots);
            }
            inimbus.assignSlots(topologies, addedSlots);
        }
    }
  • computeNewSchedulerAssignments is called first to compute newSchedulerAssignments
  • newAssignments is then iterated; whenever an assignment has changed, state.setAssignment(topoId, assignment, td.getConf()) writes the new assignment to zk
  • finally totalAssignmentsChangedNodes is computed and notifySupervisorsAssignments is called, pushing the assignments to the supervisors via the AssignmentDistributionService; a sketch of the node diff follows below
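
A minimal sketch of the node diff mentioned above: collect the nodes whose executor-to-slot mapping differs between the old and new assignment, so only those supervisors need notification. The accessors (get_executor_node_port, get_node) match the thrift-generated Assignment/NodeInfo API, but the helper itself is illustrative; Storm's real assignmentChangedNodes returns a node-to-host map rather than a set.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.storm.generated.Assignment;
    import org.apache.storm.generated.NodeInfo;

    class AssignmentDiffSketch {
        // Nodes touched by the change: where an executor now runs, plus where it used to run.
        static Set<String> changedNodes(Assignment oldAssignment, Assignment newAssignment) {
            Map<List<Long>, NodeInfo> oldMap = oldAssignment == null
                ? new HashMap<>() : oldAssignment.get_executor_node_port();
            Set<String> changed = new HashSet<>();
            for (Map.Entry<List<Long>, NodeInfo> entry : newAssignment.get_executor_node_port().entrySet()) {
                NodeInfo oldSlot = oldMap.get(entry.getKey());
                if (oldSlot == null || !oldSlot.equals(entry.getValue())) {
                    changed.add(entry.getValue().get_node()); // node the executor now runs on
                    if (oldSlot != null) {
                        changed.add(oldSlot.get_node());      // node the executor moved away from
                    }
                }
            }
            return changed;
        }
    }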

computeNewSchedulerAssignments

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
                                                                            Topologies topologies, Map<String, StormBase> bases,
                                                                            String scratchTopologyId)
        throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {

        Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);

        Set<String> zkHeartbeatTopologies = topologies.getTopologies().stream()
                                                      .filter(topo -> !supportRpcHeartbeat(topo))
                                                      .map(TopologyDetails::getId)
                                                      .collect(Collectors.toSet());

        updateAllHeartbeats(existingAssignments, topoToExec, zkHeartbeatTopologies);

        Map<String, Set<List<Integer>>> topoToAliveExecutors = computeTopologyToAliveExecutors(existingAssignments, topologies,
                                                                                               topoToExec, scratchTopologyId);
        Map<String, Set<Long>> supervisorToDeadPorts = computeSupervisorToDeadPorts(existingAssignments, topoToExec,
                                                                                    topoToAliveExecutors);
        Map<String, SchedulerAssignmentImpl> topoToSchedAssignment = computeTopologyToSchedulerAssignment(existingAssignments,
                                                                                                          topoToAliveExecutors);
        Set<String> missingAssignmentTopologies = new HashSet<>();
        for (TopologyDetails topo : topologies.getTopologies()) {
            String id = topo.getId();
            Set<List<Integer>> allExecs = topoToExec.get(id);
            Set<List<Integer>> aliveExecs = topoToAliveExecutors.get(id);
            int numDesiredWorkers = topo.getNumWorkers();
            int numAssignedWorkers = numUsedWorkers(topoToSchedAssignment.get(id));
            if (allExecs == null || allExecs.isEmpty() || !allExecs.equals(aliveExecs) || numDesiredWorkers > numAssignedWorkers) {
                //We have something to schedule...
                missingAssignmentTopologies.add(id);
            }
        }
        Map<String, SupervisorDetails> supervisors =
            readAllSupervisorDetails(supervisorToDeadPorts, topologies, missingAssignmentTopologies);
        Cluster cluster = new Cluster(inimbus, resourceMetrics, supervisors, topoToSchedAssignment, topologies, conf);
        cluster.setStatusMap(idToSchedStatus.get());

        schedulingStartTimeNs.set(Time.nanoTime());
        scheduler.schedule(topologies, cluster);
        //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge
        //......

        return cluster.getAssignments();
    }
  • computeTopologyToExecutors returns a Map<String, Set<List<Integer>>> keyed by topologyId, where each List<Integer> identifies an executor
  • computeTopologyToAliveExecutors returns the same shape, Map<String, Set<List<Integer>>>, restricted to the alive executors
  • computeSupervisorToDeadPorts returns a Map<String, Set<Long>>, where Long is the port number
  • computeTopologyToSchedulerAssignment returns topoToSchedAssignment, a Map keyed by topologyId with SchedulerAssignmentImpl values
  • missingAssignmentTopologies is then computed by comparing each topology's configured worker and executor counts against its existing assignment
  • readAllSupervisorDetails returns the supervisors Map, i.e. the alive supervisors that can take assignments
  • finally a Cluster is created and scheduler.schedule(topologies, cluster) performs the scheduling; a minimal custom-scheduler skeleton is sketched below
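
The scheduler instance used here is pluggable: Nimbus loads the class named by the storm.scheduler config entry and falls back to DefaultScheduler (shown next) when none is configured. For orientation, a minimal custom scheduler might look like the sketch below; it assumes the Storm 2.0.0 IScheduler shape (prepare, schedule, plus a config() hook for per-user resource guarantees), and the class name is hypothetical.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.scheduler.Cluster;
    import org.apache.storm.scheduler.EvenScheduler;
    import org.apache.storm.scheduler.IScheduler;
    import org.apache.storm.scheduler.Topologies;

    public class LoggingScheduler implements IScheduler {
        @Override
        public void prepare(Map<String, Object> conf) {
            // one-time setup with the daemon config
        }

        @Override
        public void schedule(Topologies topologies, Cluster cluster) {
            // log what still needs scheduling, then fall back to the even strategy
            cluster.needsSchedulingTopologies()
                   .forEach(td -> System.out.println("needs scheduling: " + td.getId()));
            EvenScheduler.scheduleTopologiesEvenly(topologies, cluster);
        }

        @Override
        public Map<String, Map<String, Double>> config() {
            return new HashMap<>(); // no per-user resource guarantees
        }
    }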

DefaultScheduler.schedule

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java

    public void schedule(Topologies topologies, Cluster cluster) {
        defaultSchedule(topologies, cluster);
    }

    public static void defaultSchedule(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
            Set<ExecutorDetails> allExecutors = topology.getExecutors();

            Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
                EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
            Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
            for (List<ExecutorDetails> list : aliveAssigned.values()) {
                aliveExecutors.addAll(list);
            }

            Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
            int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size());

            Set<WorkerSlot> badSlots = null;
            if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
                badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
            }
            if (badSlots != null) {
                cluster.freeSlots(badSlots);
            }

            EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
        }
    }
  • cluster.needsSchedulingTopologies() yields the TopologyDetails that still need scheduling
  • cluster.getAvailableSlots() returns the currently available slots
  • EvenScheduler.getAliveAssignedWorkerSlotExecutors computes aliveAssigned, from which slotsCanReassign derives canReassignSlots
  • totalSlotsToUse is computed as Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size()), and the badSlots are freed
  • finally EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster) performs the assignment; a worked example of the even split follows below
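
The "(e.g., 4, 4, 4, 5)" comment at the top of mkAssignments describes the target shape: executors spread as evenly as possible over the slots in use. A standalone sketch of that split (illustrative, not Storm's actual badSlots implementation):

    import java.util.ArrayList;
    import java.util.List;

    class EvenSplitSketch {
        // Split numExecutors across numSlots as evenly as possible,
        // e.g. 17 executors over 4 slots -> [4, 4, 4, 5].
        static List<Integer> evenSplit(int numExecutors, int numSlots) {
            int base = numExecutors / numSlots;
            int remainder = numExecutors % numSlots;
            List<Integer> counts = new ArrayList<>();
            for (int i = 0; i < numSlots; i++) {
                counts.add(i < numSlots - remainder ? base : base + 1);
            }
            return counts;
        }
    }

A slot whose current executor count matches neither of the two resulting bucket sizes is a "bad slot": it is freed and its executors are redistributed.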

EvenScheduler.scheduleTopologiesEvenly

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java

    public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            String topologyId = topology.getId();
            Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
            Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);

            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
                WorkerSlot nodePort = entry.getKey();
                List<ExecutorDetails> executors = entry.getValue();
                cluster.assign(nodePort, topologyId, executors);
            }
        }
    }
  • scheduleTopology(topology, cluster) computes newAssignment, which is reversed into a Map<WorkerSlot, List<ExecutorDetails>> (see the reverseMap sketch below)
  • cluster.assign(nodePort, topologyId, executors) then performs the assignment
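
Utils.reverseMap inverts the executor-to-slot mapping so executors sharing one slot end up grouped together. A generic sketch of the idea (not Storm's exact Utils code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ReverseMapSketch {
        // Invert Map<K, V> into Map<V, List<K>>: keys sharing a value are grouped under it.
        static <K, V> Map<V, List<K>> reverseMap(Map<K, V> map) {
            Map<V, List<K>> reversed = new HashMap<>();
            for (Map.Entry<K, V> entry : map.entrySet()) {
                reversed.computeIfAbsent(entry.getValue(), v -> new ArrayList<>())
                        .add(entry.getKey());
            }
            return reversed;
        }
    }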

EvenScheduler.scheduleTopology

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java

    private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
        Set<ExecutorDetails> allExecutors = topology.getExecutors();
        Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
        int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());

        List<WorkerSlot> sortedList = sortSlots(availableSlots);
        if (sortedList == null) {
            LOG.error("No available slots for topology: {}", topology.getName());
            return new HashMap<ExecutorDetails, WorkerSlot>();
        }

        //allow requesting slots number bigger than available slots
        int toIndex = (totalSlotsToUse - aliveAssigned.size())
                      > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
        List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);

        Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
        for (List<ExecutorDetails> list : aliveAssigned.values()) {
            aliveExecutors.addAll(list);
        }
        Set<ExecutorDetails> reassignExecutors = Sets.difference(allExecutors, aliveExecutors);

        Map<ExecutorDetails, WorkerSlot> reassignment = new HashMap<ExecutorDetails, WorkerSlot>();
        if (reassignSlots.size() == 0) {
            return reassignment;
        }

        List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>(reassignExecutors);
        Collections.sort(executors, new Comparator<ExecutorDetails>() {
            @Override
            public int compare(ExecutorDetails o1, ExecutorDetails o2) {
                return o1.getStartTask() - o2.getStartTask();
            }
        });

        for (int i = 0; i < executors.size(); i++) {
            reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size()));
        }

        if (reassignment.size() != 0) {
            LOG.info("Available slots: {}", availableSlots.toString());
        }
        return reassignment;
    }
  • reassignSlots, a List<WorkerSlot>, is derived from availableSlots
  • Sets.difference(allExecutors, aliveExecutors) yields reassignExecutors, which are sorted by startTask
  • finally reassignExecutors are iterated and slots are handed out round-robin via reassignSlots.get(i % reassignSlots.size()); a concrete demo follows below
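
To make the modulo distribution concrete: with five executors to reassign and two slots, the slot index cycles 0, 1, 0, 1, 0. A self-contained demo with made-up executor and slot labels:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundRobinDemo {
        public static void main(String[] args) {
            List<String> executors = Arrays.asList("[1 1]", "[2 2]", "[3 3]", "[4 4]", "[5 5]");
            List<String> slots = Arrays.asList("node1:6700", "node2:6700");
            Map<String, String> assignment = new LinkedHashMap<>();
            for (int i = 0; i < executors.size(); i++) {
                // same modulo as EvenScheduler.scheduleTopology
                assignment.put(executors.get(i), slots.get(i % slots.size()));
            }
            // [1 1] -> node1:6700, [2 2] -> node2:6700, [3 3] -> node1:6700, ...
            assignment.forEach((e, s) -> System.out.println(e + " -> " + s));
        }
    }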

ExecutorDetails

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java

public class ExecutorDetails {
    public final int startTask;
    public final int endTask;

    //......

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof ExecutorDetails)) {
            return false;
        }

        ExecutorDetails executor = (ExecutorDetails) other;
        return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
    }
}
  • ExecutorDetails, despite the "executor" in its name, wraps startTask and endTask information
  • the scheduling process essentially assigns the tasks marked by startTask and endTask to slots
  • note the overridden equals method, which compares startTask and endTask; the set comparisons earlier rely on it, as demonstrated below
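
Because these objects are compared inside HashSets (allExecutors.equals(aliveExecutors), Sets.difference), the real class also overrides hashCode consistently with equals (elided behind //...... above); otherwise set membership checks would break. A quick demonstration, assuming the two-int constructor:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.storm.scheduler.ExecutorDetails;

    public class ExecutorEqualityDemo {
        public static void main(String[] args) {
            Set<ExecutorDetails> all = new HashSet<>();
            all.add(new ExecutorDetails(1, 3));   // executor covering tasks 1..3
            Set<ExecutorDetails> alive = new HashSet<>();
            alive.add(new ExecutorDetails(1, 3)); // distinct instance, same task range
            // prints true only because equals/hashCode compare startTask and endTask
            System.out.println(all.equals(alive));
        }
    }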

Cluster.assign

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java

    /**
     * Assign the slot to the executors for this topology.
     *
     * @throws RuntimeException if the specified slot is already occupied.
     */
    public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
        assertValidTopologyForModification(topologyId);
        if (isSlotOccupied(slot)) {
            throw new RuntimeException(
                "slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
        }

        TopologyDetails td = topologies.getById(topologyId);
        if (td == null) {
            throw new IllegalArgumentException(
                "Trying to schedule for topo "
                + topologyId
                + " but that is not a known topology "
                + topologies.getAllIds());
        }
        WorkerResources resources = calculateWorkerResources(td, executors);
        SchedulerAssignmentImpl assignment = assignments.get(topologyId);
        if (assignment == null) {
            assignment = new SchedulerAssignmentImpl(topologyId);
            assignments.put(topologyId, assignment);
        } else {
            for (ExecutorDetails executor : executors) {
                if (assignment.isExecutorAssigned(executor)) {
                    throw new RuntimeException(
                        "Attempting to assign executor: "
                        + executor
                        + " of topology: "
                        + topologyId
                        + " to workerslot: "
                        + slot
                        + ". The executor is already assigned to workerslot: "
                        + assignment.getExecutorToSlot().get(executor)
                        + ". The executor must unassigned before it can be assigned to another slot!");
                }
            }
        }

        assignment.assign(slot, executors, resources);
        String nodeId = slot.getNodeId();
        double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
        assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
        updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
        totalResourcesPerNodeCache.remove(slot.getNodeId());
    }
  • this mainly delegates to assignment.assign(slot, executors, resources)

SchedulerAssignmentImpl.assign

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java

    /**
     * Assign the slot to executors.
     */
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors, WorkerResources slotResources) {
        assert slot != null;
        for (ExecutorDetails executor : executors) {
            this.executorToSlot.put(executor, slot);
        }
        slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
            .addAll(executors);
        if (slotResources != null) {
            resources.put(slot, slotResources);
        } else {
            resources.remove(slot);
        }
    }
  • assigns the slot to each ExecutorDetails (a startTask/endTask pair); the MAKE_LIST idiom is sketched below
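
MAKE_LIST above is just a factory handed to computeIfAbsent so a slot's executor collection is created on first assignment. A minimal sketch of the idiom with stand-in types (the real field types in SchedulerAssignmentImpl are not shown in the excerpt):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    class SlotIndexSketch {
        // stand-in shape: slot id -> executors assigned to that slot
        private final Map<String, List<String>> slotToExecutors = new HashMap<>();
        private static final Function<String, List<String>> MAKE_LIST = slot -> new ArrayList<>();

        void assign(String slot, List<String> executors) {
            // creates the list on the slot's first assignment, then appends
            slotToExecutors.computeIfAbsent(slot, MAKE_LIST).addAll(executors);
        }
    }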

Summary

  • Nimbus.mkAssignments carries out the scheduling work: it computes newSchedulerAssignments (a Map<String, SchedulerAssignmentImpl> keyed by topologyId), calling scheduler.schedule(topologies, cluster) along the way
  • EvenScheduler.scheduleTopology is the core of the scheduling: it assigns slots to the ExecutorDetails that need placement using a round-robin strategy, giving each reassignExecutor the slot reassignSlots.get(i % reassignSlots.size())
  • ExecutorDetails has two fields, startTask and endTask; assigning is really about giving these tasks a slot
  • after the assignment, lockingMkAssignments does two things: state.setAssignment(topoId, assignment, td.getConf()) stores the changed assignments to zk, and notifySupervisorsAssignments notifies the supervisors, which update their local memory on receipt

doc

  • Understanding the Parallelism of a Storm Topology

Originally published on the WeChat official account 码匠的流水账 (geek_luandun).

Original publication date: 2018-10-15
