A look at Storm Nimbus's LeaderElector

This article takes a look at the LeaderElector used by Storm's nimbus.

Nimbus

org/apache/storm/daemon/nimbus/Nimbus.java

    public static void main(String[] args) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        launch(new StandaloneINimbus());
    }

    public static Nimbus launch(INimbus inimbus) throws Exception {
        Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
                                               ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
        boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
        boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
        if (checkAcl) {
            AclEnforcement.verifyAcls(conf, fixupAcl);
        }
        return launchServer(conf, inimbus);
    }

    private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
        StormCommon.validateDistributedMode(conf);
        validatePortAvailable(conf);
        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
        final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
        nimbus.launchServer();
        final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
        metricsRegistry.startMetricsReporters(conf);
        Utils.addShutdownHookWithDelayedForceKill(() -> {
            metricsRegistry.stopMetricsReporters();
            nimbus.shutdown();
            server.stop();
        }, 10);
        if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
            nimbus.initWorkerTokenManager();
        }
        LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
        server.serve();
        return nimbus;
    }

    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
                  BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
                  StormMetricsRegistry metricsRegistry)
        throws Exception {
        //......

        if (blobStore == null) {
            blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
        }
        this.blobStore = blobStore;

        if (topoCache == null) {
            topoCache = new TopoCache(blobStore, conf);
        }
        if (leaderElector == null) {
            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),
                metricsRegistry);
        }
        this.leaderElector = leaderElector;
        this.blobStore.setLeaderElector(this.leaderElector);

        //......
    }

    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //add to nimbuses
            state.addNimbusHost(hpi.getHost(),
                                new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
            leaderElector.addToLeaderLockQueue();
            this.blobStore.startSyncBlobs();

            for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
                exec.prepare();
            }

            if (isLeader()) {
                for (String topoId : state.activeStorms()) {
                    transition(topoId, TopologyActions.STARTUP, null);
                }
                clusterMetricSet.setActive(true);
            }

            //......
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }
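  • The Nimbus constructor calls Zookeeper.zkLeaderElector to create the leaderElector when none is passed in
  • launchServer calls leaderElector.addToLeaderLockQueue() to join the leader election, and then runs the leader-only startup work if isLeader() already returns true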
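
The two points above can be illustrated with a minimal sketch. SimpleElector, FixedElector and MiniNimbus are hypothetical stand-ins invented for this example, not Storm classes; the sketch only shows the "use the injected elector, otherwise build a default" pattern from the constructor and the "join the queue, then check leadership" order from launchServer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Storm's ILeaderElector (not the real interface).
interface SimpleElector {
    void addToLeaderLockQueue();
    boolean isLeader();
}

// Test double that records calls and reports a fixed leadership result.
class FixedElector implements SimpleElector {
    final List<String> calls = new ArrayList<>();
    private final boolean leader;

    FixedElector(boolean leader) {
        this.leader = leader;
    }

    @Override
    public void addToLeaderLockQueue() {
        calls.add("addToLeaderLockQueue");
    }

    @Override
    public boolean isLeader() {
        calls.add("isLeader");
        return leader;
    }
}

// Hypothetical miniature of the startup sequence described above.
class MiniNimbus {
    private final SimpleElector elector;
    final List<String> startedTopologies = new ArrayList<>();

    // Like the Nimbus constructor, fall back to a default when no elector is injected.
    MiniNimbus(SimpleElector elector) {
        this.elector = (elector != null) ? elector : new FixedElector(false);
    }

    // Join the election first, then do leader-only work if leadership is already held.
    void launchServer(List<String> activeTopologies) {
        elector.addToLeaderLockQueue();
        if (elector.isLeader()) {
            startedTopologies.addAll(activeTopologies);
        }
    }
}
```

Accepting the elector as a constructor argument is what makes it possible to substitute a test double like FixedElector, which is presumably why the real constructor only creates the ZooKeeper-backed elector when the argument is null.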

Zookeeper.zkLeaderElector

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    /**
     * Get master leader elector.
     *
     * @param conf         Config.
     * @param zkClient     ZkClient, the client must have a default Config.STORM_ZOOKEEPER_ROOT as root path.
     * @param blobStore    {@link BlobStore}
     * @param tc           {@link TopoCache}
     * @param clusterState {@link IStormClusterState}
     * @param acls         ACLs
     * @return Instance of {@link ILeaderElector}
     *
     * @throws UnknownHostException
     */
    public static ILeaderElector zkLeaderElector(Map<String, Object> conf, CuratorFramework zkClient, BlobStore blobStore,
                                                 final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                                                 StormMetricsRegistry metricsRegistry) throws UnknownHostException {
        return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls, metricsRegistry);
    }

    protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore,
                                                 final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                                                 StormMetricsRegistry metricsRegistry) throws
        UnknownHostException {
        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        String leaderLockPath = "/leader-lock";
        String id = NimbusInfo.fromConf(conf).toHostPortString();
        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
            new AtomicReference<>(leaderLatchListenerImpl(
                new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls, metricsRegistry)));
        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
                                    leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls, metricsRegistry);
    }
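  • zkLeaderElectorImpl creates a LeaderLatch on the /leader-lock path, using the nimbus's host:port string as the latch id, and then builds a LeaderLatchListener via leaderLatchListenerImpl
  • Finally it passes both to LeaderElectorImp, which is returned as the ILeaderElector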
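
In the listing above the latch id comes from NimbusInfo.fromConf(conf).toHostPortString(), and LeaderElectorImp later turns a participant back into a NimbusInfo. A minimal sketch of such a host:port round trip follows; HostPort is a hypothetical class written for this example, and the real NimbusInfo may format or parse the string differently.

```java
// Hypothetical stand-in for NimbusInfo; only the host:port round trip is modeled.
final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Format used as the latch participant id in this sketch.
    String toHostPortString() {
        return host + ":" + port;
    }

    // Parse "host:port" back into its parts, splitting on the last colon.
    static HostPort parse(String id) {
        int idx = id.lastIndexOf(':');
        if (idx <= 0 || idx == id.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + id);
        }
        return new HostPort(id.substring(0, idx), Integer.parseInt(id.substring(idx + 1)));
    }
}
```

Because the id is stored with the latch node, any participant can find out who the current leader is by reading the id and parsing it, without a separate registry.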

leaderLatchListenerImpl

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

    // Leader latch listener that will be invoked when we either gain or lose leadership
    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        return new LeaderLatchListener() {
            final String STORM_JAR_SUFFIX = "-stormjar.jar";
            final String STORM_CODE_SUFFIX = "-stormcode.ser";
            final String STORM_CONF_SUFFIX = "-stormconf.ser";

            @Override
            public void isLeader() {
                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));

                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);

                // this finds all active topologies blob keys from all local topology blob keys
                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
                        generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                        generateJoinedString(diffTopology));

                if (diffTopology.isEmpty()) {
                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);

                    // this finds all dependency blob keys from active topologies from all local blob keys
                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
                            generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
                            generateJoinedString(diffDependencies));

                    if (diffDependencies.isEmpty()) {
                        LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                    } else {
                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                        closeLatch();
                    }
                } else {
                    LOG.info("code for all active topologies not available locally, giving up leadership.");
                    closeLatch();
                }
            }

            @Override
            public void notLeader() {
                LOG.info("{} lost leadership.", hostName);
            }

            //......

            private void closeLatch() {
                try {
                    leaderLatch.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
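  • leaderLatchListenerImpl returns an implementation of the LeaderLatchListener interface
  • isLeader runs a check when ZooKeeper elects this nimbus as leader: if the nimbus does not hold the blobs for all active topologies locally, or is missing some of their dependencies, it calls leaderLatch.close() to give up leadership
  • notLeader only writes a log message
  • Note that this listing is the 1.1.0 variant, which takes conf, zk, blobStore and leaderLatch directly, whereas the zkLeaderElectorImpl listing above calls a variant that takes a LeaderListenerCallback; the two listings therefore appear to come from different Storm versions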
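
The check in isLeader comes down to two set differences. The sketch below reproduces that decision with plain java.util sets. LeadershipCheck is a hypothetical helper written for this example; the three suffixes are taken from the listing, while the assumption that every active topology needs one blob per suffix is mine, since populateTopologyBlobKeys is elided in the listing.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that mirrors the decision made in the isLeader callback.
final class LeadershipCheck {
    // Blob key suffixes taken from the listener listing above.
    static final String[] SUFFIXES = {"-stormjar.jar", "-stormcode.ser", "-stormconf.ser"};

    // Assumption: each active topology needs one blob per suffix.
    static Set<String> expectedBlobKeys(Set<String> activeTopologyIds) {
        Set<String> keys = new TreeSet<>();
        for (String id : activeTopologyIds) {
            for (String suffix : SUFFIXES) {
                keys.add(id + suffix);
            }
        }
        return keys;
    }

    // Returns the keys in 'required' that are absent from 'local'.
    static Set<String> missing(Set<String> required, Set<String> local) {
        Set<String> diff = new TreeSet<>(required);
        diff.removeAll(local);
        return diff;
    }

    // True when leadership can be kept: no topology blob and no dependency blob is missing.
    static boolean canAcceptLeadership(Set<String> activeTopologyIds,
                                       Set<String> dependencyKeys,
                                       Set<String> localBlobKeys) {
        if (!missing(expectedBlobKeys(activeTopologyIds), localBlobKeys).isEmpty()) {
            return false; // topology blobs missing: the listener closes the latch in this case
        }
        return missing(dependencyKeys, localBlobKeys).isEmpty();
    }
}
```

The topology blobs are checked first because, in the listing, the dependency keys are derived from the topology code keys, so the second difference is only computed once all topology blobs are known to be present locally.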

LeaderElectorImp

org/apache/storm/zookeeper/LeaderElectorImp.java

public class LeaderElectorImp implements ILeaderElector {
    private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
    private final Map<String, Object> conf;
    private final List<String> servers;
    private final CuratorFramework zk;
    private final String leaderlockPath;
    private final String id;
    private final AtomicReference<LeaderLatch> leaderLatch;
    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
    private final BlobStore blobStore;
    private final TopoCache tc;
    private final IStormClusterState clusterState;
    private final List<ACL> acls;
    private final StormMetricsRegistry metricsRegistry;

    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
                            AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
                            BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                            StormMetricsRegistry metricsRegistry) {
        this.conf = conf;
        this.servers = servers;
        this.zk = zk;
        this.leaderlockPath = leaderlockPath;
        this.id = id;
        this.leaderLatch = leaderLatch;
        this.leaderLatchListener = leaderLatchListener;
        this.blobStore = blobStore;
        this.tc = tc;
        this.clusterState = clusterState;
        this.acls = acls;
        this.metricsRegistry = metricsRegistry;
    }

    @Override
    public void prepare(Map<String, Object> conf) {
        // no-op for zookeeper implementation
    }

    @Override
    public void addToLeaderLockQueue() throws Exception {
        // if this latch is already closed, we need to create new instance.
        if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
            leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
                metricsRegistry);
            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
            LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
        }
        // Only if the latch is not already started we invoke start
        if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
            leaderLatch.get().addListener(leaderLatchListener.get());
            leaderLatch.get().start();
            LOG.info("Queued up for leader lock.");
        } else {
            LOG.info("Node already in queue for leader lock.");
        }
    }

    @Override
    // Only started latches can be closed.
    public void removeFromLeaderLockQueue() throws Exception {
        if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
            leaderLatch.get().close();
            LOG.info("Removed from leader lock queue.");
        } else {
            LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
        }
    }

    @Override
    public boolean isLeader() throws Exception {
        return leaderLatch.get().hasLeadership();
    }

    @Override
    public NimbusInfo getLeader() {
        try {
            return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    @Override
    public List<NimbusInfo> getAllNimbuses() throws Exception {
        List<NimbusInfo> nimbusInfos = new ArrayList<>();
        Collection<Participant> participants = leaderLatch.get().getParticipants();
        for (Participant participant : participants) {
            nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
        }
        return nimbusInfos;
    }

    @Override
    public void close() {
        //Do nothing now.
    }
}
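  • LeaderElectorImp implements the ILeaderElector interface
  • addToLeaderLockQueue first checks whether the latch is already closed and, if so, creates a new latch and listener; it then checks the latch state and, if the latch has not been started yet, calls start() to join the election
  • A closed latch is replaced with a new one for two reasons: first, a closed LeaderLatch cannot be reused, because calling start() or close() on it throws an exception; second, a nimbus that ZooKeeper elected as leader but that fails Storm's own leadership conditions gives up leadership by closing its latch, so it needs a fresh latch to rejoin the queue later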
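
The latch lifecycle behind these points can be modeled without ZooKeeper. The sketch below uses two hypothetical classes, OneShotLatch and RejoiningElector, written for this example: the first imitates the LATENT, STARTED, CLOSED states that the listing checks, and the second follows the same branch structure as addToLeaderLockQueue and removeFromLeaderLockQueue. It illustrates the state handling only and is not Curator's or Storm's implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical one-shot latch with the LATENT / STARTED / CLOSED lifecycle.
final class OneShotLatch {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    State getState() {
        return state.get();
    }

    // Only a latch that has never been started may be started.
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    // Only a started latch may be closed.
    void close() {
        if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
            throw new IllegalStateException("Already closed or has not been started");
        }
    }
}

// Hypothetical elector that replaces a closed latch before rejoining the queue.
final class RejoiningElector {
    private final AtomicReference<OneShotLatch> latch = new AtomicReference<>(new OneShotLatch());

    OneShotLatch currentLatch() {
        return latch.get();
    }

    // Returns true if this call actually started a latch.
    boolean addToLeaderLockQueue() {
        if (latch.get().getState() == OneShotLatch.State.CLOSED) {
            latch.set(new OneShotLatch()); // a closed latch cannot be restarted
        }
        if (latch.get().getState() == OneShotLatch.State.LATENT) {
            latch.get().start();
            return true;
        }
        return false; // already queued
    }

    // Returns true if this call actually closed a started latch.
    boolean removeFromLeaderLockQueue() {
        if (latch.get().getState() == OneShotLatch.State.STARTED) {
            latch.get().close();
            return true;
        }
        return false;
    }
}
```

Holding the latch in an AtomicReference, as LeaderElectorImp does, lets the elector swap in a new latch while callers keep the same elector object.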

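Summary

  • Storm nimbus's LeaderElector is built mainly on the LeaderLatch recipe from Apache Curator, which runs on top of ZooKeeper
  • Storm nimbus supplies its own LeaderLatchListener that validates a nimbus once it has been elected leader: the nimbus must hold the blobs for all active topologies and all of their dependencies locally, otherwise it gives up leadership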

doc

  • Highly Available Nimbus Design

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-10-10
