前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >zookeeper源码分析(9)-Curator相关介绍

zookeeper源码分析(9)-Curator相关介绍

作者头像
Monica2333
发布2020-06-22 11:40:45
2K0
发布2020-06-22 11:40:45
举报

zookeeper常用的Java客户端有三种:zookeeper原生的、Apache Curator、开源的zkclient。Curator官网上这么说

image

一般生产环境我们使用curator,它主要解决了三类问题:

1.封装ZooKeeper client与ZooKeeper server之间的连接处理,当会话超时时可自动重连。

2.提供了一套流式风格的操作API

3.提供ZooKeeper各种分布式协调应用场景(recipe, 比如leader选举,分布式锁,分布式缓存等

)的抽象封装。

本文基于3.1.0版本结合curator的使用简要介绍curator的启动加载,会话管理通知方式recipe功能的实现。

curator的用法

curator组件如下:

其中curator-recipes是建立在Curator Framework之上实现的,提供了zookeeper分布式协调相关的技巧,大多时候我们只需要依赖这一个jar包即可。

启动加载

//1.初始化过程
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc
                .build();
//2.启动过程
client.start();

可分为初始化过程和启动过程。

初始化过程

通过以上builder模式即可创建一个CuratorFrameworkImpl客户端实例,初始化方法主要为:

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
//定义创建原生客户端实例zookeeper的工厂方法
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
//zookeeper的包装类,可处理curator较低层次的会话保持和同步请求等
        this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly(),
                builder.getConnectionHandlingPolicy()
            );
//用于判断连接断开和连接超时的状态,设置curator的连接状态,并通过connectionStateManager触发连接事件状态通知
        internalConnectionHandler = isClassic ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
//接收事件的通知。后台线程操作事件和连接状态事件会触发
        listeners = new ListenerContainer<CuratorListener>();
//当后台线程发生异常或者handler发生异常的时候会触发
        unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
//后台线程执行的操作队列
        backgroundOperations = new DelayQueue<OperationAndData<?>>();
//命名空间
        namespace = new NamespaceImpl(this, builder.getNamespace());
    //线程工厂方法,初始化后台线程池时会使用
        threadFactory = getThreadFactory(builder);
//负责连接状态变化时的通知
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
//CuratorFrameworkImpl的状态,调用start方法之前为 LATENT,调用start方法之后为 STARTED ,调用close()方法之后为STOPPED
        state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
//错误连接策略
        connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
//有保障的执行删除操作,其实是不断尝试直到删除成功,通过递归调用实现
        failedDeleteManager = new FailedDeleteManager(this);
//有保障的执行删除watch操作
        failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
//服务端可用节点的检测器,第一次连接和重连成功之后都会触发重新获取服务端列表
        ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
    }

可以看出,主要初始化了zookeeper客户端包装实例CuratorZookeeperClient,与后台操作,连接事件,异常相关的listener容器,命名空间和负载均衡等。这些都是与curator 功能密切相关的实现。这里具体看下CuratorZookeeperClientConnectionStateManager的初始化过程。

CuratorZookeeperClient初始化

public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
//3.0.0版本后默认为StandardInternalConnectionHandler,之前session Expired是由服务端告知才会触发Expired事件,
//StandardConnectionHandler当收到Disconnect事件后,如果在规定时间内没有重连到服务器,则会主动触发Expired事件
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if ( sessionTimeoutMs < connectionTimeoutMs )
        {
            log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
        }
//重新尝试连接的策略
        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
        this.connectionTimeoutMs = connectionTimeoutMs;
//curator注册到原生客户端上的defaultWatcher,会收到和连接状态有关的事件通知等,负责超时重连
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
        setRetryPolicy(retryPolicy);
    }

可以看出他主要负责了连接的创建和保证连接正常,此外如果直接同步调用客户端与服务端操作,他也根据retryPolicy负责同步操作时候的连接保证。ConnectionState是注册到原生客户端上的defaultWatcher.

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if ( parentWatcher != null )
        {
//因为defaultWatcher只能有一个,通过parentWatchers可实现defaultWatcher接到事件通知时parentWatchers的回调
            parentWatchers.offer(parentWatcher);
        }

        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }

