专栏首页写代码和思考RabbitMQ 学习笔记3 - 使用amqp库连接RabbitMQ

RabbitMQ 学习笔记3 - 使用amqp库连接RabbitMQ

0. 背景

使用Go 操作RabbitMQ 收发消息,可以 使用Go RabbitMQ客户端库 连接 RabbitMQ 来实现。

1. amqp 类库介绍

amqp 类库 是使用Go 操作 RabbitMQ 的一个 Go RabbitMQ客户端

在安装好 RabbitMQ 服务端后,就可以使用 Go 开发客户端程序来连接RabbitMQ,发送消息,或者取消息。

开始之前 要连接使用 RabbitMQ 首先要了解 AMQP 协议的基本概念,我的另一篇文章 做了介绍,本文末也有一些 AMQP的一些资源。

一些基本概念:

  • 生产者:一个发送消息的程序,它产生消息并发送到队列。这里是用Go写的发送端示程序例。
  • 消息队列:即 RabbitMQ 内部的队列,它安装在一个服务器中。做为消息中间件,它与具体开发语言无关,支持 Go,Java等接入连接。
  • 消费者:消费者是一个等待消息,接收消息的接收端程序示例

消息队列

本文编写两个示例:

  • 发送端:一个生产者发送一条消息
  • 接收端:以及一个接收消息并打印出来的消费者。

2. 开始使用

2.1 发送端

新建一个文件 send.go,编写 go 代码。

步骤分解如下: (1)建立连接 conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")

(2)打开channel 这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。

(3)声明一个队列 q, err := ch.QueueDeclare( ... )

(4)创建消息 msg := amqp.Publishing{ ... }

(5)发布消息 err = ch.Publish( ... )

代码如下:

package main

import (
    "github.com/streadway/amqp"
    "log"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接 RabbitMQ
    conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
    failOnError(err, "连接失败")
    defer conn.Close()

    // 建立一个 channel ( 其实就是TCP连接 )
    ch, err := conn.Channel()
    failOnError(err, "打开通道失败")
    defer ch.Close()

    // 创建一个名字叫 "hello" 的队列
    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "创建队列失败")

    // 构建一个消息
    body := "Hello World!"
    msg := amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
    }

    // 构建一个生产者,将消息 放入队列
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        msg)
    log.Printf(" [x] Sent %s", body)
    failOnError(err, "Failed to publish a message")
}

2.2 接收端

步骤分解如下: (1)建立连接 conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")

(2)打开channel 这里的channel 是AMQP 里的概念,可以理解为 多路复用的一个tcp长连接。

(3)声明一个队列 q, err := ch.QueueDeclare( ... )

(4)构建一个消费者 msgChan, err := ch.Consume( ... )

(5)不断的读取消息

for d := range msgChan { log.Printf("收到消息: %s", d.Body) }

新建一个文件 receive.go,编写 go 代码。

package main

import (
    "github.com/streadway/amqp"
    "log"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接 RabbitMQ
    conn, err := amqp.Dial("amqp://admin:admin@dev.com:5672/")
    failOnError(err, "连接失败")
    defer conn.Close()

    // 建立一个 channel ( 其实就是TCP连接 )
    ch, err := conn.Channel()
    failOnError(err, "打开通道失败")
    defer ch.Close()

    // 创建一个名字叫 "hello" 的队列
    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "创建队列失败")

    // 开启一个 消费者
    // 返回值是 ch 类型
    msgChan, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "注册消费者 ,失败")

    //帮助阻塞
    forever := make(chan bool)

    // 开启一个 go程
    go func() {
        for d := range msgChan {
            log.Printf("收到消息: %s", d.Body)
        }
    }()

    log.Printf(" 等待消息...")
    <-forever
}

2.3 执行

方法:

  • 先开启一个命令行窗口,执行 go run receive.go
  • 再开启一个命令行窗口,执行 go run send.go ,可多次执行看效果

执行后的效果截图:

image.png

3. 可能遇到的问题

遇到 “Reason: "username or password not allowed"” 缺少权限,可能账户密码错误,也可能使用了 guest 账户未处理远程连接。考虑新建一个高权限的用户。 新建账户的方法请参考我的另一篇文章

遇到 “no access to this vhost” 为 admin 赋予权限,使之可以访问 vhost 下面的指令 为 admin 赋予权限,使得可以访问 vhost 名字为 / 的资源。

rabbitmqctl set_permissions -p / admin "." "." "." 说明: / 是个 vhost 资源名称 "." "." "." 标识权限的类型,和读写权限。

4. 参考

https://www.rabbitmq.com/tutorials/tutorial-one-go.html

https://godoc.org/github.com/streadway/amqp

AMQP的一些资源 http://www.rabbitmq.com/tutorials/amqp-concepts.html http://www.rabbitmq.com/getstarted.html http://www.rabbitmq.com/amqp-0-9-1-reference.html

END

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Go 学习笔记1 - 通过http包发送网络请求

    http 包 提供了 HTTP 客户端实现,和服务端的实现。 通过 http 包,我们可以发送网络请求,get, post 等。

    zhangyunfeiVir
  • 在Android中使用logback-android日志框架配置 slf4j + logback

    logbak定位于log4j的替代者,logback同样支持slf4j,方便被替换。在Android平台上,我在使用log4中遇到tag混乱的问题。相比log4...

    zhangyunfeiVir
  • gRPC学习笔记2 - 示例

    两个微服务之间通过基于 HTTP 2.0 二进制数据帧通信,使用 gRPC 内置的 protobuf 协议,其 DSL 语法 可清晰定义服务间通信的数据结构

    zhangyunfeiVir
  • 如何优雅地处理Async/Await的异常?

    async/await 中的异常处理很让人混乱。尽管有很多种方式来应对async 函数的异常,但是连经验丰富的开发者有时候也会搞错。

    Fundebug
  • async简单使用

           node的异步io虽然好用,但是控制异步流程确实一个比较麻烦的事情,比如在爬虫中控制并发数量,避免并发过大导致网站宕机或被加入黑名单。因此需要一个...

    用户2038589
  • 大数据技术之_19_Spark学习_07_Spark 性能调优小结

    ========== Spark 的监控方式 ========== 1、Spark Web UI Spark 内置应用运行监控工具(提供了应用...

    黑泽君
  • 认识Flume(一)

    Apache Flume是一个分布式的、可靠的和可用的系统,用于有效地收集、聚合和将大量日志数据从许多不同的源移动到集中的数据存储。

    用户3467126
  • nodejs之async模块

    async模块是为了解决嵌套金字塔,和异步流程控制而生,常用方法有series、parallel、waterfall、parallelLimit、auto、wh...

    无邪Z
  • IO设计模式之Reactor和Proactor

    上面文章中,我们提到不同的操作系统实现的io策略可能不一样,即使是同一个操作系统也可能存在多重io策略,常见如linux上的select,poll,epoll,...

    我是攻城师
  • 不仅性能秒杀Hadoop,现在连分布式集群功能也开源了

    2020年8月3日,涛思数据团队正式宣布,物联网大数据平台TDengine集群版开源。此次开源,我们在GitHub上传了23.9万行源代码,1198个源文件,包...

    数据猿

扫码关注云+社区

领取腾讯云代金券