rabbitmq消息队列——"工作队列"

二、”工作队列”

在第一节中我们发送接收消息直接从队列中进行。这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务。

工作队列主要是为了避免进行一些必须同步等待的资源密集型的任务。实际上我们将这些任务时序话稍后分发完成。我们将某个任务封装成消息然后发送至队列,后台运行的工作进程将这些消息取出然后执行这些任务。当你运行多个工作进程的时候,这些任务也会在它们之间共享。

前期准备

上一节的练习中我们发送的是简单包含“Hello World!”的消息,这节我们还发送字符串不过用此代表更复杂的任务,实际我们这里并没有真正的任务,像图片缩放或pdf文件渲染之类的,这里我们假装我们很忙(即处理的消息任务很耗时),使用time.Sleep函数实现。我们用字符串中的”.”符号的数量代表任务的复杂性,每一个”.”需要耗时1s来执行处理。比如:”Hello…”代表该消息处理耗时3s。

我们稍微修改下上节中send.go代码,为了可以在命令行直接发送任意数量的消息。该程序将任务发送到我们的队列,暂且命名为new_task.go:

body := bodyFrom(os.Args)
err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
  })
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

我们旧的receiver.go为程序也要坐下修改:对每个消息体中的”.”符号它需要伪造一个每秒执行的工作队列。它将消息从队列中取出并执行,所以这里暂且命名为work.go:

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

请注意,我们这里的假任务模拟的是执行时间。如上一节中方式,运行:

shell1$ go run worker.go
shell2$ go run new_task.go

运行work.go:

运行new_task.go:

可以看到,work.go循环监听消息并打印,new_task.go中,我们接收控制台参数作为消息内容并发送,消息接收后自动应答。

轮转分发(Round-robin dispatching)

使用任务队列的一个优点就是有能力更简单的处理平行任务,如果工作任务堆积之后,我们只需要增加更多的工作进程,可以很简单的实现规模拓展。

首先,我们同时运行2个工作队列,都从消息队列中获取消息,实际会怎么样呢?来看看。

你现在需要打开2个窗口,都运行work.go,即work1和work2,这就是我们的2个消费者:C1、C2。

第3个窗口我们用来发送消息到队列,一旦消费者运行起来后便可以发送消息:

shell3$ go run new_task.go First message.
shell3$ go run new_task.go Second message..
shell3$ go run new_task.go Third message...
shell3$ go run new_task.go Fourth message....
shell3$ go run new_task.go Fifth message.....

然后看下work.go中接收的数据:

默认情况下,RabbitMQ会将队列中的每条消息有序的分发给每一个消费者,比如这里的work1和work2,平均每个消费者都会获得相同数量的消息(一个队列中的同一条消息不会同时发送给超过2个消费者),这种分发消息的方式就是“轮转分发”,可以开启3个work试试。

至此完整代码如下:

new_task.go:

package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //连接服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //声明channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    //声明队列
    q, err := ch.QueueDeclare(
        "hello", // name        队列名称
        false,   // durable    是否持久化,这里false
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    //创建请求体
    body := bodyFrom(os.Args)
    //发送消息
    err = ch.Publish(
        "",     // exchange     交换器名称,使用默认
        q.Name, // routing key    路由键,这里为队列名称
        false,  // mandatory
        false,
        amqp.Publishing{
            ContentType:  "text/plain",    //消息类型,文本消息
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello golang"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

work.go:

package main

import (
    "bytes"
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //链接服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //声明channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    //声明队列
    q, err := ch.QueueDeclare(
        "hello", // name    队列名称
        false,   // durable    持久化标识
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    //声明消费者
    msgs, err := ch.Consume(
        q.Name, // queue    消费的队列名称
        "",     // consumer
        true,   // auto-ack        自动应答
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool) //主要用来防止主进程窗口退出

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second) //延时x秒
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

消息应答

完成一个任务处理可能会花费数秒时间,你可能会纳闷如果其中一个消费者任务处理时间过长只部分完成就挂掉会怎样。如果使用以上代码,一旦RabbitMQ发送一个消息给消费者然后便迅速将该消息从队列内存中移除。这种情况下,如果你杀掉其中一个工作进程,那该进程正在处理的消息也将丢失。我们同样,也将丢失所有发送给该进程的未被处理的消息。

但我们并不想丢失这些任务或消息。如果某个进程挂掉,我们期望该消息仍会被发送至其它工作进程。

如果一个进程挂掉,我们希望该消息或任务可以被分发至其它工作进程。

为了确保消息永不丢失,RabbitMQ支持消息应答机制。当消息被接受,处理之后一条应答便会从消费者回传至发送方,然后RabbitMQ将其删除。

如果某个消费者挂掉(信道、链接关闭或者tcp链接丢失)且没有发送ack应答,RabbitMQ会认为该消息没有被处理完全然后会将其重新放置到队列中。通过这种方式你就可以确保消息永不丢失,甚至某个工作进程偶然挂掉的情况。

永远不会有消息超时这一说,RabbitMQ在工作进程处理挂掉后将会重发消息,这很不错甚至处理消息要发送很长很长的时间。

默认情况下消息应答是关闭的。是时候使用false(auto-ack配置项)参数将其开启了:

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

这里唯一不同的是将auto-ack设置为了false,使用手动应答,然后在代码中需要调用d.Ack(false),进行手动应答。

使用如上代码后,即时消息处理时按了Ctrl+C结束了进程,什么也不会丢失。工作进程挂掉后所有未应答的消息将会被重新分发。

消息持久化

我们已经学了如何确保消费者挂掉后任务不丢失的情况,但是一旦RabbitMQ服务器重启后我们的消息或任务依旧会丢失。

当RabbitMQ服务器停止或崩溃时,它将会丢失多有的队列和消息,除非你告诉它不要这么做。要做到服务宕机消息不丢失需要做到两点:我们需要将消息和队列同时标为持久化。

首先,我们需要确保RabbitMQ不会丢失我们的队列,为做到此,队列声明修改如下:

q, err := ch.QueueDeclare(
  "hello",      // name
  true,         // durable
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

即使这里被我们这样修改过,但是在先前的设置中此代码并不会工作。因为我们已经命名了一个叫做hello的队列,并且非持久。RabbitMQ不允许定义2个不同参数的队列,一旦做了将会报错。但是有一个快速的解决办法:我们声明队列换个名字就行了,如下task_queue,new_task.go:

q, err := ch.QueueDeclare(
  "task_queue", // name
  true,         // durable
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

durable配置项的更改需要同时反映到生产者和消费者的代码上。

基于这点我们可以确定RabbitMQ重启后task_queue队列不会丢失了。现在我们还需要将消息标记为持久:使用amqp.Publishing配置项中的amqp.Persistent值实现:

err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
  })

完整的new_task.go的代码如下:

package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //连接服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //声明channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    //声明队列
    q, err := ch.QueueDeclare(
        "task_queue", // name        队列名称
        true,        // durable    是否持久化,这里true
        false,       // delete when unused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )
    failOnError(err, "Failed to declare a queue")

    //创建请求体
    body := bodyFrom(os.Args)
    //发送消息
    err = ch.Publish(
        "",     // exchange     交换器名称,使用默认
        q.Name, // routing key    路由键,这里为队列名称
        false,  // mandatory
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain", //消息类型,文本消息
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello golang"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

work.go如下:

package main

import (
    "bytes"
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //链接服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //声明channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    //声明队列
    q, err := ch.QueueDeclare(
        "task_queue", // name    队列名称
        true,         // durable    持久化标识
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    //声明消费者
    msgs, err := ch.Consume(
        q.Name, // queue    消费的队列名称
        "",     // consumer
        false,  // auto-ack        自动应答
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool) //主要用来防止主进程窗口退出

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second) //延时x秒
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

这里测试的话,可以使用RabbitMQ自带的ctl命令进行RabbitMQ应用的重启,然后看下消息会不会丢失。

公平调度

你可能已经注意到了这种消息分发机制并非我们实际想要的那种,举例来说有两个消费者或工作进程,所有奇数的消息都很难处理而所有偶数的消息都便于处理,那么一个工作进程就比较忙碌而另一个就比较轻松,好吧,RabbitMQ实际也不清楚实际的消息分发是怎样的。

这种情况的发生是因为RabbitMQ仅仅负责分发队列中的消息。并不查看消费者中的未应答的消息数量。它只是盲目的将消息均发给每个消费者。

为了避免这种情况我们可以将prefetch count项的值配置为1,这将会指示RabbitMQ在同一时间不要发送超过一条消息给每个消费者。换句话说,直到消息被处理和应答之前都不会发送给该消费者任何消息。取而代之的是,它将会发送消息至下一个比较闲的消费者或工作进程。

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
failOnError(err, "Failed to set QoS")

所有完整的实例代码如下:

首先是new_task.go:

package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //连接服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //声明channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    //声明队列
    q, err := ch.QueueDeclare(
        "task_queue", // name        队列名称
        true,         // durable    是否持久化,这里true
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    //创建请求体
    body := bodyFrom(os.Args)
    //发送消息
    err = ch.Publish(
        "",     // exchange     交换器名称,使用默认
        q.Name, // routing key    路由键,这里为队列名称
        false,  // mandatory
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain", //消息类型,文本消息
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello golang"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

然后是work.go:

package main

import (
    "bytes"
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //链接服务器
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    //声明channel
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    //声明队列
    q, err := ch.QueueDeclare(
        "task_queue", // name    队列名称
        true,         // durable    持久化标识
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    //声明消费者
    msgs, err := ch.Consume(
        q.Name, // queue    消费的队列名称
        "",     // consumer
        false,  // auto-ack        自动应答
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool) //主要用来防止主进程窗口退出

    go func() {
        for d := range msgs {
            d.Ack(false)
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second) //延时x秒
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java架构师

Web开发中的文件上传组件uploadify的使用

在Web开发中,有很多可以上传的组件模块,利用HTML的File控件的上传也是一种办法,不过这种方式,需要处理的细节比较多,而且只能支持单文件的操作。在目前We...

2855
来自专栏iOS开发攻城狮的集散地

iOS app国际化 、跳转到系统设置、iOS10通知、正则表达式

1544
来自专栏Golang语言社区

使goroutine同步的方法总结

在前面并发性能对比的文章中,我们可以看到Golang处理大并发的能力十分强劲,而且开发也特别方便,只需要用go关键字即可开启一个新的协程。

1063
来自专栏Aloys的开发之路

LaTeX内容总结

心得 Sublime + LaTeXTools 简直噩梦,决不再碰 TeX最好用的发行版就是Tex Live TeX Live可以在Linux、Windows平...

24210
来自专栏Pythonista

Golang之Mysql事务

2402
来自专栏hbbliyong

WPF命令(Command)介绍、命令和数据绑定集成应用

要开始使用命令,必须做三件事:                                               一:定义一个命令       ...

3423
来自专栏静默虚空的博客

Eclipse 使用小结

代码智能提示 Java智能提示 Window -> Preferences -> Java -> Editor -> Content Assist -> Aut...

2156
来自专栏Golang语言社区

剖析Go编写的Socket服务器模块解耦及基础模块的设计

Server的解耦—通过Router+Controller实现逻辑分发 在实际的系统项目工程中中,我们在写代码的时候要尽量避免不必要的耦合,否则你以后在更新和维...

3568
来自专栏c#开发者

打包并自动安装sql数据库

打包,并自动安装SQL数据库 应一位网友的需求,并修正了MVP李洪根".NET平台下WEB应用程序的部署(安装数据库和自动配置)"中的osql用法错误,已测试通...

2763
来自专栏余生开发

win10 系统下,修改此处打开命令行为cmd或powershell

1、用“Windows+R”打开运行窗口输入“regedit”并按回车。或直接在Cortana栏中输入“regedit”,单击打开注册表管理器;

1372

扫码关注云+社区