ConnectionStateManager初始化

它主要负责curator相关连接状态的处理和通知,如果我们想要监听连接状态的改变,就需要向它的listeners上注册一个ConnectionStateListener

//连接状态事件通知队列
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
//需要通知的listeners
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
//ConnectionStateManager的运行状态
 private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
    {
        this.client = client;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.sessionExpirationPercent = sessionExpirationPercent;
        if ( threadFactory == null )
        {
            threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
        }
//事件队列处理线程池
        service = Executors.newSingleThreadExecutor(threadFactory);
    }

启动过程

建立与服务端的会话连接和相关功能的启动

CuratorFrameworkImpl.start

public void start()
    {
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
        {
            throw new IllegalStateException("Cannot be started more than once");
        }

        try
        {
//启动connectionStateManager
            connectionStateManager.start(); // ordering dependency - must be called before client.start()

            final ConnectionStateListener listener = new ConnectionStateListener()
            {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState)
                {
                    if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
                    {
                        logAsErrorConnectionErrors.set(true);
                    }
                }
            };

            this.getConnectionStateListenable().addListener(listener);
//建立与服务端的连接
            client.start();

            executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    backgroundOperationsLoop();
                    return null;
                }
            });

            ensembleTracker.start();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }

可以看出分别启动ConnectionStateManager和CuratorZookeeperClient。启动ConnectionStateManager可做好连接事件通知的准备,启动CuratorZookeeperClient建立与服务端的会话连接。

会话管理

curator的会话管理是在原生客户端的会话管理基础上包装而来,在上面的启动过程中我们介绍到ConnectionState会负责超时的重连,ConnectionStateManager会负责连接状态的改变和通知, connectionHandlingPolicy会负责连接超时的主动触发。此外,在客户端执行一些操作时如果感知到连接断开,也可以主动进行连接重连。下面会介绍下curator如何在原生客户端的会话管理基础上进行会话状态的通知会话超时的重连

我们知道会话连接状态相关的事件类型为Watcher.Event.EventType.None,会通知到客户端上所有的Watcher,ConnectionState作为defaultWatcher,它的事件回调如下:

 public void process(WatchedEvent event)
    {
        if ( LOG_EVENTS )
        {
            log.debug("ConnectState watcher: " + event);
        }

        if ( event.getType() == Watcher.Event.EventType.None )
        {
          //isConnected:客户当前的连接状态,true表示已连接(SyncConnected和ConnectedReadOnly状态)
            boolean wasConnected = isConnected.get();
            boolean newIsConnected = checkState(event.getState(), wasConnected);
            if ( newIsConnected != wasConnected )
            {
//如果连接状态发生改变,则更新
                isConnected.set(newIsConnected);
                connectionStartMs = System.currentTimeMillis();
                if ( newIsConnected )
                {
//说明是重连,更新会话超时协商时间
                    lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
                    log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
                }
            }
        }
//通知parentWatchers,注意初始化的时候其实传入了一个parentWatcher,会调用CuratorFrameworkImpl.processEvent
        for ( Watcher parentWatcher : parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }
    }

可以看到,对连接状态事件的处理主要是checkState方法

private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
        default:
        case Disconnected:
        {
            isConnected = false;
            break;
        }

        case SyncConnected:
        case ConnectedReadOnly:
        {
            isConnected = true;
            break;
        }

        case AuthFailed:
        {
            isConnected = false;
            log.error("Authentication failed");
            break;
        }

        case Expired:
        {
            isConnected = false;
            checkNewConnectionString = false;
            handleExpiredSession();
            break;
        }

        case SaslAuthenticated:
        {
            // NOP
            break;
        }
        }

        if ( checkNewConnectionString )
        {
//如果服务端列表发生变化,则更新
            String newConnectionString = zooKeeper.getNewConnectionString();
            if ( newConnectionString != null )
            {
                handleNewConnectionString(newConnectionString);
            }
        }

        return isConnected;
    }

