有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

本文以调用 Go SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

1. 执行如下命令在客户端环境安装所需包。
go get "github.com/rabbitmq/amqp091-go"
2. 安装完成后,即可引入到您的 GO 工程文件中。
import (amqp "github.com/rabbitmq/amqp091-go")
3. 引入之后即可在您的项目中使用客户端。

使用示例

1. 建立连接和通信信道。
// 所需参数
const (
host = "amqp-xx.rabbitmq.x.tencenttdmq.com" // 服务接入地址
username = "roleName" // 角色控制台对应的角色名称
password = "eyJrZX..." // 角色对应的密钥
vhost = "amqp-xx|Vhost" // 要使用的Vhost全称
)
// 创建连接
conn, err := amqp.Dial("amqp://" + username + ":" + password + "@" + host + ":5672/" + vhost)
failOnError(err, "Failed to connect to RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)

// 建立通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
参数
说明
host
集群接入地址,在集群基本信息页面的客户端接入模块获取。



username
用户名称,填写在控制台创建的用户名称。
password
用户密码,填写在控制台创建用户时填写的密码。
vhost
Vhost 名称,在控制台 Vhost 列表获取。
2. 声明交换机。
// 声明交换机 (名称和类型需要与存在的交换机保持一致)
err = ch.ExchangeDeclare(
"logs-exchange", // 交换机名称
"fanout", // 交换机类型
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a exchange")
3. 发布消息。 消息可发给交换机,也可以直接发到指定队列 ( hello world 和 work queues 消息模型)。
发布消息到交换机:
// 消息内容
body := "this is new message."
// 发布消息到交换机
err = ch.Publish(
"logs-exchange", // exchange
"", // routing key (根据使用的交换机类型可选择的是否需要routing key),如果不选择交换机,该参数为消息队列名称
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

发布消息到指定队列:
// 发布消息到指定的消息队列
err = ch.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
4. 订阅消息。
// 创建消费者并消费指定消息队列中的消息
msgs, err := ch.Consume(
"message-queue", // message-queue
"", // consumer
false, // 设置为非自动确认(可根据需求自己选择)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

// 获取消息队列中的消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
t := time.Duration(1)
time.Sleep(t * time.Second)
// 手动回复ack
d.Ack(false)
}
}()
log.Printf(" [Consumer] Waiting for messages.")
<-forever
5. 消费者使用 routing key。
// 需要在消息队列中指定 交换机 和 routing key
err = ch.QueueBind(
q.Name, // queue name
"routing_key", // routing key
"topic_demo", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
说明
详细使用示例可参见 DemoRabbitMQ 官网