前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高性能消息中间件 NSQ 解析-应用实践

高性能消息中间件 NSQ 解析-应用实践

作者头像
aoho求索
发布2021-03-16 11:11:06
5210
发布2021-03-16 11:11:06
举报
文章被收录于专栏:aoho求索

Nsq 是用 Go 语言开发的轻量级的分布式消息队列,适合小型项目使用、用来学习消息队列实现原理,对于学习 Go channel的原理和用法,以及如何用 Go 语言来写分布式是一个很不错的入门项目。

我们在上一篇文章整体介绍了 nsq 的组成以及各个模块的功能,本文将会带领大家一起实践 nsq 的安装,并基于 nsq 提供的 API 进行实践。

安装使用

在官网(https://nsq.io/overview/quick_start.html) 下载对应的二进制可执行文件。

代码语言:javascript
复制
# 启动nsqlookupd
$ nsqlookupd
# 启动 nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
# 启动 nsqadmin
$ nsqadmin --lookupd-http-address=127.0.0.1:4161

# 创建topic,发送消息
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
# 启动nsq_to_file
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
# 发布消息到 nsqd
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'

在本地按照上述步骤就可以跑起来了。

创建生产者

安装好 nsq 的几个服务之后,我们来实现基于 nsq 的生产和消费示例。首先是创建生产者:

代码语言:javascript
复制
package main

import (
  "fmt"
  "log"
  "time"

  "github.com/nsqio/go-nsq"
)

func main() {
  config := nsq.NewConfig()
  p, err := nsq.NewProducer("127.0.0.1:4150", config)

  if err != nil {
    log.Panic(err)
  }

  for i := 0; i < 1000; i++ {
    msg := fmt.Sprintf("num-%d", i)
    log.Println("Pub:" + msg)
    err = p.Publish("testTopic", []byte(msg))
    if err != nil {
      log.Panic(err)
    }
    time.Sleep(time.Second * 1)
  }

  p.Stop()
}

生产者的逻辑比较简单,基于 nsq 官方提供的 github.com/nsqio/go-nsq包,通过调用,循环写 1000 个字符+数字,即 num-n 的形式,通过 p.Publish 发送到消息队列中,等待消费。

消费者

接着,我们创建消费者:consumer.go 来消费刚刚生产的消息。

代码语言:javascript
复制
package main

import (
  "log"
  "sync"

  "github.com/nsqio/go-nsq"
)

func main() {
  wg := &sync.WaitGroup{}
  wg.Add(1000)

  config := nsq.NewConfig()
  c, _ := nsq.NewConsumer("testTopic", "ch", config)
  c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
    log.Printf("Got a message: %s", message.Body)
    wg.Done()
    return nil
  }))

  // 1.直连nsqd
  // err := c.ConnectToNSQD("127.0.0.1:4150")

  // 2.通过 nsqlookupd 服务发现
  err := c.ConnectToNSQLookupd("127.0.0.1:4161")
  if err != nil {
    log.Panic(err)
  }
  wg.Wait()
}

可通过两种方式与 nsqd 连接:

  • 直连 nsqd,适用于单机(standalone)版;
  • 通过 nsqlookupd 服务发现,适用于集群(cluster)版;

消费消息的动作,主要逻辑就是打印出来,实际业务中需要进行其他处理。

运行结果

依次启动生产者和消费者的服务,可以分别看到如下的输出结果:

代码语言:javascript
复制
$go run producer.go

2020/12/28 20:29:51 Pub:num-0
2020/12/28 20:29:51 INF    1 (127.0.0.1:4150) connecting to nsqd
2020/12/28 20:29:52 Pub:num-1
2020/12/28 20:29:53 Pub:num-2
2020/12/28 20:29:54 Pub:num-3
2020/12/28 20:29:55 Pub:num-4
2020/12/28 20:29:56 Pub:num-5
2020/12/28 20:29:57 Pub:num-6
2020/12/28 20:29:58 Pub:num-7
2020/12/28 20:29:59 Pub:num-8
2020/12/28 20:30:00 Pub:num-9
2020/12/28 20:30:01 Pub:num-10

$ go run consumer.go

2020/12/28 20:30:08 INF    1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2020/12/28 20:30:08 INF    1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2020/12/28 20:30:08 Got a message: num-0
2020/12/28 20:30:08 Got a message: num-1
2020/12/28 20:30:08 Got a message: num-2
2020/12/28 20:30:08 Got a message: num-3
2020/12/28 20:30:08 Got a message: num-4
2020/12/28 20:30:08 Got a message: num-5
2020/12/28 20:30:08 Got a message: num-6
2020/12/28 20:30:08 Got a message: num-7
2020/12/28 20:30:08 Got a message: num-8
2020/12/28 20:30:08 Got a message: num-9
2020/12/28 20:30:08 Got a message: num-10

通过如上的示例,我们已经成功地实现 NSQ 的应用。下面我们将解析 NSQ 的几个核心部分。

小结

本文主要介绍 nsq 的安装使用,下载好可执行文件之后,依次启动 nsqlookupd、nsqd、nsqadmin 几个服务。接着我们基于官方提供的客户端 API 包实现了生产消费模型的案例。通过简单的案例,我们能够对 nsq 的安装和基本使用有一个了解。

下一篇文章,将会具体分析 nsq 实现的细节。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 aoho求索 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装使用
    • 创建生产者
      • 消费者
        • 运行结果
        • 小结
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档