可以看到会根据不同的会话状态判断连接是否正常,isConnected = true表示正常。当会话超时过期Expired时,会调用handleExpiredSession进行reset操作,也就是连接的关闭和重新建立新的会话连接。即会话超时的被动重连。在连接过程中,会根据客户端设置的连接重试机制retryPolicy检测重连是否超时。

  1. parentWatchers的回调 其实在CuratorFramework client初始化时,会初始化一个watcher添加到ConnectionStateparentWatcher中,负责连接状态改变时的会话状态改变。
//初始化的parentWatcher
new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                }

会话状态改变时调用上面watcher的process方法,调用至CuratorFrameworkImpl.processEvent

private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
//状态转换
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }
//通知所有注册的CuratorListener 
        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    TimeTrace trace = client.startTracer("EventListener");
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }

其中,validateConnection负责连接状态的转换

void validateConnection(Watcher.Event.KeeperState state)
    {
        if ( state == Watcher.Event.KeeperState.Disconnected )
        {
            internalConnectionHandler.suspendConnection(this);
        }
        else if ( state == Watcher.Event.KeeperState.Expired )
        {
            connectionStateManager.addStateChange(ConnectionState.LOST);
        }
        else if ( state == Watcher.Event.KeeperState.SyncConnected )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
        }
        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
        }
    }

可以看出ConnectionStateManager负责curator连接状态的更新。原生客户端的连接状态和curator包装的连接状态对应关系如下:

同时,ConnectionStateManager会将当前状态ConnectionState放入自身的事件队列中,通知所有注册到自身listenersConnectionStateListener

此外,当相关操作(包括同步和后台线程的操作,如getData)发现连接断开了,也会调用client.getZooKeeper()重连,(注意底层建立客户端连接是加锁的,保证一个客户端只有一个线程可以创建会话成功)。如CuratorFrameworkImpl的后台线程任务:

 void performBackgroundOperation(OperationAndData<?> operationAndData)
    {
        try
        {
            if ( !operationAndData.isConnectionRequired() || client.isConnected() )
            {
                operationAndData.callPerformBackgroundOperation();
            }
            else
            {
                client.getZooKeeper();  // important - allow connection resets, timeouts, etc. to occur
                if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
                {
                    throw new CuratorConnectionLossException();
                }
                operationAndData.sleepFor(1, TimeUnit.SECONDS);
                queueOperation(operationAndData);
            }
        }
        catch ( Throwable e )
        {
            ThreadUtils.checkInterrupted(e);

            /**
             * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
             * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
             * and callbacks need to get invoked, etc.
             */
            if ( e instanceof CuratorConnectionLossException )
            {
                WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
                CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
                if ( checkBackgroundRetry(operationAndData, event) )
                {
                    queueOperation(operationAndData);
                }
                else
                {
                    logError("Background retry gave up", e);
                }
            }
            else
            {
                handleBackgroundOperationException(operationAndData, e);
            }
        }
    }

通知机制

通知机制其实就是在事件发生的地方触发已经注册好的listerner相应的回调函数(观察者模式)。CuratorFrameworkImpl client可注册listener的方式有:

  • 一次性watch
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground().forPath(path);

同原生客户端的watch,只能生效一次,需要反复注册。

  • 注册CuratorListener
// this is one method of getting event/async notifications
        CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                // examine event for details
            }
        };
        client.getCuratorListenable().addListener(listener);

其实是将listener添加到CuratorFrameworkImpl.listeners中。当后台线程完成操作会触发相应的事件通知该listener,如异步创建路径会触发CuratorEventType.CREATE事件。此外当连接状态事件触发时,parentWatcher也会回调这些listeners.

  • 注册ConnectionStateListener
ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
          //Some details
        }
    };
client.getConnectionStateListenable().addListener(connectionStateListener);

其实是将connectionStateListener添加到connectionStateManager.listeners中,在连接状态发生改变时,会收到通知。

  • 注册UnhandledErrorListener
