
A Look at Storm's AssignmentDistributionService

Author: code4it
Published 2018-10-25 17:34:43
Included in the column: 码匠的流水账

This article takes a look at Storm's AssignmentDistributionService.

AssignmentDistributionService

storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java

Code language: Java
/**
 * A service for distributing master assignments to supervisors, this service makes the assignments notification
 * asynchronous.
 *
 * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
 *
 * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request,
 * let the supervisors sync instead.
 *
 * <p>Caution: this class is not thread safe.
 *
 * <pre>{@code
 * Working mode
 *                      +--------+         +-----------------+
 *                      | queue1 |   ==>   | Working thread1 |
 * +--------+ shuffle   +--------+         +-----------------+
 * | Master |   ==>
 * +--------+           +--------+         +-----------------+
 *                      | queue2 |   ==>   | Working thread2 |
 *                      +--------+         +-----------------+
 * }
 * </pre>
 */
public class AssignmentDistributionService implements Closeable {
    //......
    private ExecutorService service;

    /**
     * Assignments request queue.
     */
    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;

    /**
     * Add an assignments for a node/supervisor for distribution.
     * @param node node id of supervisor.
     * @param host host name for the node.
     * @param serverPort node thrift server port.
     * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
     */
    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
        try {
            //For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
            //Just skip for this scheduling round.
            if (serverPort == null) {
                LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", node);
                return;
            }

            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
            if (!success) {
                LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
            }

        } catch (InterruptedException e) {
            LOG.error("Add node assignments interrupted: {}", e.getMessage());
            throw new RuntimeException(e);
        }
    }

    private LinkedBlockingQueue<NodeAssignments> nextQueue() {
        return this.assignmentsQueue.get(nextQueueId());
    }
}
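  • Nimbus notifies supervisors of assignment results by calling AssignmentDistributionService's addAssignmentsForNode
  • addAssignmentsForNode wraps the SupervisorAssignments in a NodeAssignments and offers it to one of the sub-queues in assignmentsQueue (chosen via nextQueue()), waiting at most 5 seconds; if serverPort is null or the sub-queue is still full, the request is discarded with a warning and the supervisor is left to sync on its own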
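The "shuffle to a bounded sub-queue, drop if full" behavior described in the Javadoc can be illustrated with a minimal sketch. It assumes the sub-queue is picked at random (nextQueueId() is not shown in the excerpt), and ShuffledQueues is a hypothetical name, not a Storm class. Unlike the Storm code, which rethrows an interrupt as RuntimeException, this sketch restores the interrupt flag and reports the item as dropped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of "shuffle each request to one bounded sub-queue, drop it if that queue stays full".
 * Not Storm code: class and method names are illustrative only.
 */
class ShuffledQueues<T> {
    private final Map<Integer, LinkedBlockingQueue<T>> queues = new HashMap<>();
    private final Random random;
    private final int queueCount;

    ShuffledQueues(int queueCount, int queueSize, long seed) {
        this.queueCount = queueCount;
        this.random = new Random(seed); // fixed seed, similar in spirit to new Random(47) in prepare()
        for (int i = 0; i < queueCount; i++) {
            queues.put(i, new LinkedBlockingQueue<>(queueSize)); // bounded, so offer() can fail
        }
    }

    /** Returns false when the chosen sub-queue is still full after the timeout, i.e. the request is discarded. */
    boolean add(T item, long timeoutMillis) {
        int id = random.nextInt(queueCount); // assumption: the sub-queue is picked at random
        try {
            return queues.get(id).offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }

    /** Total number of queued items across all sub-queues. */
    int totalSize() {
        int n = 0;
        for (LinkedBlockingQueue<T> q : queues.values()) {
            n += q.size();
        }
        return n;
    }
}
```

Because the timed offer waits at most the timeout, a burst of scheduling results cannot stall the caller indefinitely; losing a request is acceptable here because, per the Javadoc, the supervisor will sync by itself later.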

AssignmentDistributionService.getInstance

storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java

Code language: Java
    /**
     * Factory method for initialize a instance.
     * @param conf config.
     * @return an instance of {@link AssignmentDistributionService}
     */
    public static AssignmentDistributionService getInstance(Map conf) {
        AssignmentDistributionService service = new AssignmentDistributionService();
        service.prepare(conf);
        return service;
    }

    /**
     * Function for initialization.
     *
     * @param conf config
     */
    public void prepare(Map conf) {
        this.conf = conf;
        this.random = new Random(47);

        this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
        this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);

        this.assignmentsQueue = new HashMap<>();
        for (int i = 0; i < threadsNum; i++) {
            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
        }
        //start the thread pool
        this.service = Executors.newFixedThreadPool(threadsNum);
        this.active = true;
        //start the threads
        for (int i = 0; i < threadsNum; i++) {
            this.service.submit(new DistributeTask(this, i));
        }
        // for local cluster
        localSupervisors = new HashMap<>();
        if (ConfigUtils.isLocalMode(conf)) {
            isLocalMode = true;
        }
    }
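  • getInstance creates a new AssignmentDistributionService and calls prepare to initialize it
  • prepare creates threadsNum LinkedBlockingQueues (threadsNum comes from DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS, default 10), each bounded by DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE (default 100)
  • It also creates a thread pool with Executors.newFixedThreadPool(threadsNum) and submits threadsNum DistributeTasks to it, one per queue; in local mode it additionally sets isLocalMode so that assignments can be handed to in-process supervisors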
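The one-queue-per-worker layout set up in prepare can be sketched as follows. This is a simplified illustration under assumptions, not Storm's implementation: QueuePerWorkerService is a hypothetical name, and the worker uses a timed poll rather than the poll-and-sleep loop Storm uses, so that it can also observe the active flag and react to close().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: N bounded queues, each drained by exactly one worker of a fixed thread pool. */
class QueuePerWorkerService<T> implements AutoCloseable {
    private final List<LinkedBlockingQueue<T>> queues = new ArrayList<>();
    private final ExecutorService pool;
    private volatile boolean active = true;

    QueuePerWorkerService(int threads, int queueSize, Consumer<T> handler) {
        for (int i = 0; i < threads; i++) {
            queues.add(new LinkedBlockingQueue<>(queueSize));
        }
        pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            LinkedBlockingQueue<T> own = queues.get(i); // worker i only ever reads queue i
            pool.submit(() -> {
                while (active) {
                    try {
                        // Timed poll: blocks up to 100 ms, then re-checks the active flag.
                        T item = own.poll(100, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            handler.accept(item);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // close() interrupts the workers
                    } catch (RuntimeException e) {
                        // Swallow handler failures so the worker keeps draining its queue.
                    }
                }
            });
        }
    }

    boolean offer(int queueIndex, T item) {
        return queues.get(queueIndex).offer(item);
    }

    @Override
    public void close() {
        active = false;
        pool.shutdownNow();
    }
}
```

Giving each worker its own queue means workers never contend on a shared queue, at the cost that one slow supervisor can delay the other requests that happened to land in the same queue.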

DistributeTask

storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java

Code language: Java
    /**
     * Task to distribute assignments.
     */
    static class DistributeTask implements Runnable {
        private AssignmentDistributionService service;
        private Integer queueIndex;

        DistributeTask(AssignmentDistributionService service, Integer index) {
            this.service = service;
            this.queueIndex = index;
        }

        @Override
        public void run() {
            while (service.isActive()) {
                try {
                    NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
                    sendAssignmentsToNode(nodeAssignments);
                } catch (InterruptedException e) {
                    if (service.isActive()) {
                        LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause());
                    } else {
                        // service is off now just interrupt it.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        private void sendAssignmentsToNode(NodeAssignments assignments) {
            if (this.service.isLocalMode) {
                //local node
                Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode());
                if (supervisor != null) {
                    supervisor.sendSupervisorAssignments(assignments.getAssignments());
                } else {
                    LOG.error("Can not find node {} for assignments distribution", assignments.getNode());
                    throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance.");
                }
            } else {
                // distributed mode
                try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(),
                                                                                    assignments.getHost(), assignments.getServerPort())) {
                    try {
                        client.getClient().sendSupervisorAssignments(assignments.getAssignments());
                    } catch (Exception e) {
                        //just ignore the exception.
                        LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
                    }
                } catch (Throwable e) {
                    //just ignore any error/exception.
                    LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
                }

            }
        }
    }

    /**
     * Get an assignments from the target queue with the specific index.
     * @param queueIndex index of the queue
     * @return an {@link NodeAssignments}
     * @throws InterruptedException
     */
    public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException {
        NodeAssignments target = null;
        while (true) {
            target = getQueueById(queueIndex).poll();
            if (target != null) {
                return target;
            }
            Time.sleep(100L);
        }
    }
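  • When AssignmentDistributionService runs prepare, it submits the DistributeTasks to the thread pool
  • DistributeTask.run loops while the service is active: it takes the next NodeAssignments from its own queue via nextAssignments (which polls the queue and sleeps 100 ms whenever it is empty) and calls sendAssignmentsToNode to deliver it
  • In distributed mode, sendAssignmentsToNode opens a SupervisorClient and calls client.getClient().sendSupervisorAssignments(assignments.getAssignments()), logging and ignoring any failure; in local mode it looks up the in-process Supervisor in localSupervisors and calls supervisor.sendSupervisorAssignments directly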
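The nested try blocks in sendAssignmentsToNode separate "could not create the client" from "the RPC failed", while try-with-resources guarantees the client is closed. A minimal sketch of that best-effort pattern follows; AssignmentClient and BestEffortSender are hypothetical stand-ins (not SupervisorClient), errors are collected in a list instead of logged, and the outer catch handles RuntimeException rather than Throwable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a Thrift client such as SupervisorClient: a closeable connection. */
interface AssignmentClient extends AutoCloseable {
    void send(String assignments) throws Exception;

    @Override
    void close(); // narrowed: no checked exception, so try-with-resources needs no extra catch
}

/** Sketch of "try once, record and ignore failures" delivery. */
class BestEffortSender {
    final List<String> errors = new ArrayList<>();

    void deliver(Supplier<AssignmentClient> connector, String assignments) {
        try (AssignmentClient client = connector.get()) { // client is always closed
            try {
                client.send(assignments);
            } catch (Exception e) {
                errors.add("send failed: " + e.getMessage()); // inner catch: the RPC itself failed
            }
        } catch (RuntimeException e) {
            errors.add("connect failed: " + e.getMessage()); // outer catch: creating or closing the client failed
        }
    }
}
```

Swallowing errors is safe in this design only because delivery is an optimization: a supervisor that misses a push can still pick up its assignments on a later sync.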

Supervisor.launchSupervisorThriftServer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java

Code language: Java
    private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
        // validate port
        int port = getThriftServerPort();
        try {
            ServerSocket socket = new ServerSocket(port);
            socket.close();
        } catch (BindException e) {
            LOG.error("{} is not available. Check if another process is already listening on {}", port, port);
            throw new RuntimeException(e);
        }

        TProcessor processor = new org.apache.storm.generated.Supervisor.Processor(
            new org.apache.storm.generated.Supervisor.Iface() {
                @Override
                public void sendSupervisorAssignments(SupervisorAssignments assignments)
                    throws AuthorizationException, TException {
                    checkAuthorization("sendSupervisorAssignments");
                    LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
                    SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
                                                                            getReadClusterState());
                    getEventManger().add(syn);
                }

                //......
            });
        this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR);
        this.thriftServer.serve();
    }
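  • launchSupervisorThriftServer first checks that the Thrift port is free, then registers a TProcessor whose sendSupervisorAssignments handler checks authorization, wraps the received SupervisorAssignments in a SynchronizeAssignments, and adds it to the EventManager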
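The handler does no real work on the RPC thread; it only enqueues an event. The sketch below illustrates that hand-off, assuming the EventManager behaves like a single-threaded event loop that runs events in submission order. SingleThreadEventManager and AssignmentHandler are hypothetical names, and assignments are modeled as plain strings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical single-threaded event loop: events run one at a time, in submission order. */
class SingleThreadEventManager implements AutoCloseable {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> add(Runnable event) {
        return CompletableFuture.runAsync(event, loop);
    }

    @Override
    public void close() {
        loop.shutdown(); // already-queued events still run
    }
}

/** Mirrors the shape of the sendSupervisorAssignments handler: wrap the payload, enqueue it, return at once. */
class AssignmentHandler {
    private final SingleThreadEventManager events;
    private final StringBuilder applied = new StringBuilder(); // only mutated on the event thread

    AssignmentHandler(SingleThreadEventManager events) {
        this.events = events;
    }

    CompletableFuture<Void> onAssignments(String assignments) {
        return events.add(() -> applied.append(assignments).append(';'));
    }

    String applied() {
        return applied.toString();
    }
}
```

Running every sync on one thread serializes pushed assignments with any other supervisor events, so the local state is never updated by two syncs at once.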

SynchronizeAssignments.run

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java

Code language: Java
/**
 * A runnable which will synchronize assignments to node local and then worker processes.
 */
public class SynchronizeAssignments implements Runnable {
    //......
    @Override
    public void run() {
        // first sync assignments to local, then sync processes.
        if (null == assignments) {
            getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
        } else {
            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
        }
        this.readClusterState.run();
    }

    private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
        if (null == assignments) {
            //unknown error, just skip
            return;
        }
        Map<String, byte[]> serAssignments = new HashMap<>();
        for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
            serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
        }
        clusterState.syncRemoteAssignments(serAssignments);
    }
}
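  • run first syncs the assignments locally: if none were pushed (assignments is null) it pulls them with getAssignmentsFromMaster, otherwise it calls assignedAssignmentsToLocal; in both cases it then triggers this.readClusterState.run()
  • assignedAssignmentsToLocal serializes each topology's Assignment and calls clusterState.syncRemoteAssignments(serAssignments)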
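The control flow of SynchronizeAssignments.run can be reduced to "use the pushed data if there is any, otherwise pull; then always refresh the slots". In the sketch below every collaborator is a hypothetical functional stand-in, and UTF-8 bytes of a string stand in for Utils.serialize(Assignment).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of the control flow in SynchronizeAssignments.run(). */
class SyncTask implements Runnable {
    private final Map<String, String> pushed;          // null: nothing was pushed by the master
    private final Runnable pullAndStore;               // stand-in for getAssignmentsFromMaster(...)
    private final Consumer<Map<String, byte[]>> store; // stand-in for clusterState.syncRemoteAssignments(...)
    private final Runnable refreshSlots;               // stand-in for readClusterState.run()

    SyncTask(Map<String, String> pushed, Runnable pullAndStore,
             Consumer<Map<String, byte[]>> store, Runnable refreshSlots) {
        this.pushed = pushed;
        this.pullAndStore = pullAndStore;
        this.store = store;
        this.refreshSlots = refreshSlots;
    }

    @Override
    public void run() {
        if (pushed == null) {
            pullAndStore.run(); // fall back to pulling from the master
        } else {
            Map<String, byte[]> serialized = new HashMap<>();
            for (Map.Entry<String, String> e : pushed.entrySet()) {
                // UTF-8 bytes stand in for Utils.serialize(Assignment)
                serialized.put(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8));
            }
            store.accept(serialized);
        }
        refreshSlots.run(); // always re-read the local state and update the slots
    }
}
```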

StormClusterStateImpl.syncRemoteAssignments

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

Code language: Java
    @Override
    public void syncRemoteAssignments(Map<String, byte[]> remote) {
        if (null != remote) {
            this.assignmentsBackend.syncRemoteAssignments(remote);
        } else {
            Map<String, byte[]> tmp = new HashMap<>();
            List<String> stormIds = this.stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, false);
            for (String stormId : stormIds) {
                byte[] assignment = this.stateStorage.get_data(ClusterUtils.assignmentPath(stormId), false);
                tmp.put(stormId, assignment);
            }
            this.assignmentsBackend.syncRemoteAssignments(tmp);
        }
    }
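  • When remote is non-null, serAssignments is written to assignmentsBackend (the local in-memory store)
  • If remote is null, the assignments are read from ZooKeeper instead and then written to memory; the ZooKeeper path is ClusterUtils.assignmentPath(stormId), i.e. /assignments/{topologyId}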
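The "pushed data, else rebuild from ZooKeeper" branch can be sketched against a hypothetical StateReader interface standing in for get_children/get_data. The sketch assumes the in-memory backend replaces its snapshot wholesale on each sync rather than merging; the excerpt above does not show how assignmentsBackend stores the data.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the ZooKeeper-backed state storage (get_children / get_data). */
interface StateReader {
    List<String> getChildren(String path);

    byte[] getData(String path);
}

/** Sketch of syncRemoteAssignments: use pushed data if given, otherwise rebuild it from ZooKeeper. */
class InMemoryAssignments {
    static final String ASSIGNMENTS_SUBTREE = "/assignments"; // path layout taken from the text above

    private volatile Map<String, byte[]> byTopology = Collections.emptyMap();

    static String assignmentPath(String topologyId) {
        return ASSIGNMENTS_SUBTREE + "/" + topologyId; // /assignments/{topologyId}
    }

    void syncRemote(Map<String, byte[]> remote, StateReader zk) {
        Map<String, byte[]> source = remote;
        if (source == null) {
            source = new HashMap<>();
            for (String topologyId : zk.getChildren(ASSIGNMENTS_SUBTREE)) {
                source.put(topologyId, zk.getData(assignmentPath(topologyId)));
            }
        }
        // Assumption: the new snapshot replaces the old one wholesale rather than being merged into it.
        byTopology = Collections.unmodifiableMap(new HashMap<>(source));
    }

    byte[] get(String topologyId) {
        return byTopology.get(topologyId);
    }

    int size() {
        return byTopology.size();
    }
}
```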

ReadClusterState.run

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java

Code language: Java
    @Override
    public synchronized void run() {
        try {
            List<String> stormIds = stormClusterState.assignments(null);
            Map<String, Assignment> assignmentsSnapshot = getAssignmentsSnapshot(stormClusterState);

            Map<Integer, LocalAssignment> allAssignments = readAssignments(assignmentsSnapshot);
            if (allAssignments == null) {
                //Something odd happened try again later
                return;
            }
            Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds);

            HashSet<Integer> assignedPorts = new HashSet<>();
            LOG.debug("Synchronizing supervisor");
            LOG.debug("All assignment: {}", allAssignments);
            LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);
            for (Integer port : allAssignments.keySet()) {
                if (iSuper.confirmAssigned(port)) {
                    assignedPorts.add(port);
                }
            }
            HashSet<Integer> allPorts = new HashSet<>(assignedPorts);
            iSuper.assigned(allPorts);
            allPorts.addAll(slots.keySet());

            Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>();
            for (Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) {
                String topoId = entry.getKey();
                if (entry.getValue() != null) {
                    for (ProfileRequest req : entry.getValue()) {
                        NodeInfo ni = req.get_nodeInfo();
                        if (host.equals(ni.get_node())) {
                            Long port = ni.get_port().iterator().next();
                            Set<TopoProfileAction> actions = filtered.get(port.intValue());
                            if (actions == null) {
                                actions = new HashSet<>();
                                filtered.put(port.intValue(), actions);
                            }
                            actions.add(new TopoProfileAction(topoId, req));
                        }
                    }
                }
            }

            for (Integer port : allPorts) {
                Slot slot = slots.get(port);
                if (slot == null) {
                    slot = mkSlot(port);
                    slots.put(port, slot);
                    slot.start();
                }
                slot.setNewAssignment(allAssignments.get(port));
                slot.addProfilerActions(filtered.get(port));
            }

        } catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", e);
            throw new RuntimeException(e);
        }
    }
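  • For each port, ReadClusterState creates and starts a Slot if none exists yet, then calls slot.setNewAssignment, which sets the slot's AtomicReference newAssignment
  • Slot's run method loops, calling stateMachineStep to inspect newAssignment and compute nextState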
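The hand-off between ReadClusterState and a Slot is a "publish the desired value, let the owner reconcile" pattern. A minimal sketch follows, assuming assignments can be modeled as strings and that a slot has only two states; MiniSlot is hypothetical and far simpler than Storm's Slot state machine.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the hand-off between ReadClusterState (writer) and a Slot's own loop (reader). */
class MiniSlot {
    enum State { EMPTY, RUNNING }

    private final AtomicReference<String> newAssignment = new AtomicReference<>();
    private String currentAssignment; // only read and written by the slot's loop
    private int changes;              // how many times the loop acted on a new assignment

    /** Called by the sync thread: publish the desired assignment and return immediately. */
    void setNewAssignment(String assignment) {
        newAssignment.set(assignment);
    }

    /** One iteration of the slot's loop: compare desired vs. current and derive the next state. */
    State step() {
        String desired = newAssignment.get();
        if (!Objects.equals(desired, currentAssignment)) {
            currentAssignment = desired; // a real slot would stop the old worker / launch a new one here
            changes++;
        }
        return currentAssignment == null ? State.EMPTY : State.RUNNING;
    }

    int changes() {
        return changes;
    }
}
```

The writer never blocks on the slot, and if several assignments arrive between two iterations, the loop simply acts on the latest one.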

Summary

  • Nimbus notifies supervisors of assignment results by calling AssignmentDistributionService's addAssignmentsForNode
    • addAssignmentsForNode puts the SupervisorAssignments into assignmentsQueue. By default AssignmentDistributionService creates a thread pool with a configured number of threads, plus the same number of queues and DistributeTasks
    • Each DistributeTask keeps pulling NodeAssignments from its own queue and calls sendAssignmentsToNode to notify the supervisor
  • On startup, the Supervisor runs launchSupervisorThriftServer, registering a processor for sendSupervisorAssignments that wraps each received SupervisorAssignments in a SynchronizeAssignments and adds it to the EventManager
    • When the EventManager processes a SynchronizeAssignments, its run method calls assignedAssignmentsToLocal and then triggers this.readClusterState.run()
    • assignedAssignmentsToLocal calls clusterState.syncRemoteAssignments(serAssignments) to update the local in-memory assignments; readClusterState.run() mainly updates each slot's newAssignment, after which the Slot's own polling loop detects the change and handles it

doc

  • Understanding the Parallelism of a Storm Topology
This article is part of the Tencent Cloud self-media sharing program and is shared from the WeChat official account 码匠的流水账.
Originally published: 2018-10-16
