由于每次发布的连接打开成本很高,我正在尝试实现某种方式来保持连接的活力,并在我的应用程序中共享它来发布消息。
var (
Connection *amqp.Connection
Channel *amqp.Channel
err error
)
func Connect() {
Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
FailOnError(err, "Failed to connect to RabbitMQ")
Channel, err = Connection.Channel()
FailOnError(err, "Failed to open a channel")
}
func CloseConnection() {
err = Channel.Close()
FailOnError(err, "Failed to close channel ")
err = Connection.Close()
FailOnError(err, "Failed to close connection ")
}
func KeepAlive() {
queue, err := Channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "couldn't publish tics")
tic := "tic"
for {
err := Channel.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(tic),
Expiration: "5000",
})
FailOnError(err, "couldn't publish tics")
time.Sleep(5 *time.Second)
}
}
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
函数KeepAlive
是一个无限循环,它每5秒发送一条虚拟消息,该消息的TTL也是5秒,因此它会被销毁。
func main() {
rabbitmq.Connect()
defer rabbitmq.CloseConnection()
go func() {
//publisher connection to stay alive as long as application is running
rabbitmq.KeepAlive()
}()
data_layer.OpenDBConnection()
router := gin.Default()
router.POST("/whatever", whatever)
err := router.Run()
if err != nil {
log.Fatal(err.Error())
}
}
这里,我正在创建连接,并在一个goroutine中调用KeepAlive
,这样它就可以在后台工作,从而使我的连接始终处于活动状态。
我的问题:
附带注意:这些被发送的抽象组将被发送到一个虚拟队列,因为如果我将它发送到我的队列中,我使用另一个服务来消费消息,它将被卡在没有TTL的实际消息后面,并且这些抽动会变得非常大。
发布于 2021-08-23 17:16:49
使用streadway/amqp
,您不需要自己实现“保持活动”。库已经提供了此机制。
方法amqp.Dial
构造一个默认心跳为10秒的Connection
。您可以看到代码这里
// connection.go
func Dial(url string) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
})
}
这是通过在开放连接上发送心跳帧来实现的,它将比向仅为这个原因创建的队列发送假消息更有效率和可维护性。
如下所示,您可以使用amqp.DialConfig
更改连接心跳
conn, err := amqp.DialConfig(url, amqp.Config{
Heartbeat: 5 * time.Second,
})
您可能希望自己实现的是错误上的重新连接逻辑。为此,您可以在这里找到一些有用的信息:如何检查通道是否仍在streadway/amqp RabbitMQ客户端中工作?
https://stackoverflow.com/questions/68894836
复制相似问题