UnhandledErrorListener unhandledErrorListener = new UnhandledErrorListener() {
            @Override
            public void unhandledError(String message, Throwable e) {
                //
            }
        };
        client.getUnhandledErrorListenable().addListener(unhandledErrorListener);

其实是将unhandledErrorListener添加到CuratorFrameworkImpl.unhandledErrorListeners中,当后台线程操作发生异常或者handler发生异常的时候会触发,收到通知。

  • 后台线程操作完成时的回调
    public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
        // this is another method of getting notification of an async completion
        client.setData().inBackground(callback).forPath(path, payload);
    }

对于不同的操作,如setData,可通过链式调用inBackground(callback)传入回调函数callback,这样当操作完成后,会执行回调函数。

  • 缓存机制,多次注册 curator提供了三种类型的缓存方式:Path Cache,Node Cache 和Tree Cache。相当于和服务端的节点数据进行对比,没当数据不一致时,会通过watch机制触发,后台回调更新本地缓存数据,同时再次注册相应的watch。同时,每次重连成功之后也会重新注册watch,保证了watch不丢失。

结合上面会话管理和通知机制的介绍,可以知道原生客户端的watcher是同步通知的,当然可以在特定watcher中的处理做异步。connectionStateManager.listeners是由内部的线程池做异步通知的,CuratorFrameworkImpl.listeners对于连接状态的通知是与watcher通知线程同步,由后台线程通知时为异步。如果客户端watcher注册过多,那么可能就会导致重连之后watch丢失(重连会清空sendThread的发送和接收队列,可能会导致watch丢失),甚至重连不成功(本文分析的版本3.1.0中只要调用client.getZooKeeper()就会重连,和当时的连接状态无关。所以我觉得看版本,如参考资料二)。

recipes功能

curator实现的recipes功能主要有:分布式锁Leader选举Barriers计数器缓存队列事务等。对于队列和事务,我们可以使用其他中间件,如kafka,TCC-Transaction等解决,这里不做介绍。

分布式锁

类似于Java的j.u.c包中的锁,recipes提供了分布式协调下(不同JVM)的互斥锁(可重入/不可重入),可重入读写锁,信号量和多锁对象。

  • 可重入互斥锁 由InterProcessMutex类实现。基于临时顺序节点实现,获取锁规则为: 1.当前存在的创建时间最早的临时节点获得锁权限,再次进入可重新获得锁,内部维持了一个ConcurrentMap<Thread, LockData> threadData记录锁的重入次数。 2.不是第一个节点的每个临时顺序节点都在前一个临时顺序节点上注册节点watch 当前一个节点被删除时,后一个节点会重新根据锁规则竞争锁。
  • 不可重入互斥锁 由InterProcessSemaphoreMutex实现,相比InterProcessMutex只是少了重入功能,实现原理是在互斥锁InterProcessMutex的基础上构造一个租约,由InterProcessSemaphoreV2类实现(下文的信号量)。每次只允许客户端获得一个租约,重入InterProcessSemaphoreMutex这个锁就因为租约不够而无法获得。
  • 可重入读写锁 类似JDK的ReentrantReadWriteLock,读写锁拥有一个可重入读锁和可重入写锁。读读操作不互斥,涉及写操作和其他读写操作都互斥,写锁可降级为读锁。在使用写锁时,应该先获取读锁,再释放写锁。由InterProcessReadWriteLock类和InternalInterProcessMutex类实现。 InterProcessReadWriteLock类封装了两个由InternalInterProcessMutex类实现的读锁readMutex和写锁writeMutex。它们是在可重入互斥锁InterProcessMutex的基础上创建的。 客户端在获取读锁或写锁时均在相同的父路径下创建临时顺序节点。获取读锁时,如果是写锁线程或前面节点没有写锁临时节点,可直接获取读锁。否则读锁客户端watch在前面节点最小的写锁上面,直到前面没有写锁节点时获取锁。获取写锁时,如果前面没有临时顺序节点,则直接获得,否则写锁客户端watch在前面一个临时顺序节点上,直到成为第一个临时顺序节点时获取到写锁。
  • 信号量 由InterProcessSemaphoreV2实现,可指定租约数。在获取租约时,先获取由其维护的互斥锁,如果租约数量足够(也就是与租约有关的临时顺序节点数目不到租约数),可获得租约,即获取信号量,然后释放互斥锁进行资源的操作,可保证其他线程可继续获取互斥锁,然后获取剩余的租约。同理,归还租约时,只需删除与租约有关的临时顺序节点即可。
  • 多锁对象 由InterProcessMultiLock实现,内部维护了一个互斥锁(可为重入或非重入)列表locks,只有同时获得列表中所有的互斥锁时,才保证了获取到了这个多锁对象。释放多锁对象时同样需要释放列表中的每个互斥锁。

