前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >开源代码学习-nsq(v0.1.1版本)源码分析

开源代码学习-nsq(v0.1.1版本)源码分析

作者头像
月牙寂道长
发布2019-07-02 13:31:13
4390
发布2019-07-02 13:31:13
举报
文章被收录于专栏:月牙寂月牙寂

版权声明:本文为作者原创,如需转载请通知本人,并标明出处和作者。擅自转载的,保留追究其侵权的权利。golang群:570992072。qq 29185807 个人公众号:月牙寂道长 公众号微信号yueyajidaozhang https://cloud.tencent.com/developer/article/1454518

本文微信公众号文章链接:https://mp.weixin.qq.com/s/QCXkJS7OEQ67xwWPoiW8wg

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

源码地址:https://github.com/nsqio/nsq

代码语言:txt
复制
对于一个大型的项目来讲,我个人的学习习惯于从最小版本开始学起。这是因为,在一个项目最初的时候,大体功能和架构都已经成形,最初的版本,一般来说,代码量都较少,功能集最小。学习曲线低,并且又最初版本,慢慢往高版本过渡,也能更了解项目进化的过程,也是一个学习的过程。
代码语言:txt
复制
并且在实际使用过程中,大多数情况下,我们可能不需要那么多的功能集,并且需要根据实际情况做一些二次开发,此时的话,也许低版本的会更贴近实际使用场景和二次开发场景。
代码语言:txt
复制
对于nsq的学习分析,那么就从最低版本开始。

找到所有代码上传日志,找到了一个最低版本的0.1.1的最后版本。

代码量:

代码量非常少,通过看文件名,也大致能够知道有哪些模块。代码一眼看过去还是非常清晰的。

那么现在就开始从main函数开始分析。

github.com/nsqio/nsq/nsq.go

代码的话,不全部分析,看的是主要部分,其他部分读者可自行分析。

从图中,红框中,看到启动了4个模块。

topicFactory

代码语言:javascript
复制
uuidFactory
代码语言:javascript
复制
tcpServer
代码语言:javascript
复制
httpServer

那么先一个一个分析看看

topic

github.com/nsqio/nsq/topic.go

name:每个topic都有一个名称

newChannelChan:每个topic对应有很多的chan,用于对应订阅者,此变量用于记录新增加的订阅者chan

channelMap:订阅者对应的channel的map记录

backend:msg的缓存区

incomingMessageChan:新到msg的chan通知,用于发布者写入通知

msgChan:用于传递msg的chan,用于中间发布给订阅者

routerSyncChan:标记chan,用于标记有msg同步的开始

readSyncChan:标记chan,用于标记msg同步的结束

channelWriterStarted:标记chan,用于初始化时,开启MessagePump协程开启的标记

两个全局变量

newTopicChan:用于生产和获取topic

topicMap:用于存储所有topic

此函数就是main函数中的模块启动之一,开启topic工厂

for循环中,

55:接收的是newTopicChan的信息

57:从topicMap中查找topic信息

58:未查找到,则新建一个topic,NewTopic

62:将查找到的或者新建的topic信息通过retChan返回给调用者

入口则为GetTopic

创建了一个topicChan,并将其通过newTopicChan发送给topicFactory

那看看NewTopic:初始化后,在35行,开启了一个协程topic.Router

每个topic都有一个处理协程(Router),所有的操作通过chan将信息发送到Router中,Router中根据不同的chan,进行不同的操作。

Router中,接收的chan信息:

1、newChannelChan

代码语言:txt
复制
入口为:

在一个topic中会对应多个channel,用于订阅者,并且每个订阅者的channel都有一个name用于标识。

在Router中:

123:从channelMap中查找对应的channel

125:若没有找到,则NewChannel(这个待其他模块再分析)

129:通过retchan,将channel信息返回给调用者

131:针对每个channel,都会开启一个协程MessagePump(待会分析)

2、incomingMessageChan

入口为:

用于发布者调用,发布一个信息。

在Router中:

136:将msg发送到msgchan中

139:若msgchan处于阻塞状态,则将msg写入到backend中

3、readSyncChan

这个是用于标记read sync start

148:并等待read sync end

4、exitchan

MessagePump

每个订阅者的channel,都会开启一个协程MessagePump。

for循环中

91:从msgchan中获取msg

92:同时也从backend中获取缓存区中未发布的msg

101:标记read sync read start

103-107:遍历所有的channel,将读取的msg,发布给所有的channel

