1、将准备好的zookeeper-release-3.5.4导入IDEA中
2、启动服务端
运行主类org.apache.zookeeper.server.ZooKeeperServerMain,将zoo.cfg的完整路径配置在Program arguments。

在VM options配置,即指定到conf目录下的log4j.properties:
-Dlog4j.configuration=file:/Users/Riemann/Code/framework-source-code-analysis/zookeeper-release-3.5.4/conf/log4j.properties
3、启动客户端
通过运行ZooKeeperServerMain得到的日志,可以得知ZooKeeper服务端已经启动,服务的地址为127.0.0.1:2181。启动客户端来连接测试。
客户端的启动类为org.apache.zookeeper.ZooKeeperMain,进行如下配置:

即客户端连接127.0.0.1:2181,获取节点/riemann的信息。
1、执行过程概述
单机模式的ZK服务端逻辑写在ZooKeeperServerMain类中,由里面的main函数启动,整个过程可以分为如下几步:
第一步,配置解析:解析配置(可以是指定配置文件路径也可以由启动参数设置),比如快照文件,日志文件保存路径,监听端口等等。
第二步,启动IO监听线程:以NIO为例,ZK构建了一套IO模型,一个acceptThread,通过CPU个数计算出来的selectorThread以及一个worker线程池。 其中acceptThread收到连接以后按照轮询策略交给selectorThread处理,selectorThread读取完数据以后交给worker线程池进行处理。 注:在ZK状态没有修改为RUNNING之前,IO线程虽然启动监听但不会真正接收请求。
第三步,加载数据:我们知道ZK会定期把数据dump到磁盘,因此每次启动时第一步中配置的文件路径去读取数据文件,如果存在的话就加载配置,这样就可以用于数据恢复。
第四步,构建处理链:单机模式下,ZK的请求处理链为PrepRequestProcessor->SyncRequestProcessor->FinalRequestProcessor。它们的职责如下:PrepRequestProcessor处理器用于构造请求对象,校验session合法性等。SyncRequestProcessor处理器用于向磁盘中写入事务日志和快照信息。FinalRequestProcessor处理器用于修改ZK内存中的数据结构并触发watcher。
第五步,启动服务:修改服务端运行状态,标识服务正式启动,IO线程开始接受请求。
2、单机版服务器的启动流程图如下

3、服务端启动过程
单机模式的委托启动类为:ZooKeeperServerMain,看里面的main函数代码。
public static void main(String[] args) {
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
// 解析配置启动zk
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
LOG.info(USAGE);
System.err.println(USAGE);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.err.println("Invalid config, exiting abnormally");
System.exit(2);
} catch (DatadirException e) {
LOG.error("Unable to access datadir, exiting abnormally", e);
System.err.println("Unable to access datadir, exiting abnormally");
System.exit(3);
} catch (AdminServerException e) {
LOG.error("Unable to start AdminServer, exiting abnormally", e);
System.err.println("Unable to start AdminServer, exiting abnormally");
System.exit(4);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
// 解析单机模式的配置对象,并启动单机模式
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
try {
// 注册jmx
// JMX的全称为Java Management Extensions.是管理Java的一种扩展。
// 这种机制可以方便的管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
// 创建服务配置对象
ServerConfig config = new ServerConfig();
// 如果入参只有一个,则认为是配置文件的路径
if (args.length == 1) {
// 解析配置文件
config.parse(args[0]);
} else {
// 参数有多个,解析参数
config.parse(args);
}
// 根据配置运行服务
runFromConfig(config);
}
初始化配置信息:
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
if (secure) {
thrownew UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
}
configureSaslLogin();
maxClientCnxns = maxcc;
// 会话超时时间
sessionlessCnxnTimeout = Integer.getInteger(
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
// 过期队列
cnxnExpiryQueue =
new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();
// 根据CPU个数计算selector线程的数量
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores/2), 1));
if (numSelectorThreads < 1) {
thrownew IOException("numSelectorThreads must be at least 1");
}
// 计算woker线程的数量
numWorkerThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
// worker线程关闭时间
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
LOG.info("Configuring NIO connection handler with "
+ (sessionlessCnxnTimeout/1000) + "s sessionless connection"
+ " timeout, " + numSelectorThreads + " selector thread(s), "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." :
("" + (directBufferBytes/1024) + " kB direct buffers.")));
// 初始化selector线程
for(int i=0; i<numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
// 初始化accept线程,这里看出accept线程只有一个,里面会注册监听ACCEPT事件
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
启动服务逻辑:
public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
startup(zkServer, true);
}
public abstract void startup(ZooKeeperServer zkServer, boolean startServer) throws IOException, InterruptedException;
// 启动分了好几块,一个一个看
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
throws IOException, InterruptedException {
// 启动相关线程
start();
setZooKeeperServer(zks);
// 启动服务
if (startServer) {
// 加载数据到zkDataBase
zks.startdata();
// 启动定时清除session的管理器,注册jmx,添加请求处理器
zks.startup();
}
}
@Override
public void start() {
stopped = false;
// 初始化worker线程池
if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
// 挨个启动Selector线程(处理客户端请求线程),
for(SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
// 启动acceptThread线程(处理接收连接进行事件)
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
// ExpirerThread(处理过期连接)
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
// 初始化数据结构
public void startdata()
throws IOException, InterruptedException {
// 初始化ZKDatabase,该数据结构用来保存ZK上面存储的所有数据
// check to see if zkDb is not null
if (zkDb == null) {
// 初始化数据数据,这里会加入一些原始节点,例如/zookeeper
zkDb = new ZKDatabase(this.txnLogFactory);
}
// 加载磁盘上已经存储的数据,如果有的话
if (!zkDb.isInitialized()) {
loadData();
}
}
public synchronized void startup() {
// 初始化session追踪器
if (sessionTracker == null) {
createSessionTracker();
}
// 启动session追踪器
startSessionTracker();
// 建立请求处理链路
setupRequestProcessors();
// 注册jmx
registerJMX();
setState(State.RUNNING);
notifyAll();
}
/**
* 这里可以看出,单机模式下请求的处理链路为:
* PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
*
* PrepRequestProcessor主要内容:对请求进行区分是否是事务请求,如果是事务请求则创建出事务请求头,
* 同时执行一些检查操作,对于增删改等影响数据状态的操作都被认为是事务,需要创建出事务请求头。
*
* SyncRequestProcessor处理器:主要对事务请求进行日志记录,同时事务请求达到一定次数后,就会执行一次快照。
*
* FinalRequestProcessor处理器:作为处理器链上的最后一个处理器,负责执行请求的具体任务,前面几个处理器都是辅助操作,
* PrepRequestProcessor为请求添加事务请求头和执行一些检查工作,
* SyncRequestProcessor也仅仅是把该请求记录下来保存到事务日志中。
* 该请求的具体内容,如获取所有的子节点,创建node的这些具体的操作就是由FinalRequestProcessor来完成的。
*/
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
欢迎大家关注我的公众号【老周聊架构】,AI、大数据、云原生、物联网等相关领域的技术知识分享。、