
A Look at LeaderLatch in Curator Recipes

code4it
Published 2018-10-18 15:18:54
Included in the column: 码匠的流水账

This article takes a look at LeaderLatch from Curator recipes.

Example

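import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;

    @Test
    public void testCuratorLeaderLatch() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String leaderLockPath = "/leader-lock2";

        // Ten latches share one client and one latch path; their ids are client1..client10
        List<LeaderLatch> latchList = IntStream.rangeClosed(1,10)
                .parallel()
                .mapToObj(i -> new LeaderLatch(client,leaderLockPath,"client"+i))
                .collect(Collectors.toList());

        // start() only enrolls each latch; the election itself runs in the background
        latchList.parallelStream()
                .forEach(latch -> {
                    try {
                        latch.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });

        // Give the election time to settle
        TimeUnit.SECONDS.sleep(5);

        // Close the current leader; close() is the only way to release leadership
        Iterator<LeaderLatch> iterator = latchList.iterator();
        while (iterator.hasNext()){
            LeaderLatch latch = iterator.next();
            if(latch.hasLeadership()){
                System.out.println(latch.getId() + " hasLeadership");
                try {
                    latch.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                iterator.remove();
            }
        }

        // Wait for the next participant in line to take over, then print it
        TimeUnit.SECONDS.sleep(5);

        latchList.stream()
                .filter(latch -> latch.hasLeadership())
                .forEach(latch -> System.out.println(latch.getId() + " hasLeadership"));

        // getLeader() reads the current leader's id back from ZooKeeper
        Participant participant = latchList.get(0).getLeader();
        System.out.println(participant);

        // Keep the nodes alive long enough to inspect them with zkCli
        TimeUnit.MINUTES.sleep(15);

        // Every LeaderLatch must eventually be closed
        latchList.stream()
                .forEach(latch -> {
                    try {
                        latch.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
        client.close();
    }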
  • Querying with zkCli:
    [zk: localhost:2181(CONNECTED) 17] ls /
    [leader-lock1, leader-lock2, zookeeper, leader-lock]
    [zk: localhost:2181(CONNECTED) 18] ls /leader-lock2
    [_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]

LeaderLatch.start

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    /**
     * Add this instance to the leadership election and attempt to acquire leadership.
     *
     * @throws Exception errors
     */
    public void start() throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    internalStart();
                }
                finally
                {
                    startTask.set(null);
                }
            }
        }));
    }

    private synchronized void internalStart()
    {
        if ( state.get() == State.STARTED )
        {
            client.getConnectionStateListenable().addListener(listener);
            try
            {
                reset();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("An error occurred checking resetting leadership.", e);
            }
        }
    }

    @VisibleForTesting
    void reset() throws Exception
    {
        setLeadership(false);
        setNode(null);

        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }

                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
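  • start() enrolls this instance in the election: once the connection is established (AfterConnectionEstablished), internalStart() registers the connection-state listener and calls reset(), which creates the child node via forPath.
  • ZKPaths.makePath(latchPath, LOCK_NAME) returns /latchPath/latch-, i.e. /leader-lock2/latch- in the example above.
  • The background callback mainly drives getChildren(): on success it records the created node via setNode(event.getName()) and, unless the latch has been closed in the meantime, calls getChildren().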
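The `Preconditions.checkState(state.compareAndSet(...))` guard in start(), and its counterpart in close(), is what makes a LeaderLatch single-use. Below is a minimal sketch in plain Java that models only that state machine; the class name `LatchLifecycle` is hypothetical and nothing here talks to ZooKeeper. Like Guava's checkState, it throws IllegalStateException when a transition is not allowed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of LeaderLatch's lifecycle guard; not Curator's actual class.
class LatchLifecycle {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Mirrors checkState(state.compareAndSet(LATENT, STARTED), ...):
    // only the first caller wins, every later call fails.
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    // Mirrors close(): only a STARTED latch may be closed, and only once.
    void close() {
        if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
            throw new IllegalStateException("Already closed or has not been started");
        }
    }

    State state() {
        return state.get();
    }
}
```

Because there is no transition out of CLOSED, a closed latch cannot rejoin the election; a new LeaderLatch instance has to be created instead.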

CreateBuilderImpl.forPath

curator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java

    @VisibleForTesting
    static final String PROTECTED_PREFIX = "_c_";

    @Override
    public String forPath(final String givenPath, byte[] data) throws Exception
    {
        if ( compress )
        {
            data = client.getCompressionProvider().compress(givenPath, data);
        }

        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

        String returnPath = null;
        if ( backgrounding.inBackground() )
        {
            pathInBackground(adjustedPath, data, givenPath);
        }
        else
        {
            String path = protectedPathInForeground(adjustedPath, data, aclList);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }

    @VisibleForTesting
    String adjustPath(String path) throws Exception
    {
        if ( doProtected )
        {
            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
            String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
            path = ZKPaths.makePath(pathAndNode.getPath(), name);
        }
        return path;
    }

    private static String getProtectedPrefix(String protectedId)
    {
        return PROTECTED_PREFIX + protectedId + "-";
    }
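  • If the CuratorFramework was built without a namespace, client.fixForNamespace returns the path unchanged.
  • When protection is enabled (doProtected, set by withProtection()), adjustPath prepends PROTECTED_PREFIX, the protectedId (a UUID) and a "-" to the node name; for example, latch- becomes _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-.
  • Because the node is created as EPHEMERAL_SEQUENTIAL, ZooKeeper then appends a sequence number, giving for example /leader-lock2/_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045. The node's data is the id passed to the LeaderLatch (LeaderSelector.getIdBytes(id)).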
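To make the naming rules concrete, here is a small sketch that assembles the full path from its parts. `LatchNodeName.fullPath` is a hypothetical helper, not a Curator API: in reality the prefix is added by CreateBuilderImpl.adjustPath on the client, and the 10-digit, zero-padded counter is appended by the ZooKeeper server for sequential nodes.

```java
// Hypothetical helper reproducing the node-name layout; not part of Curator.
class LatchNodeName {
    static final String PROTECTED_PREFIX = "_c_";
    static final String LOCK_NAME = "latch-";

    // parent:      the latch path, e.g. "/leader-lock2"
    // protectedId: the UUID the client picks when withProtection() is used
    // sequence:    the counter ZooKeeper appends to *_SEQUENTIAL nodes (%010d)
    static String fullPath(String parent, String protectedId, long sequence) {
        String name = PROTECTED_PREFIX + protectedId + "-" + LOCK_NAME;
        return parent + "/" + name + String.format("%010d", sequence);
    }
}
```

The UUID part is what lets the client recognize its own node after a connection loss during create(); the sequence part is what the election orders by.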

LeaderLatch.getChildren

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    private void getChildren() throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

    private void checkLeadership(List<String> children) throws Exception
    {
        final String localOurPath = ourPath.get();
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if ( ourIndex < 0 )
        {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 )
        {
            setLeadership(true);
        }
        else
        {
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // previous node is gone - reset
                        reset();
                    }
                }
            };
            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
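  • The key step is checkLeadership(): the node at index 0 becomes the leader; a node with a larger index sets a watch on the node immediately before it, and when that predecessor is deleted, the watcher calls getChildren() again.
  • A background callback is also attached to that getData() call: if the predecessor is already gone by the time getData() runs (NONODE), the latch calls reset().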
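The decision checkLeadership() makes can be modeled as a pure function over the child names. The sketch below is a simplified, hypothetical model (class `LeadershipCheck`, string results instead of callbacks and watches). It assumes, as the recipe's sorter does, that children are ordered by the text after the last `latch-`, so the `_c_<uuid>-` prefix does not affect the order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of LeaderLatch.checkLeadership(); names are hypothetical.
class LeadershipCheck {
    static final String LOCK_NAME = "latch-";

    // Sort key: the text after the last "latch-", i.e. the zero-padded
    // sequence number, so the protected "_c_<uuid>-" prefix is ignored.
    static String sortKey(String child) {
        int idx = child.lastIndexOf(LOCK_NAME);
        return idx < 0 ? child : child.substring(idx + LOCK_NAME.length());
    }

    // Returns "LEADER" if ourNode sorts first, "RESET" if it is missing,
    // otherwise the name of the predecessor node that should be watched.
    static String decide(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LeadershipCheck::sortKey));
        int ourIndex = (ourNode == null) ? -1 : sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            return "RESET";
        }
        if (ourIndex == 0) {
            return "LEADER";
        }
        return sorted.get(ourIndex - 1);
    }
}
```

Applied to the children listed by zkCli earlier, the node ending in 0000000040 would be the leader, and the node ending in 0000000045 would watch the one ending in 0000000044.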

LeaderLatch.close

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    /**
     * Remove this instance from the leadership election. If this instance is the leader, leadership
     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
     * instances must eventually be closed.
     *
     * @throws IOException errors
     */
    @Override
    public void close() throws IOException
    {
        close(closeMode);
    }

    /**
     * Remove this instance from the leadership election. If this instance is the leader, leadership
     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
     * instances must eventually be closed.
     *
     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
     * @throws IOException errors
     */
    public synchronized void close(CloseMode closeMode) throws IOException
    {
        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
        Preconditions.checkNotNull(closeMode, "closeMode cannot be null");

        cancelStartTask();

        try
        {
            setNode(null);
            client.removeWatchers();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            throw new IOException(e);
        }
        finally
        {
            client.getConnectionStateListenable().removeListener(listener);

            switch ( closeMode )
            {
            case NOTIFY_LEADER:
            {
                setLeadership(false);
                listeners.clear();
                break;
            }

            default:
            {
                listeners.clear();
                setLeadership(false);
                break;
            }
            }
        }
    }

    private synchronized void setLeadership(boolean newValue)
    {
        boolean oldValue = hasLeadership.getAndSet(newValue);

        if ( oldValue && !newValue )
        { // Lost leadership, was true, now false
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener listener)
                    {
                        listener.notLeader();
                        return null;
                    }
                });
        }
        else if ( !oldValue && newValue )
        { // Gained leadership, was false, now true
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener input)
                    {
                        input.isLeader();
                        return null;
                    }
                });
        }

        notifyAll();
    }
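  • close() removes the latch from the election; if the latch is the leader, leadership is released.
  • close() first cancels the start task, then clears our node via setNode(null) (which deletes the previously created znode), removes the watchers and the ConnectionStateListener, and finally sets leadership to false, which may fire listeners.
  • Note the ordering: with CloseMode.NOTIFY_LEADER, leadership is set to false first (so listeners are notified) and the listeners are cleared afterwards; otherwise (the default, SILENT) the listeners are cleared first and leadership is set to false afterwards, so no listener is notified.
  • setLeadership() compares the old and new values and calls listener.notLeader() or input.isLeader() accordingly; it then calls notifyAll() to wake any threads blocked in await().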
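The ordering difference between the two close modes is easiest to see in isolation. Below is a minimal single-threaded model; `LeadershipNotifier` and its nested `Listener` interface are hypothetical stand-ins for LeaderLatch and LeaderLatchListener (the real class uses an AtomicBoolean, a ListenerContainer and notifyAll()).

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of setLeadership() and the two close modes; not Curator code.
class LeadershipNotifier {
    enum CloseMode { SILENT, NOTIFY_LEADER }

    interface Listener {
        void isLeader();
        void notLeader();
    }

    private final List<Listener> listeners = new ArrayList<>();
    private boolean hasLeadership;

    void addListener(Listener l) {
        listeners.add(l);
    }

    // Listeners fire only on an actual transition, as in LeaderLatch.setLeadership.
    void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership;
        hasLeadership = newValue;
        if (oldValue && !newValue) {
            listeners.forEach(Listener::notLeader);
        } else if (!oldValue && newValue) {
            listeners.forEach(Listener::isLeader);
        }
    }

    // NOTIFY_LEADER: drop leadership first (listeners see notLeader), then clear.
    // SILENT: clear listeners first, so dropping leadership notifies nobody.
    void close(CloseMode mode) {
        if (mode == CloseMode.NOTIFY_LEADER) {
            setLeadership(false);
            listeners.clear();
        } else {
            listeners.clear();
            setLeadership(false);
        }
    }

    boolean hasLeadership() {
        return hasLeadership;
    }
}
```

In other words, a leader closed with the default mode simply disappears from the election without telling its own listeners, so code that must release resources on losing leadership should either use NOTIFY_LEADER or do that cleanup itself before calling close().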

ConnectionStateListener

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

    private final ConnectionStateListener listener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            handleStateChange(newState);
        }
    };

    private void handleStateChange(ConnectionState newState)
    {
        switch ( newState )
        {
            default:
            {
                // NOP
                break;
            }

            case RECONNECTED:
            {
                try
                {
                    if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
                    {
                        reset();
                    }
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("Could not reset leader latch", e);
                    setLeadership(false);
                }
                break;
            }

            case SUSPENDED:
            {
                if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
                {
                    setLeadership(false);
                }
                break;
            }

            case LOST:
            {
                setLeadership(false);
                break;
            }
        }
    }
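  • LeaderLatch registers its own ConnectionStateListener and reacts to RECONNECTED, SUSPENDED and LOST.
  • setLeadership(false) notifies listeners based on the old and new values: if the latch was the leader, listener.notLeader() is called.
  • On RECONNECTED, reset() (which re-runs the node registration done by start) is called if the latch is not the leader, or if SUSPENDED counts as an error state under the client's ConnectionStateErrorPolicy. Under the default StandardConnectionStateErrorPolicy it does, so the latch gives up leadership on SUSPENDED and always re-registers on RECONNECTED.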
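handleStateChange() boils down to a small decision table. The sketch below restates it as a pure function; `StateChangeModel` is hypothetical, and the `suspendedIsError` flag stands in for `client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)`. The `ConnState` enum mirrors the values of Curator's ConnectionState.

```java
// Hypothetical decision table for LeaderLatch.handleStateChange(); not Curator code.
class StateChangeModel {
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    enum Action { NOTHING, RESET, DROP_LEADERSHIP }

    static Action onStateChange(ConnState s, boolean suspendedIsError, boolean hasLeadership) {
        switch (s) {
            case RECONNECTED:
                // Re-register unless SUSPENDED is tolerated and we still hold leadership.
                return (suspendedIsError || !hasLeadership) ? Action.RESET : Action.NOTHING;
            case SUSPENDED:
                // Only give up leadership if the policy treats SUSPENDED as an error.
                return suspendedIsError ? Action.DROP_LEADERSHIP : Action.NOTHING;
            case LOST:
                return Action.DROP_LEADERSHIP;
            default:
                return Action.NOTHING;
        }
    }
}
```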

Summary

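  • Curator's LeaderLatch recipe offers a convenient way to do leader election, plus LeaderLatchListener for custom handling.
  • LeaderLatch uses ZooKeeper EPHEMERAL_SEQUENTIAL nodes, so each node name automatically gets a sequence number; LOCK_NAME is latch-. In protected mode, PROTECTED_PREFIX (_c_) and the protectedId (a UUID) are prepended as well, so the final node name has the form PROTECTED_PREFIX + UUID + "-" + LOCK_NAME + sequence number, e.g. _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045.
  • LeaderLatch uses a ConnectionStateListener to react to connection changes and a watch on its predecessor node to react to membership changes. The node at index 0 is the leader; every other node watches the node just before it. When that node is deleted, checkLeadership runs again: if this node now sorts first among the children, it becomes the leader and fires the corresponding listeners; otherwise it watches its new predecessor. This chain of watches, each link hooked onto the one before it, is a neat piece of design.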
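The chained-watch design can be illustrated with a toy in-memory model. This is a sketch under simplifying assumptions (no ZooKeeper, no sessions, participants keyed directly by sequence number); `PredecessorChain` is a hypothetical name. Its point is that deleting a node wakes exactly one participant, the node's successor, rather than every waiting participant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of the "watch your predecessor" chain; no ZooKeeper involved.
class PredecessorChain {
    // sequence number -> participant id, kept sorted like the recipe's children
    private final TreeMap<Long, String> nodes = new TreeMap<>();
    // participants whose predecessor watch fired, in order
    final List<String> notified = new ArrayList<>();

    void join(long seq, String id) {
        nodes.put(seq, id);
    }

    String leader() {
        return nodes.isEmpty() ? null : nodes.firstEntry().getValue();
    }

    // Deleting a node fires only the watch held by its immediate successor,
    // which would then re-run getChildren() and checkLeadership().
    void delete(long seq) {
        if (nodes.remove(seq) == null) {
            return;
        }
        Long successor = nodes.higherKey(seq);
        if (successor != null) {
            notified.add(nodes.get(successor));
        }
    }
}
```

This is also why the recipe does not have every participant watch the leader's node: that would wake all waiting participants whenever the leader changes, even though at most one of them can take over.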

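This article is part of the Tencent Cloud self-media syndication program and was shared from a WeChat official account.
Originally published: 2018-10-11. For infringement concerns, contact cloudcommunity@tencent.com for removal.

This article was shared from the 码匠的流水账 WeChat official account.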
