首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Zookeeper源码分析之单机模式服务端启动

Zookeeper源码分析之单机模式服务端启动

作者头像
老周聊架构
发布2025-11-20 10:49:41
发布2025-11-20 10:49:41
10
举报

一、源码环境搭建

1、将准备好的zookeeper-release-3.5.4导入IDEA中

2、启动服务端

运行主类org.apache.zookeeper.server.ZooKeeperServerMain,将zoo.cfg的完整路径配置在Program arguments

在这里插入图片描述
在这里插入图片描述

VM options配置,即指定到conf目录下的log4j.properties

代码语言:javascript
复制
-Dlog4j.configuration=file:/Users/Riemann/Code/framework-source-code-analysis/zookeeper-release-3.5.4/conf/log4j.properties

3、启动客户端

通过运行ZooKeeperServerMain得到的日志,可以得知ZooKeeper服务端已经启动,服务的地址为127.0.0.1:2181。启动客户端来连接测试。

客户端的启动类为org.apache.zookeeper.ZooKeeperMain,进行如下配置:

在这里插入图片描述
在这里插入图片描述

即客户端连接127.0.0.1:2181,获取节点/riemann的信息。

二、单机模式服务端启动

1、执行过程概述

单机模式的ZK服务端逻辑写在ZooKeeperServerMain类中,由里面的main函数启动,整个过程可以分为如下几步:

第一步,配置解析:解析配置(可以是指定配置文件路径也可以由启动参数设置),比如快照文件,日志文件保存路径,监听端口等等。

第二步,启动IO监听线程:以NIO为例,ZK构建了一套IO模型,一个acceptThread,通过CPU个数计算出来的selectorThread以及一个worker线程池。 其中acceptThread收到连接以后按照轮询策略交给selectorThread处理,selectorThread读取完数据以后交给worker线程池进行处理。 注:在ZK状态没有修改为RUNNING之前,IO线程虽然启动监听但不会真正接收请求。

第三步,加载数据:我们知道ZK会定期把数据dump到磁盘,因此每次启动时第一步中配置的文件路径去读取数据文件,如果存在的话就加载配置,这样就可以用于数据恢复。

第四步,构建处理链:单机模式下,ZK的请求处理链为PrepRequestProcessor->SyncRequestProcessor->FinalRequestProcessor。它们的职责如下:PrepRequestProcessor处理器用于构造请求对象,校验session合法性等。SyncRequestProcessor处理器用于向磁盘中写入事务日志和快照信息。FinalRequestProcessor处理器用于修改ZK内存中的数据结构并触发watcher

第五步,启动服务:修改服务端运行状态,标识服务正式启动,IO线程开始接受请求。

2、单机版服务器的启动流程图如下

在这里插入图片描述
在这里插入图片描述

3、服务端启动过程

单机模式的委托启动类为:ZooKeeperServerMain,看里面的main函数代码。

代码语言:javascript
复制
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        // 解析配置启动zk
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
代码语言:javascript
复制
// 解析单机模式的配置对象,并启动单机模式
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    try {

        // 注册jmx
        // JMX的全称为Java Management Extensions.是管理Java的一种扩展。
        // 这种机制可以方便的管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    // 创建服务配置对象
    ServerConfig config = new ServerConfig();

    // 如果入参只有一个,则认为是配置文件的路径
    if (args.length == 1) {
        // 解析配置文件
        config.parse(args[0]);
    } else {
        // 参数有多个,解析参数
        config.parse(args);
    }

    // 根据配置运行服务
    runFromConfig(config);
}

初始化配置信息:

