golang 开发环境(version >= 1.2) 下源码,配置环境变量,执行安装脚本
gpm 依赖包管理器 ubantu: sudo apt-get intall gpm
ps:nsqlookupd与nsqadmin为辅助进程,可不使用直接用nsqd也可正常工作.
这里开启的进程均用默认的端口
package main
import (
"fmt"
"time"
"github.com/nsqio/go-nsq"
)
// nsq发布消息
func Producer() {
p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) // 新建生产者
if err != nil {
panic(err)
}
if err := p.Publish("test", []byte("hello NSQ!!!")); err != nil { // 发布消息
panic(err)
}
}
// nsq订阅消息
type ConsumerT struct{}
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}
func Consumer() {
c, err := nsq.NewConsumer("test", "test-channel", nsq.NewConfig()) // 新建一个消费者
if err != nil {
panic(err)
}
c.AddHandler(&ConsumerT{}) // 添加消息处理
if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil { // 建立连接
panic(err)
}
}
// 主函数
func main() {
Producer()
Consumer()
time.Sleep(time.Second * 3)
}
// 运行将会打印: hello NSQ!!!
单机使用条件,同步发布消息速度也非常快(10w/s),发布消息端基本无需再做缓存封装。接收端的消息处理应耗时尽量的短,避免消息积累,当消息积累到NSQ的缓存的数量会将多余的消息写到文件,此时也会减缓发送端的发送速度,
因此,对接收端可使用channel和go routine做简单封装处理。若某topic消息产生太快太多也可将其单独使用一个nsqd处理,避免消息积累影响其它消息投递与接收。