需要注意的是,这些锁内部没有使用ConnectionStateListener监听连接状态,也就是说当拥有锁的客户端会话过期服务端删掉其临时节点了,拥有锁的客户端并不能及时感知到这种变化。

Leader选举

分布式场景下,通常选出一个leader负责任务的分派,数据的写入等。此外,当leader意外宕机,新的leader要被选举出来。recipes提供了两种选举方式:Leader latchLeaderSelector

Leader latch

客户端分别在相同的path下创建临时顺序节点,选举规则为:

1.当前存在的创建时间最早的临时节点获得leader权限。

2.不是第一个节点的每个临时顺序节点都在前一个临时顺序节点上注册节点watch

当前一个节点被删除时,后一个节点会重新根据选举规则进行选举。

此外,每个客户端都会注册一个ConnectionStateListener监听连接状态。当连接异常时,会根据当前连接策略决定是否释放leader权限。重新连接上之后如果原来的leader丧失了权限,会删除原来节点并创建新的节点,重新参与选举。

LeaderSelector

基于互斥锁InterProcessMutex实现的,获取锁即拥有leader权限,用完了会释放锁。如果连接异常,会中断任务执行线程。与LeaderLatch相比, 通过LeaderSelectorListener可以对领导权进行控制, 用完了就释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch只有主动调用close方法才会释放领导权。

Barriers

分布式栅栏会使等待在相同节点路径上一批线程阻塞,直到某个条件满足时,才允许他们继续运行。类似于j.u.c包中的CountDownlatch与CyclicBarrier,分为DistributedBarrierDistributedDoubleBarrier两类Barriers。前者线程间可使用一次等待,后者可重复使用。

DistributedBarrier

1.调用setBarrier创建barrierPath持久节点

2.等待线程调用waitOnBarrier分别在相同的barrierPath下注册节点watch,然后阻塞

3.当调用释放栅栏的removeBarrier方法时,会删掉持久节点,等待线程继续运行

DistributedDoubleBarrier

允许一组固定数量的分布式进程,相互等待。直到最后一个进程到达,才允许所有进程继续运行。同时离开的时候,也需相互等待,直到最后一个进程要离开,才允许所有进程继续运行。

在分析之前先知道下文中watch的作用:

在节点上注册的watch主要做的事情是:1.当节点被删除或创建时,唤醒注册watch的线程。2。当当前线程所在客户端连接断开时,唤醒注册watch的线程。

1.调用enter方法等待线程在barrierPath下创建临时节点并在barrierPath/ready节点下注册节点watch,如果此时临时节点数未达到等待线程的数目,则阻塞。否则创建barrierPath/ready节点,并放行。

2.调用leave时,将这一批等待线程按照临时节点名字排序,最小节点线程不断在最大节点上面注册watch,而其他节点线程都在最小节点上面注册watch,其他节点会主动删除,然后阻塞直到只剩下最小节点的时候最小节点也删除。此时当所有节点都删掉了,所有线程可跳出无限循环,继续运行。

计数器

Curator有两个计数器, SharedCount是用int来计数,DistributedAtomicLong是用long来计数。

