前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >golang-nsq系列(三)--nsqlookupd源码解析

golang-nsq系列(三)--nsqlookupd源码解析

作者头像
astraw99
发布于 2021-09-22 06:48:13
发布于 2021-09-22 06:48:13
37400
代码可运行
举报
文章被收录于专栏:K8s 系列K8s 系列
运行总次数:0
代码可运行

上一篇 介绍了 nsqd 的代码逻辑与流程图,本篇来解析 nsq 中另一大模块 nsqlookupd,其负责维护 nsqd 节点的拓扑结构信息,实现了去中心化的服务注册与发现。

1. nsqlookupd 执行入口

在 nsq/apps/nsqlookupd/main.go 可以找到执行入口文件,如下:

2. nsqlookupd 执行主逻辑

主要流程与上一篇讲的 nsqd 执行逻辑相似,区别是运行的具体任务不同。

2.1 通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func main() {
  prg := &program{}
  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
    logFatal("%s", err)
  }
}

func (p *program) Init(env svc.Environment) error {
  if env.IsWindowsService() {
    dir := filepath.Dir(os.Args[0])
    return os.Chdir(dir)
  }
  return nil
}

func (p *program) Start() error {
  opts := nsqlookupd.NewOptions()

  flagSet := nsqlookupdFlagSet(opts)
  ...
}

2.2 初始化配置参数(优先级:flagSet-命令行参数 > cfg-配置文件 > opts-默认值),开启协程,进入 nsqlookupd.Main() 主函数;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
options.Resolve(opts, flagSet, cfg)
  nsqlookupd, err := nsqlookupd.New(opts)
  if err != nil {
    logFatal("failed to instantiate nsqlookupd", err)
  }
  p.nsqlookupd = nsqlookupd

  go func() {
    err := p.nsqlookupd.Main()
    if err != nil {
      p.Stop()
      os.Exit(1)
    }
  }()

2.3 开启 goroutine 执行 tcpServer, httpServer,分别监听 nsqd, nsqadmin 的客户端请求;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (l *NSQLookupd) Main() error {
  ctx := &Context{l}

  exitCh := make(chan error)
  var once sync.Once
  exitFunc := func(err error) {
    once.Do(func() {
      if err != nil {
        l.logf(LOG_FATAL, "%s", err)
      }
      exitCh <- err
    })
  }

  tcpServer := &tcpServer{ctx: ctx}
  l.waitGroup.Wrap(func() {
    exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
  })
  httpServer := newHTTPServer(ctx)
  l.waitGroup.Wrap(func() {
    exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
  })

  err := <-exitCh
  return err
}

2.4 TCPServer 循环监听客户端请求,建立长连接进行通信,并开启 handler 处理每一个客户端 conn;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
  logf(lg.INFO, "TCP: listening on %s", listener.Addr())

  for {
    clientConn, err := listener.Accept()
    if err != nil {
      if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
        logf(lg.WARN, "temporary Accept() failure - %s", err)
        runtime.Gosched()
        continue
      }
      // theres no direct way to detect this error because it is not exposed
      if !strings.Contains(err.Error(), "use of closed network connection") {
        return fmt.Errorf("listener.Accept() error - %s", err)
      }
      break
    }
    go handler.Handle(clientConn)
  }

  logf(lg.INFO, "TCP: closing %s", listener.Addr())

  return nil
}

2.5 httpServer 通过 http_api.Decorate 装饰器实现对各 http 路由进行 handler 装饰,如加 log 日志、V1 协议版本号的统一格式输出等;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func newHTTPServer(ctx *Context) *httpServer {
  log := http_api.Log(ctx.nsqlookupd.logf)

  router := httprouter.New()
  router.HandleMethodNotAllowed = true
  router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
  router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
  router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
  s := &httpServer{
    ctx:    ctx,
    router: router,
  }

  router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

  // v1 negotiate
  router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
  router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
  router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
  router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
}

2.6 tcp 解析 V1 协议,走内部协议封装的 prot.IOLoop(conn) 进行循环处理客户端命令,直到客户端命令全部解析处理完毕才关闭连接;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
var prot protocol.Protocol
  switch protocolMagic {
  case "  V1":
    prot = &LookupProtocolV1{ctx: p.ctx}
  default:
    protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
    clientConn.Close()
    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
      clientConn.RemoteAddr(), protocolMagic)
    return
  }

  err = prot.IOLoop(clientConn)

