首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何用RabbitMQ streadway/amqp发布消息时保持我的连接正常?

如何用RabbitMQ streadway/amqp发布消息时保持我的连接正常?
EN

Stack Overflow用户
提问于 2021-08-23 14:57:02
回答 1查看 1.9K关注 0票数 2

由于每次发布的连接打开成本很高,我正在尝试实现某种方式来保持连接的活力,并在我的应用程序中共享它来发布消息。

代码语言:javascript
运行
复制
var (
    Connection *amqp.Connection
    Channel *amqp.Channel
    err error
)

func Connect() {

    Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
    FailOnError(err, "Failed to connect to RabbitMQ")

    Channel, err = Connection.Channel()
    FailOnError(err, "Failed to open a channel")
}

func CloseConnection() {
    err = Channel.Close()
    FailOnError(err, "Failed to close channel ")
    err = Connection.Close()
    FailOnError(err, "Failed to close connection ")
}

func KeepAlive() {

    queue, err := Channel.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    FailOnError(err, "couldn't publish tics")

    tic := "tic"
    for {
        err := Channel.Publish(
            "",         // exchange
            queue.Name, // routing key
            false,      // mandatory
            false,      // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(tic),
                Expiration: "5000",
            })
        FailOnError(err, "couldn't publish tics")
        time.Sleep(5 *time.Second)
    }
}

func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

函数KeepAlive是一个无限循环,它每5秒发送一条虚拟消息,该消息的TTL也是5秒,因此它会被销毁。

代码语言:javascript
运行
复制
func main() {
    rabbitmq.Connect()
    defer rabbitmq.CloseConnection()
    go func() {
        //publisher connection to stay alive as long as application is running
        rabbitmq.KeepAlive()
    }()

    data_layer.OpenDBConnection()
    router := gin.Default()

    router.POST("/whatever", whatever)

    err := router.Run()
    if err != nil {
        log.Fatal(err.Error())
    }
}

这里,我正在创建连接,并在一个goroutine中调用KeepAlive,这样它就可以在后台工作,从而使我的连接始终处于活动状态。

我的问题:

  • 我觉得这种方式只是一项工作,虽然我试图寻找如何让它保持活力的例子,但似乎所有这些例子都对消费者方面感兴趣。有更干净的方法来维持我的联系吗?
  • 只要我的应用程序运行不好,我的连接就会继续存在吗?性能方面(网络,内存使用)?注意:我计划用普罗米修斯来监视这个节目,但是任何关于我可能面临的问题的注意都会有帮助。

附带注意:这些被发送的抽象组将被发送到一个虚拟队列,因为如果我将它发送到我的队列中,我使用另一个服务来消费消息,它将被卡在没有TTL的实际消息后面,并且这些抽动会变得非常大。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-23 17:16:49

使用streadway/amqp,您不需要自己实现“保持活动”。库已经提供了此机制。

方法amqp.Dial构造一个默认心跳为10秒的Connection。您可以看到代码这里

代码语言:javascript
运行
复制
// connection.go
func Dial(url string) (*Connection, error) {
    return DialConfig(url, Config{
        Heartbeat: defaultHeartbeat,
        Locale:    defaultLocale,
    })
}

这是通过在开放连接上发送心跳帧来实现的,它将比向仅为这个原因创建的队列发送假消息更有效率和可维护性。

如下所示,您可以使用amqp.DialConfig更改连接心跳

代码语言:javascript
运行
复制
    conn, err := amqp.DialConfig(url, amqp.Config{
        Heartbeat: 5 * time.Second,
    })

您可能希望自己实现的是错误上的重新连接逻辑。为此,您可以在这里找到一些有用的信息:如何检查通道是否仍在streadway/amqp RabbitMQ客户端中工作?

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68894836

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档