SharedCount

SharedCount管理一个相同path下的共享int值,start的时候在该path节点下注册watch,能够感知到节点数据的变化,并更新本地缓存的数据值和版本值。如果想改变共享值,则通过本地版本号+最新值 去更新节点,如果版本过期,则更新失败并更新本地数据,否则更新成功。类似乐观锁。

此外,可以为它增加一个SharedCountListener,可以接收到共享值和连接状态的改变事件。

DistributedAtomicLong

DistributedAtomicLong管理一个相同path下的共享long值,调用trySet修改时,首先会采用乐观锁的方式(版本+最新值)进行修改,按照尝试策略retryPolicy修改直到成功。如果尝试策略次数用完仍然失败,如果允许使用悲观锁方式(初始化promotedToLock!=null,会初始化一个互斥锁)修改,则会获取互斥锁,然后再修改。

缓存

缓存是客户端对服务端的数据的缓存,如果服务端数据发生变化,通过watch机制对客户端通知,更新缓存数据并重新watch。可分为Path Cache、Node Cache和Tree Cache。

我们知道zookeeper原生的watch是一次性的,每次触发之后服务端和客户端都会清理掉。watch分为三种:node watch,path watch和default watch。

node watch:当监控路径下的节点数据变化,节点被创建和删除时触发,通过调用existsgetData方法可注册该类watch。

path watch:当监控路径下的节点被删除,新增或删除子节点时触发,通过调用getChildren方法可注册该类watch。

default watch:每当连接状态发生改变时,都会触发。通过初始化zookeeper的时候注册。同时也可以作为node watch和path watch传入existsgetDatagetChildren方法中,此时对于监控路径是一次性的。

Path Cache

是对节点路径下子节点的新增,修改和删除的监控。当一个子节点增加, 更新,删除时, Path Cache会改变它的数据和状态。

调用start方法连接异常时重连成功(通过启动时注册connectionStateListener),每次收到pathwatch的回调时都重新注册path watch达到始终监控子节点新增和删除的效果。同时每次收到watch时,都会获得子节点的所有信息,更新缓存数据。如果是新增子节点,则注册node watch,达到监控子节点数据更新的效果,从而触发对应的子节点增加, 更新,删除事件,并通知注册的PathChildrenCacheListener

Node Cache

NodeCache是对一个节点的监控。当节点数据内容修改或者删除节点时,都会触发本地缓存的更新。

node cache在调用start方法连接异常时重连成功(通过启动时注册connectionStateListener),每次收到node watch的回调时都重新注册node watch达到始终监控节点的效果。此外,每次注册watch时是后台线程发起的,会在他的响应数据上注册一个回调函数,负责获得最新节点数据,当节点数据与之前的本地缓存不一样时,触发ListenerContainer<NodeCacheListener> listeners的回调,同时使注册的NodeCacheListener感知到。

Tree Cache

Tree Cache是对监控路径下所有节点(一棵树)的新增,修改和删除的监控。

首先对于树上每个节点会把它当成一个TreeNode,并在节点上注册node watch 和 path watch。当调用start方法,连接异常时重连成功(通过启动时注册connectionStateListener),每次收到节点上对应 watch的回调时都会根据条件重新构造相应TreeNode,注册对应的node watch 和 path watch。同时树上的节点变化会通知到注册的TreeCacheListener。

参考资料:

ZooKeeper的Java客户端使用

跟着实例学习ZooKeeper的用法: 文章汇总

Zookeeper Client架构分析——ZK链接重连失败排查

http://zookeeper.apache.org/doc/r3.4.9/recipes.html#sc_recipes_Locks

感谢您的阅读,我是Monica23334 || Monica2333 。立下每周写一篇原创文章flag的小姐姐,关注我并期待打脸吧~

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • curator的用法
    • 启动加载
      • 会话管理
        • 通知机制
          • recipes功能
            • 分布式锁
            • Leader选举
            • Barriers
            • 计数器
            • 缓存
        相关产品与服务
        负载均衡
        负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档