使用Go 操作RabbitMQ 收发消息,可以 使用Go RabbitMQ客户端库 连接 RabbitMQ 来实现。
amqp 类库 是使用Go 操作 RabbitMQ 的一个 Go RabbitMQ客户端
在安装好 RabbitMQ 服务端后,就可以使用 Go 开发客户端程序来连接RabbitMQ,发送消息,或者取消息。
开始之前
要连接使用 RabbitMQ 首先要了解 AMQP 协议的基本概念,我的另一篇文章
做了介绍,本文末也有一些 AMQP的一些资源。
一些基本概念:
消息队列
本文编写两个示例:
新建一个文件 send.go,编写 go 代码。
步骤分解如下:
(1)建立连接
conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
(2)打开channel
这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。
(3)声明一个队列
q, err := ch.QueueDeclare( ... )
(4)创建消息
msg := amqp.Publishing{ ... }
(5)发布消息
err = ch.Publish( ... )
代码如下:
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
failOnError(err, "连接失败")
defer conn.Close()
// 建立一个 channel ( 其实就是TCP连接 )
ch, err := conn.Channel()
failOnError(err, "打开通道失败")
defer ch.Close()
// 创建一个名字叫 "hello" 的队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "创建队列失败")
// 构建一个消息
body := "Hello World!"
msg := amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
}
// 构建一个生产者,将消息 放入队列
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
msg)
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
}
步骤分解如下:
(1)建立连接
conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
(2)打开channel
这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。
(3)声明一个队列
q, err := ch.QueueDeclare( ... )
(4)构建一个消费者
msgChan, err := ch.Consume( ... )
(5)不断的读取消息
for d := range msgChan {
log.Printf("收到消息: %s", d.Body)
}
新建一个文件 receive.go,编写 go 代码。
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
failOnError(err, "连接失败")
defer conn.Close()
// 建立一个 channel ( 其实就是TCP连接 )
ch, err := conn.Channel()
failOnError(err, "打开通道失败")
defer ch.Close()
// 创建一个名字叫 "hello" 的队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "创建队列失败")
// 开启一个 消费者
// 返回值是 ch 类型
msgChan, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "注册消费者 ,失败")
//帮助阻塞
forever := make(chan bool)
// 开启一个 go程
go func() {
for d := range msgChan {
log.Printf("收到消息: %s", d.Body)
}
}()
log.Printf(" 等待消息...")
<-forever
}
方法:
执行后的效果截图:
image.png
遇到 “Reason: "username or password not allowed"”
缺少权限,可能账户密码错误,也可能使用了 guest 账户未处理远程连接。考虑新建一个高权限的用户。
新建账户的方法请参考我的另一篇文章。
遇到 “no access to this vhost”
为 admin 赋予权限,使之可以访问 vhost
下面的指令 为 admin 赋予权限,使得可以访问 vhost 名字为 / 的资源。
rabbitmqctl set_permissions -p / admin "." "." "."
说明:
/ 是个 vhost 资源名称
"." "." "." 标识权限的类型,和读写权限。
https://www.rabbitmq.com/tutorials/tutorial-one-go.html
https://godoc.org/github.com/streadway/amqp
AMQP的一些资源
http://www.rabbitmq.com/tutorials/amqp-concepts.html
http://www.rabbitmq.com/getstarted.html
http://www.rabbitmq.com/amqp-0-9-1-reference.html
END