专栏首页西安-晁州rabbitmq消息队列——"Hello World!"

rabbitmq消息队列——"Hello World!"

RabbitMQ

一、”Hello World!”

1、简介:

         RabbitMQ是一种消息中间件,主要思想很简单:接收消息并转发。你可以将它设想为一个邮局:你往里面发送邮件并确保邮件能实际运达,RabbitMQ好比这里的邮箱、邮局和邮递员的角色。

         RabbitMQ和邮局的一个主要区别是,RabbitMQ仅仅接收、存储、转发这些数据包裹——message。

先来看下RabbitMQ中的一些关键术语:

a)、生产(者):除了发送什么意义都没有。一个发送消息的应用就是一个生产者,使用如下描述:

b)、队列:储存消息的“容器”,可以储存任意多的message——本质上是一个无限长度的缓冲区,多个生产者可以将消息发送至同一队列,多个消费者也可以从同一队列中接收消息。队列使用如下描述,”queue_name”是该队列的名称:

c)、消费(者):一个消费者就好比一个用来等待接收消息的程序。使用如下来描述:

2、”Hello World!”(使用Go RabbitMQ客户端)

         这节我们将使用Go写两个小程序:一个生产者用来发送单一消息,一个消费者用来接收这些消息并打印。图示如下:

备注:这里使用“amqp”包,自行安装:
go get github.com/streadway/amqp

发送:

         首先创建一个send.go和receiver.go分别用来发送和接收,发送方会链接RabbitMQ服务器、发送消息然后退出。

在send.go,首先导入相关包:

package main

import (
  "fmt"
  "log"

  "github.com/streadway/amqp"
)

再加一个出错处理类:

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

连接到服务器:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")  //一般默认端口5672
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

这里的conn对象是抽象了底层的socket链接,在conn的基础上我们可以创建多个channel(通道,一个conn可以创建多个channel,使用channel节省了tcp资源,后续的很多操作如:队列声明、消息声明发送、交换器声明等都是在channel基础上操作的),接下来是产生一个channel:

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

为了发送消息到队列,我们还应该声明一个队列,并将消息发送给它:

q, err := ch.QueueDeclare(
  "hello", // name 队列名称
  false,   // durable    是否持久化,默认为false
  false,   // delete when unused 队列无订阅时是否自动删除队列
  false,   // exclusive 是否队列私有,私有后仅有一个应用可以连接
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

body := "hello"    //消息主体
err = ch.Publish(         //发送消息
  "",     // exchange    //指定消息发送的交换器名称
  q.Name, // routing key   //路由键
  false,  // mandatory
  false,  // immediate
  amqp.Publishing {
    ContentType: "text/plain",    //消息类型:文本消息
    Body:        []byte(body),  //消息体,理论上可以发送任意类型数据,因为是byte类型
  })
failOnError(err, "Failed to publish a message")

至此发送方代码完毕!

接收:

接收跟发送不同的是,接收端一直运行监听发送端消息发送并打印输出,接收端的模型如下:

在这里,我们仍然使用send.go中的逻辑执行,首先是链接服务器,其次是声明channel和队列(可以防止接收端启动时发送端还没有启动的情况),主要代码如下:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when usused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

这里声明的队列名称就是send.go中声明的队列,然后从该队列中读取消息并打印:

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
  }
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

至此,所有代码书写完毕,完整版代码如下:

发送:

package main

import (
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}

func main() {
    //链接rabbitMQ服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "fail to connect to rabbitmq server")
    defer conn.Close()

    //声明一个channel
    ch, err := conn.Channel()
    failOnError(err, "fail to open a channel")
    defer ch.Close()

    //根据channel声明一个队列
    q, err := ch.QueueDeclare("queue_name", false, false, false, false, nil)
    failOnError(err, "fail to define a queue")

    //使用channel直接发送消息至队列
    body := "hello queue!"
    err = ch.Publish("", q.Name, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
    })
    failOnError(err, "fail to publish the message")
}

接收:

// rabbitmq_1.receiver project main.go
package main

import (
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

func main() {
    //链接RabbitMQ服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //声明一个channel
    ch, err := conn.Channel()
    failOnError(err, "failed to open a channel")
    defer ch.Close()

    //使用channel声明一个队列
    q, err := ch.QueueDeclare("queue_name", false, false, false, false, nil)
    failOnError(err, "failed to declare a queue")

    //注册一个消费者
    msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
    failOnError(err, "failed to register a consumer")

    //定义一个never型chan,用于防止进程退出
    var forever chan bool = make(chan bool, 0)
    //开启一个channel,实时打印channel中的消息
    go func() {
        for d := range msgs {
            log.Printf("received a message:%s", d.Body)
        }
    }()

    log.Printf("press Ctrl+c to exit!")
    <-forever
}

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

然后我们使用RabbitMQ自带的管理工具查看mq运行情况:

首先要开启管理工具:
rabbitmq-plugins enable rabbitmq_management

然后浏览器访问地址:http://localhost:15672/,这里输入默认用户名密码:guest/guest,进去后界面如下:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • rabbitmq消息队列——"路由"

    在之前的教程中,我们创建了一个简单的日志系统。我们能够向许多交换器转发日志消息。 在本教程中,我们将添加一个功能——我们让它仅仅接收我们感兴趣的日志类别。举例:...

    用户1141560
  • golang代码片段(摘抄)

    以下是从golang并发编程实战2中摘抄过来的代码片段,主要是实现一个简单的tcp socket通讯(客户端发送一个数字,服务端计算该数字的立方根然后返回),写...

    用户1141560
  • rabbitmq消息队列——"工作队列"

    二、”工作队列” ? 在第一节中我们发送接收消息直接从队列中进行。这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务。 工作队列主要是为了避免进行一...

    用户1141560
  • Go操作Elasticsearch

    常见_youmen
  • 视频流媒体平台采用Go语言编程ioutil.ReadAll的用法注意点

    由于Go语言非常易学,且代码精简,我们很多同事在研发视频平台的时候,都喜欢拿Go语言来做编程,我们这个问题就是在使用Go语言做编程的时候出现的,下面就跟大家详细...

    EasyNVR
  • Golang之Socket

    超蛋lhy
  • Golang中的RPC(转载)

    GitHub: https://github.com/zhuchenglin/gorpc

    lin_zone
  • 浏览器中玩人脸识别

    其实浏览器中的人脸识别 API 已经发布有一段时间了,从Chrome 70 版本以上就有了。其中包括了人脸,文本或 QR 码的识别,基本上覆盖了当前互联网应用的...

    IMWeb前端团队
  • Go生成读取二维码(skip2/go-qrcode和boombuler/barcode)

    QR Code码,是由Denso公司于1994年9月研制的一种矩阵二维码符号,它具有一维条码及其它二维条码所具有的信息容量大、可靠性高、可表示汉字及图象多种文字...

    程序员的酒和故事
  • 关于一些动态创建的节点无法绑定事件的问题

    使用JQ提供的.on()和.delegate()方法可以解决解决此问题,给动态加载的元素成功绑定上事件,但是在这两种方法的参数中一定得写上我们需要绑定事件的...

    colezhou

扫码关注云+社区

领取腾讯云代金券