首先思考一个问题,为什么要使用namesrv,而不使用zookeeper?
轻量性:namesrv比zookeeper更加轻量,代码少于zookeeper,没有zookeeper复杂。
协议上:zookeeper是基于CAP理论实现的,因此当zookeeper宕机时,重启之后恢复需要进行选举,而选举的过程是一个耗时的过程,而namesrv不需要进行选举,同时namesrv之间不进行通信。
高可用上:一个宕机,其余的不受影响,broker高可用。
在 nameSrv的启动中,我们可以看到namesrv主要做了两件事:
创建nameSrv控制器 createNamesrvController(args)
启动nameSrv控制器 start(controller)
而创建namesrvController的过程:
创建nameSrv控制器:
1.创建的过程中 首先设置系统配置 远端版本 也即rocketmq的当前版本
对当前的命令选项进行构建 分为h-help,n-namesrvAddr
对命令行进行解析,如果解析处理的命令行为空,则进行退出
2.创建namesrv配置对象,创建nettyserver配置对象,设置nettyServer配置的监听端口 9876
3.如果命令行中有c或者p可选参数,则拿到命令选项,对其判断,如果不为空,则使用流进行读取,使用配置读写进行加载
4.然后将其解析成对象,同时namesrv配置中设置配置存储路径
5.如果rocketmq的home为空,则进行打印,同时使用系统的exit
6.注册配置信息
启动的过程:
首先判断控制器是否为空,如果不为空,则执行控制器初始化。如果初始化结果为false,
则执行控制器关闭操作,同时进行退出exit
否者获取钩子,执行添加关闭钩子操作
启动namesrv控制器操作
而其启动的本质是:netty服务端的启动,此时会启动netty服务,同时如果文件服务不为空,同时还会启动文件watch服务
// 启动namesrv:启动远端服务器,同时启动文件watch服务
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
namesrvcontroller中又存在什么信息呢?主要是配置信息、路由信息、远端信息
nameSrv配置 namesrvConfig
netty服务端配置 nettyServerConfig
kv配置管理器 kvConfigManager
路由信息管理器 routeInfoManager
远端服务端 remotingServer
brokerhouse保持服务 brokerHousekeepingService
远端执行器 remotingExecutor
配置 configuration
文件watch服务 fileWatchService
在RocketMQ的NameServer中,我们可以看到在进行配置信息会通过读写锁的方式进行放入,保证读写分离,高效保证读,同时限制并发写操作。和我们使用的synchronized或者ReentrantLock不同,使用的是ReentrantReadWriteLock。它的特性是:读读不互斥、读写互斥、写写互斥,而如果使用synchronized或者ReentrantLock,则是含有互斥的概念在里面。但是这样的操作依然不是最优的,如果看过ConcurrentHashMap的话,你会体会到它使用分段锁的概念,也即将锁的粒度进行细化,从而不是锁全局对象,而是锁局部对象。此时可以将读写锁的性能发挥出最大的效果,也即在粒度上再进行细化。
// 放入kv配置信息 入参 命名空间,k-v
public void putKVConfig(final String namespace, final String key, final String value) {
try {
// 使用写锁进行锁定中断
this.lock.writeLock().lockInterruptibly();
//.... 此处什么逻辑代码,配置的代码
} finally {
this.lock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putKVConfig InterruptedException", e);
}
this.persist();
}
那么我们看到锁不是lock操作,而是锁定中断,那此时的操作和我们看到的lock有什么区别呢?
lock:
1.如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
2.如果当前线程已经保持该锁,则将保持计数加 1,并且该方法立即返回。
3.如果该锁被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。
lockInterruptibly:
1.如果当前线程未被中断,则获取锁。
2.如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
3.如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
4.如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:
锁由当前线程获得;或者 其他某个线程中断当前线程。
5.如果当前线程获得该锁,则将锁保持计数设置为 1。
如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者 在等待获取锁的同时被中断。
则抛出 InterruptedException,并且清除当前线程的已中断状态。
6.在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或重入获取。
那么我们知道不管是读写锁还是ReentrantLock的底层都是基于CAS+AQS实现的,由于其基于AQS实现,因此可以看到其实现的方式的思路是:
对应锁的本质是实现互斥,因此思考的第一个问题是如何实现互斥 -> CAS,使用state判断非公平锁还是独占锁,状态
对应拿到锁的线程执行任务,而没有拿到锁的线程,需要执行的操作是进行排队 -> addWaiter() 队列节点中考虑线程节点的等待状态
对应排队的任务,如何进行唤醒和执行 ->执行lockPark/lockUnpark操作
其中队列是CLH队列:
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
也即我们可以基于AbstractQueuedSynchronizer的代码可以看到它是一个双向队列,其有前驱节点prev和后继节点next。同时带有状态waitStatus。