开源代码学习-nsq(v0.1.1版本)源码分析

版权声明:本文为作者原创,如需转载请通知本人,并标明出处和作者。擅自转载的,保留追究其侵权的权利。golang群:570992072。qq 29185807 个人公众号:月牙寂道长 公众号微信号yueyajidaozhang https://blog.csdn.net/screscent/article/details/89927081

本文微信公众号文章链接:https://mp.weixin.qq.com/s/QCXkJS7OEQ67xwWPoiW8wg

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

源码地址:https://github.com/nsqio/nsq

对于一个大型的项目来讲,我个人的学习习惯于从最小版本开始学起。这是因为,在一个项目最初的时候,大体功能和架构都已经成形,最初的版本,一般来说,代码量都较少,功能集最小。学习曲线低,并且又最初版本,慢慢往高版本过渡,也能更了解项目进化的过程,也是一个学习的过程。

并且在实际使用过程中,大多数情况下,我们可能不需要那么多的功能集,并且需要根据实际情况做一些二次开发,此时的话,也许低版本的会更贴近实际使用场景和二次开发场景。

对于nsq的学习分析,那么就从最低版本开始。

找到所有代码上传日志,找到了一个最低版本的0.1.1的最后版本。

代码量:

代码量非常少,通过看文件名,也大致能够知道有哪些模块。代码一眼看过去还是非常清晰的。

那么现在就开始从main函数开始分析。

github.com/nsqio/nsq/nsq.go

代码的话,不全部分析,看的是主要部分,其他部分读者可自行分析。

从图中,红框中,看到启动了4个模块。

topicFactory

uuidFactory
tcpServer
httpServer

那么先一个一个分析看看

topic

github.com/nsqio/nsq/topic.go

name:每个topic都有一个名称

newChannelChan:每个topic对应有很多的chan,用于对应订阅者,此变量用于记录新增加的订阅者chan

channelMap:订阅者对应的channel的map记录

backend:msg的缓存区

incomingMessageChan:新到msg的chan通知,用于发布者写入通知

msgChan:用于传递msg的chan,用于中间发布给订阅者

routerSyncChan:标记chan,用于标记有msg同步的开始

readSyncChan:标记chan,用于标记msg同步的结束

channelWriterStarted:标记chan,用于初始化时,开启MessagePump协程开启的标记

两个全局变量

newTopicChan:用于生产和获取topic

topicMap:用于存储所有topic

此函数就是main函数中的模块启动之一,开启topic工厂

for循环中,

55:接收的是newTopicChan的信息

57:从topicMap中查找topic信息

58:未查找到,则新建一个topic,NewTopic

62:将查找到的或者新建的topic信息通过retChan返回给调用者

入口则为GetTopic

创建了一个topicChan,并将其通过newTopicChan发送给topicFactory

那看看NewTopic:初始化后,在35行,开启了一个协程topic.Router

每个topic都有一个处理协程(Router),所有的操作通过chan将信息发送到Router中,Router中根据不同的chan,进行不同的操作。

Router中,接收的chan信息:

1、newChannelChan

入口为:

在一个topic中会对应多个channel,用于订阅者,并且每个订阅者的channel都有一个name用于标识。

在Router中:

123:从channelMap中查找对应的channel

125:若没有找到,则NewChannel(这个待其他模块再分析)

129:通过retchan,将channel信息返回给调用者

131:针对每个channel,都会开启一个协程MessagePump(待会分析)

2、incomingMessageChan

入口为:

用于发布者调用,发布一个信息。

在Router中:

136:将msg发送到msgchan中

139:若msgchan处于阻塞状态,则将msg写入到backend中

3、readSyncChan

这个是用于标记read sync start

148:并等待read sync end

4、exitchan

MessagePump

每个订阅者的channel,都会开启一个协程MessagePump。

for循环中

91:从msgchan中获取msg

92:同时也从backend中获取缓存区中未发布的msg

101:标记read sync read start

103-107:遍历所有的channel,将读取的msg,发布给所有的channel

109:标记read sync read end

模块小结:

1、GetTopic接口提供创建和获取topic信息。

通过topicFactory来进行处理。

2、GetChannel接口提供创建和获取Channel

