前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ中使用读写锁场景

RocketMQ中使用读写锁场景

作者头像
路行的亚洲
发布2021-04-13 14:59:52
6790
发布2021-04-13 14:59:52
举报
文章被收录于专栏:后端技术学习后端技术学习

首先思考一个问题,为什么要使用namesrv,而不使用zookeeper

代码语言:javascript
复制
轻量性:namesrv比zookeeper更加轻量,代码少于zookeeper,没有zookeeper复杂。
协议上:zookeeper是基于CAP理论实现的,因此当zookeeper宕机时,重启之后恢复需要进行选举,而选举的过程是一个耗时的过程,而namesrv不需要进行选举,同时namesrv之间不进行通信。
高可用上:一个宕机,其余的不受影响,broker高可用。

在 nameSrv的启动中,我们可以看到namesrv主要做了两件事

代码语言:javascript
复制
创建nameSrv控制器 createNamesrvController(args)
启动nameSrv控制器  start(controller)

而创建namesrvController的过程:

代码语言:javascript
复制
创建nameSrv控制器:
    1.创建的过程中 首先设置系统配置 远端版本 也即rocketmq的当前版本
                              对当前的命令选项进行构建 分为h-help,n-namesrvAddr
                              对命令行进行解析,如果解析处理的命令行为空,则进行退出
    2.创建namesrv配置对象,创建nettyserver配置对象,设置nettyServer配置的监听端口 9876
    3.如果命令行中有c或者p可选参数,则拿到命令选项,对其判断,如果不为空,则使用流进行读取,使用配置读写进行加载
    4.然后将其解析成对象,同时namesrv配置中设置配置存储路径
    5.如果rocketmq的home为空,则进行打印,同时使用系统的exit
    6.注册配置信息

启动的过程:

代码语言:javascript
复制
 首先判断控制器是否为空,如果不为空,则执行控制器初始化。如果初始化结果为false,
 则执行控制器关闭操作,同时进行退出exit
 否者获取钩子,执行添加关闭钩子操作
 启动namesrv控制器操作

而其启动的本质是:netty服务端的启动,此时会启动netty服务,同时如果文件服务不为空,同时还会启动文件watch服务

代码语言:javascript
复制
// 启动namesrv:启动远端服务器,同时启动文件watch服务
public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

namesrvcontroller中又存在什么信息呢?主要是配置信息、路由信息、远端信息

代码语言:javascript
复制
nameSrv配置 namesrvConfig
netty服务端配置 nettyServerConfig
kv配置管理器  kvConfigManager
路由信息管理器  routeInfoManager
远端服务端 remotingServer
brokerhouse保持服务  brokerHousekeepingService
远端执行器 remotingExecutor
配置 configuration
文件watch服务  fileWatchService

在RocketMQ的NameServer中,我们可以看到在进行配置信息会通过读写锁的方式进行放入,保证读写分离,高效保证读,同时限制并发写操作。和我们使用的synchronized或者ReentrantLock不同,使用的是ReentrantReadWriteLock。它的特性是:读读不互斥、读写互斥、写写互斥,而如果使用synchronized或者ReentrantLock,则是含有互斥的概念在里面。但是这样的操作依然不是最优的,如果看过ConcurrentHashMap的话,你会体会到它使用分段锁的概念,也即将锁的粒度进行细化,从而不是锁全局对象,而是锁局部对象。此时可以将读写锁的性能发挥出最大的效果,也即在粒度上再进行细化。

代码语言:javascript
复制
// 放入kv配置信息 入参 命名空间,k-v
public void putKVConfig(final String namespace, final String key, final String value) {
    try {
        // 使用写锁进行锁定中断
        this.lock.writeLock().lockInterruptibly();
        //.... 此处什么逻辑代码,配置的代码
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("putKVConfig InterruptedException", e);
    }
    this.persist();
}

那么我们看到锁不是lock操作,而是锁定中断,那此时的操作和我们看到的lock有什么区别呢?

lock:

代码语言:javascript
复制
1.如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
2.如果当前线程已经保持该锁,则将保持计数加 1,并且该方法立即返回。
3.如果该锁被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。

lockInterruptibly:

代码语言:javascript
复制
1.如果当前线程未被中断,则获取锁。 
2.如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。 
3.如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。  
4.如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态: 
     锁由当前线程获得;或者 其他某个线程中断当前线程。  
5.如果当前线程获得该锁,则将锁保持计数设置为 1。 
   如果当前线程: 
       在进入此方法时已经设置了该线程的中断状态;或者 在等待获取锁的同时被中断。 
   则抛出 InterruptedException,并且清除当前线程的已中断状态。  
6.在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或重入获取。 

那么我们知道不管是读写锁还是ReentrantLock的底层都是基于CAS+AQS实现的,由于其基于AQS实现,因此可以看到其实现的方式的思路是:

代码语言:javascript
复制
对应锁的本质是实现互斥,因此思考的第一个问题是如何实现互斥 -> CAS,使用state判断非公平锁还是独占锁,状态
对应拿到锁的线程执行任务,而没有拿到锁的线程,需要执行的操作是进行排队 -> addWaiter() 队列节点中考虑线程节点的等待状态
对应排队的任务,如何进行唤醒和执行 ->执行lockPark/lockUnpark操作

其中队列是CLH队列:

代码语言:javascript
复制
      +------+  prev +-----+       +-----+
 head |      | <---- |     | <---- |     |  tail
      +------+       +-----+       +-----+

也即我们可以基于AbstractQueuedSynchronizer的代码可以看到它是一个双向队列,其有前驱节点prev和后继节点next。同时带有状态waitStatus。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档