109:标记read sync read end

模块小结:

1、GetTopic接口提供创建和获取topic信息。

代码语言:txt
复制
   通过topicFactory来进行处理。

2、GetChannel接口提供创建和获取Channel

代码语言:txt
复制
  通过topic中的Router,接受newChannelChan进行处理

3、PutMessage接口提供发布msg接口

代码语言:txt
复制
 通过topic中的Router,接受incomingMessageChan进行处理

4、通过msgChan和backend缓冲区,用于msg的传送。并在MessagePump中进行消息的分发。

uuid

github.com/nsqio/nsq/uuid.go

代码很简单,是一个生成uuid的模块。就不讲解了

Tcp

github.com/nsqio/nsq/tcp.go

tcp模块,用于tcp的监听

16:tcp的accept

20:每个tcp链接都会构建一个client

22:每个链接的处理部分为client Handle

client

github.com/nsqio/nsq/client.go

client中有conn链接信息,state状态信息,channel对应的订阅channel信息。

client的状态表

提供的接口write,这个就不解释了

Handle是每个链接的入口

82:读取协议版本号

90:查找对应版本号的协议

97:真正的处理部分,protoclo.IOLoop

protocol

github.com/nsqio/nsq/protocol.go

是一个接口

protocol_v1

github.com/nsqio/nsq/protocol_v1.go

v1版本的协议

初始化协议

协议处理部分

29:初始化了一个bufio的reader

30:for循环开始

31:开始从链接中读取数据

重点在

54:查找到MethodByName

56:调用查找到的Method

75:调用client的write,将结果返回

那么协议中提供了多少操作呢?

sub

关键地方:

112:获取topic

113:获取channel

114:将client加入到channel中进行管理

get

关键地方:

127:从channel中获取Message

137-138:将msg格式化到buf中

140:将buf return

channel

github.com/nsqio/nsq/channel.go

channel模块和topic模块很类似

name:channel的名称

addClientChan:用于tcp链接对应的client加入channel中的传送chan

removeClientChan:用于删除client的传送chan

clients:用于保存channel中的所有链接

backend:msg缓冲区

incomingMessageChan:用于msg到来的chan

msgchan:用于msg的传送

其余的不解释

NewChannel是在topic中的GetChannel接口,并在Router中调用,可以返回topic源码分析地方查看。

如topic一样,每个channel都开启了一个channel.Router协程,同样所有的操作都是通过chan来发送信号到Router中,进行操作。

RemoveClient通过把信号发送给removeClientChan,在Router中操作

外部发送信息到channel中,通过incomingMessageChan发送,在Router中操作。调用的地方有topic中的MessagePump中,可以返回topic源码分析地方查看。

对外还有一个GetMessage接口,调用地方是在Protocol_v1中的GET中,可以返回Protocol_v1源码分析地方查看。

189:从msgchan中获取msg

190:从backend缓冲区中获取msg

202:将msg返回给调用者

1、addClientChan

将新的client添加到client数组中

2、removeClientChan

遍历client数组,将查找到的client删除掉

3、incomingMessageChan

此处的操作与topic中的类似。将msg发送到msgchan中,若msgchan阻塞,则放到backend缓冲区中。

http

github.com/nsqio/nsq/http.go

httpserver中注册了两个处理handler:pingHandler,putHandler

pingHandler,很简单,不解释

主要操作:

86:获取到topic

87:将msg发送到topic中

总结:

所有的模块都分解完了。那么现在把这些模块链接起来。

tcp模块监听链接,每个链接生成一个client,client通过Protocol与channel联系起来。

在Protocol中提供,SUB、GET操作,client提供write供Protocol调用

在SUB中,提供将client与channel联系起来。

在GET中,提供将client从channel中获取订阅的msg,并调用client的write通过tcp发送给订阅者

每个topic包含多个channel,每个channel对应有一个MessagePump,用于从topic中将msg分发给每个channel。topic中提供GetChannel用于创建和获取channel。

topic提供一个topicFactory用于创建和获取topic。

topic提供PutMessage用于发布者发布msg。而在http模块中,提供putHandler,用于发布者发布msg,通过在putHandler中调用PutMessage接口,将msg发布到topic中。

整个过程就是如此。在这个v0.1.1版本中,最主要的流程都有了。但此版本只是一个单机系统。还不是分布式系统。

后续版本,待有时间的时候,再做分析。

龚浩华

月牙寂道长

qq:29185807

2019年05月07日

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年05月07日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档