通过topic中的Router,接受newChannelChan进行处理

3、PutMessage接口提供发布msg接口

通过topic中的Router,接受incomingMessageChan进行处理

4、通过msgChan和backend缓冲区,用于msg的传送。并在MessagePump中进行消息的分发。

uuid

github.com/nsqio/nsq/uuid.go

代码很简单,是一个生成uuid的模块。就不讲解了

Tcp

github.com/nsqio/nsq/tcp.go

tcp模块,用于tcp的监听

16:tcp的accept

20:每个tcp链接都会构建一个client

22:每个链接的处理部分为client Handle

client

github.com/nsqio/nsq/client.go

client中有conn链接信息,state状态信息,channel对应的订阅channel信息。

client的状态表

提供的接口write,这个就不解释了

Handle是每个链接的入口

82:读取协议版本号

90:查找对应版本号的协议

97:真正的处理部分,protoclo.IOLoop

protocol

github.com/nsqio/nsq/protocol.go

是一个接口

protocol_v1

github.com/nsqio/nsq/protocol_v1.go

v1版本的协议

初始化协议

协议处理部分

29:初始化了一个bufio的reader

30:for循环开始

31:开始从链接中读取数据

重点在

54:查找到MethodByName

56:调用查找到的Method

75:调用client的write,将结果返回

那么协议中提供了多少操作呢?

sub

关键地方:

112:获取topic

113:获取channel

114:将client加入到channel中进行管理

get

关键地方:

127:从channel中获取Message

137-138:将msg格式化到buf中

140:将buf return

channel

github.com/nsqio/nsq/channel.go

channel模块和topic模块很类似

name:channel的名称

addClientChan:用于tcp链接对应的client加入channel中的传送chan

removeClientChan:用于删除client的传送chan

clients:用于保存channel中的所有链接

backend:msg缓冲区

incomingMessageChan:用于msg到来的chan

msgchan:用于msg的传送

其余的不解释

NewChannel是在topic中的GetChannel接口,并在Router中调用,可以返回topic源码分析地方查看。

如topic一样,每个channel都开启了一个channel.Router协程,同样所有的操作都是通过chan来发送信号到Router中,进行操作。

RemoveClient通过把信号发送给removeClientChan,在Router中操作

外部发送信息到channel中,通过incomingMessageChan发送,在Router中操作。调用的地方有topic中的MessagePump中,可以返回topic源码分析地方查看。

对外还有一个GetMessage接口,调用地方是在Protocol_v1中的GET中,可以返回Protocol_v1源码分析地方查看。

189:从msgchan中获取msg

190:从backend缓冲区中获取msg

202:将msg返回给调用者

1、addClientChan

将新的client添加到client数组中

2、removeClientChan

遍历client数组,将查找到的client删除掉

3、incomingMessageChan

此处的操作与topic中的类似。将msg发送到msgchan中,若msgchan阻塞,则放到backend缓冲区中。

http

github.com/nsqio/nsq/http.go

httpserver中注册了两个处理handler:pingHandler,putHandler

pingHandler,很简单,不解释

主要操作:

86:获取到topic

87:将msg发送到topic中

总结:

所有的模块都分解完了。那么现在把这些模块链接起来。

tcp模块监听链接,每个链接生成一个client,client通过Protocol与channel联系起来。

在Protocol中提供,SUB、GET操作,client提供write供Protocol调用

在SUB中,提供将client与channel联系起来。

在GET中,提供将client从channel中获取订阅的msg,并调用client的write通过tcp发送给订阅者

每个topic包含多个channel,每个channel对应有一个MessagePump,用于从topic中将msg分发给每个channel。topic中提供GetChannel用于创建和获取channel。

topic提供一个topicFactory用于创建和获取topic。

topic提供PutMessage用于发布者发布msg。而在http模块中,提供putHandler,用于发布者发布msg,通过在putHandler中调用PutMessage接口,将msg发布到topic中。

整个过程就是如此。在这个v0.1.1版本中,最主要的流程都有了。但此版本只是一个单机系统。还不是分布式系统。

后续版本,待有时间的时候,再做分析。

龚浩华

月牙寂道长

qq:29185807

2019年05月07日

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券