代码语言:javascript
复制
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        thrownew UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }

    configureSaslLogin();

    maxClientCnxns = maxcc;

    // 会话超时时间
    sessionlessCnxnTimeout = Integer.getInteger(
        ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.

    // 过期队列
    cnxnExpiryQueue =
        new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    // 根据CPU个数计算selector线程的数量
    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores/2), 1));
    if (numSelectorThreads < 1) {
        thrownew IOException("numSelectorThreads must be at least 1");
    }

    // 计算woker线程的数量
    numWorkerThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);

    // worker线程关闭时间
    workerShutdownTimeoutMS = Long.getLong(
        ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    LOG.info("Configuring NIO connection handler with "
             + (sessionlessCnxnTimeout/1000) + "s sessionless connection"
             + " timeout, " + numSelectorThreads + " selector thread(s), "
             + (numWorkerThreads > 0 ? numWorkerThreads : "no")
             + " worker threads, and "
             + (directBufferBytes == 0 ? "gathered writes." :
                ("" + (directBufferBytes/1024) + " kB direct buffers.")));

    // 初始化selector线程
    for(int i=0; i<numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    // 初始化accept线程,这里看出accept线程只有一个,里面会注册监听ACCEPT事件
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

启动服务逻辑:

代码语言:javascript
复制
public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
    startup(zkServer, true);
}
代码语言:javascript
复制
public abstract void startup(ZooKeeperServer zkServer, boolean startServer) throws IOException, InterruptedException;
代码语言:javascript
复制
// 启动分了好几块,一个一个看
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    // 启动相关线程
    start();
    setZooKeeperServer(zks);

    // 启动服务
    if (startServer) {
        // 加载数据到zkDataBase
        zks.startdata();
        // 启动定时清除session的管理器,注册jmx,添加请求处理器
        zks.startup();
    }
}
代码语言:javascript
复制
@Override
public void start() {
    stopped = false;
    // 初始化worker线程池
    if (workerPool == null) {
        workerPool = new WorkerService(
            "NIOWorker", numWorkerThreads, false);
    }

    // 挨个启动Selector线程(处理客户端请求线程),
    for(SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    // 启动acceptThread线程(处理接收连接进行事件)
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }

    // ExpirerThread(处理过期连接)
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
代码语言:javascript
复制
// 初始化数据结构
public void startdata()
throws IOException, InterruptedException {
    // 初始化ZKDatabase,该数据结构用来保存ZK上面存储的所有数据
    // check to see if zkDb is not null
    if (zkDb == null) {
        // 初始化数据数据,这里会加入一些原始节点,例如/zookeeper
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    // 加载磁盘上已经存储的数据,如果有的话
    if (!zkDb.isInitialized()) {
        loadData();
    }
}
代码语言:javascript
复制
public synchronized void startup() {
     // 初始化session追踪器
     if (sessionTracker == null) {
         createSessionTracker();
     }
     // 启动session追踪器
     startSessionTracker();

     // 建立请求处理链路
     setupRequestProcessors();

     // 注册jmx
     registerJMX();

     setState(State.RUNNING);
     notifyAll();
 }
代码语言:javascript
复制
/**
 * 这里可以看出,单机模式下请求的处理链路为:
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
 * 
 * PrepRequestProcessor主要内容:对请求进行区分是否是事务请求,如果是事务请求则创建出事务请求头,
 * 同时执行一些检查操作,对于增删改等影响数据状态的操作都被认为是事务,需要创建出事务请求头。
 *
 * SyncRequestProcessor处理器:主要对事务请求进行日志记录,同时事务请求达到一定次数后,就会执行一次快照。
 *
 * FinalRequestProcessor处理器:作为处理器链上的最后一个处理器,负责执行请求的具体任务,前面几个处理器都是辅助操作,
 *     PrepRequestProcessor为请求添加事务请求头和执行一些检查工作,
 *     SyncRequestProcessor也仅仅是把该请求记录下来保存到事务日志中。
 *     该请求的具体内容,如获取所有的子节点,创建node的这些具体的操作就是由FinalRequestProcessor来完成的。
 */
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

欢迎大家关注我的公众号【老周聊架构】,AI、大数据、云原生、物联网等相关领域的技术知识分享。、

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-04-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 老周聊架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、源码环境搭建
  • 二、单机模式服务端启动
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档