2.7 通过内部协议进行 p.Exec(执行命令)、p.SendResponse(返回结果),保证每个 nsqd 节点都能正确的进行服务注册(register)与注销(unregister),并进行心跳检测(ping)节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
for {
    line, err = reader.ReadString('\n')
    if err != nil {
      break
    }

    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    var response []byte
    response, err = p.Exec(client, reader, params)
    if err != nil {
      ctx := ""
      if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
        ctx = " - " + parentErr.Error()
      }
      _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
      if sendErr != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
        break
      }
      continue
    }

    if response != nil {
      _, err = protocol.SendResponse(client, response)
      if err != nil {
        break
      }
    }
  }

  conn.Close()

3. nsqlookupd 流程图小结

上述流程小结示意图如下:

【小结】通过源码阅读与解析,可以看出 nsqlookupd 的作用就是管理 nsqd 节点的认证、注册、注销、心跳检测,动态维护分布式集群中最新可用的 nsqd 节点列表供客户端取用;

源码中使用了很多 RWMutex 读写锁、interface 协议公共接口、goroutine/channel 协程间并发通信,从而保证了高可用、高吞吐量的应用能力。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 稻草人生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
golang-nsq系列(二)--nsqd源码解析
上一篇初识了 nsq 三个模块(nsqd, nsqlookupd, nsqadmin)的 demo演示,本篇则从源码开始,一步一步去解析 nsqd 的执行流程和逻辑处理,学习别人优秀的项目架构,以期学以致用。
astraw99
2021/09/22
4610
golang-nsq系列(二)--nsqd源码解析
golang-nsq系列(四)--源码解析总结篇
随着互联网技术在各行各业的应用高速普及与发展,各层应用之间调用关系越来越复杂,架构、开发、运维成本越来越高,高内聚、低耦合、可扩展、高可用已成为了行业需求。
astraw99
2021/09/22
1.4K0
golang-nsq系列(四)--源码解析总结篇
高性能消息中间件 NSQ 解析-nsqd 的实现介绍
我们在前面介绍了 nsq 的相关概念以及 nsq 的安装与应用。从本篇开始将会结合源码介绍 nsq 的实现细节。
aoho求索
2021/04/02
1.2K0
高性能消息中间件 NSQ 解析-nsqd 的实现介绍
golang 源码分析:minio(part I)路由
MinIO的命令行启动只有2个命令,一个是server、一个是gateway,分别用于启动服务和网关,而整个MinIO的启动是从main.go文件开始的
golangLeetcode
2022/08/03
1K0
Golang 语言编写的消息队列 NSQ 官方客户端 go-nsq 怎么使用?
NSQ 是 Golang 语言编写的实时分布式消息传递平台(也可以理解为消息队列),它主要由三个守护进程组成,分别是 nsqd、nsqlookupd 和 nsqadmin。其中 nsqd 是核心组成部分,它负责处理客户端的请求,比如生产、排序和消费消息等;nsqlookupd 负责管理集群拓扑信息和提供一个最终一致性的发现服务,nsqadmin 是一个 web 界面的管理平台,可以用于实时查看集群信息和执行其他管理操作。单个 nsqd可以含有很多 topic,每个 topic 可以含有很多 channel。
frank.
2021/11/02
1.8K0
go-nsq使用简述
一 环境依赖:   golang 开发环境(version >= 1.2) 下源码,配置环境变量,执行安装脚本   gpm 依赖包管理器 ubantu: sudo apt-get intall gpm 二 NSQ安装: git获取源码: mkdir -p $GOPATH/src/github.com/nsqio;cd $GOPATH/src/github.com/nsqio;git clone https://github
李海彬
2018/03/23
1.3K0
Go之NSQ简介,原理和使用
参考上图利用消息队列把业务流程中的非关键流程异步化, 从而显著降低业务请求的响应时间
iginkgo18
2020/10/30
3.9K0
Go之NSQ简介,原理和使用
nsq部署_andlua辅助源码
看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的 认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责从lookupd中获取到 指定nsqd之后,从nsqd中获取消息。
全栈程序员站长
2022/08/04
1.5K0
golang源码分析:grpc 链接池(2)
继续上一篇golang源码分析:grpc 链接池(1),我们从源码来分析,我们将从连接池的建立,请求发起的时候获取连接,以及最终关闭连接三个流程进行源码分析。
golangLeetcode
2023/03/01
6560
golang源码分析:grpc 链接池(2)
剖析nsq消息队列(二) 去中心化源码解析
在上一篇帖子剖析nsq消息队列(一) 简介及去中心化实现原理中,我介绍了nsq的两种使用方式,一种是直接连接,还有一种是通过nslookup来实现去中心化的方式使用,并大概说了一下实现原理,没有什么难理解的东西,这篇帖子我把nsq实现去中心化的源码和其中的业物逻辑展示给大家看一下。
lpxxn
2019/09/29
9540
剖析nsq消息队列(二) 去中心化源码解析
kratos源码分析系列(6)
直接获取当前节点:selector/node/direct/direct.go
golangLeetcode
2023/09/06
5320
kratos源码分析系列(6)
k8s源码-kube-apiserver主流程解析
apiserver是kubernetes的api server,运行在集群的master上,提供集群的管理API服务。
ascehuang
2020/01/21
2.8K0
k8s源码-kube-apiserver主流程解析
golang源码分析之:go-mitmproxy
日常开发中,我们除了使用charles、finder抓包外,也可以使用mitmproxy抓包,并且它还提供二次开发能力。在学习mitmproxy之前,我们先学习下他的go版本的精简实现github.com/lqqyt2423/go-mitmproxy/cmd/go-mitmproxy
golangLeetcode
2024/07/06
3160
golang源码分析之:go-mitmproxy
nsq (三) 消息传输的可靠性和持久化[一]
上两篇帖子主要说了一下nsq的拓扑结构,如何进行故障处理和横向扩展,保证了客户端和服务端的长连接,连接保持了,就要传输数据了,nsq如何保证消息被订阅者消费,如何保证消息不丢失,就是今天要阐述的内容。
lpxxn
2019/11/03
2.2K0
golang-nsq系列(一)--初识
nsq 最初是由 bitly 公司开源出来的一款简单易用的分布式消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。
astraw99
2021/09/22
6660
golang-nsq系列(一)--初识
高性能消息中间件 NSQ 解析-应用实践
Nsq 是用 Go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理,对于学习 Go channel的原理和用法,以及如何用 Go 语言来写分布式是一个很不错的入门项目。
aoho求索
2021/03/16
5660
开源代码学习-nsq(v0.1.5版本)源码分析
版权声明:本文为作者原创,如需转载请通知本人,并标明出处和作者。擅自转载的,保留追究其侵权的权利。golang群:570992072。qq 29185807 个人公众号:月牙寂道长 公众号微信号yueyajidaozhang https://blog.csdn.net/screscent/article/details/90043013
月牙寂道长
2019/07/02
7070
开源代码学习-nsq(v0.1.5版本)源码分析
docker源码分析-Client创建与命令执行
一直在研究docker,最近被人问到docker到底是怎么工作的却不是太清楚,在网上偶然看到一本讲docker源码的电子书,花了整晚看了下,终于对docker的实现细节比较清楚了。但这本电子书讲的是1.2版本时的docker源码,跟最新的1.12版本相比差别还是挺大的,在这本书里讲到的源码与最新源码已经对应不上了。因此我计划写一份针对1.12版本的docker源码分析。 docker的总体架构 这部分基本没有太大的变化,我觉得可以直接参照1.2版本的总体架构,就不重复分析了。见这里。 Client创建与命令
jeremyxu
2018/05/10
1.3K0
golang 源码分析(17):cobra docker
然后创建DockerCli对象,DockerCli对象在cli/cli.go里声明。
golangLeetcode
2022/08/02
5100
微服务之服务注册和服务发现篇
seth-shi
2023/12/18
1810
微服务之服务注册和服务发现篇
相关推荐
golang-nsq系列(二)--nsqd源码解析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验