首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Node下RabbitMQ的使用

Node下RabbitMQ的使用

作者头像
MrTreasure
发布2018-05-10 11:18:34
1.1K0
发布2018-05-10 11:18:34
举报

相关代码

github

确保主机已经安装 RabbitMQ 并映射到 5762 端口

  • 多 worker 下默认调度是 RR

RabbitMQ 的一些名词定义

  • Producer 生产者是一个用户端程序,用来发送消息
  • Consumer 消费者是一个服务端程序,用来接收消息
  • Queue 队列是一个RabbitMQ的内部对象,用来存储消息

Message acknowledgment 消息回执

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

Message durability 消息持久化

将队列中的消息进行本地持久化存储,避免因为意外原因导致丢失的大部分消息,通过设置durable: true

Prefetch count 消息处理树

通过设置每一个消费者处理消息的数量,如果没有完成确认,就不再派发消息给消费者

exchange 交换器

生产者并不直接将消息发送到对应队列中,而是先发送到exchange 交换器中,交换器再通过一定的规则分发给一个或多个队列。交换器有四种类型:

  • 1
  • 2
  • 3
  • 4

routing key 路由key(生产者定义)

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。 在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。 RabbitMQ为routing key设定的长度限制为255 bytes。

routing key 作用于发送的消息上

ch.publish('direct_logs', 'info', Buffer.from(ctx.request.body.content))
// 其中的 info 就是 binding key

Binding Bindg将exchange和Queue联系起来

Bing作用于消息队列Queue上

ch.bindQueue(queue.queue, ex, '')

Exchange Types 交换器的类型

  • fanout 将所有的消息发送到订阅的消息队列中
  • direct 将 binding key 与 routing key 完全相等的消息发送到订阅的队列中
  • topic 按照一定的规则匹配路由

  1. routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  2. binding key与routing key一样也是句点号“. ”分隔的字符串
  3. binding key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“ * ”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
  • headers 根据消息内容中的 header 值进行匹配,该 header 是一个键值对,在建立队列与exchange的链接时,会生成一个键值对,exchange将发送消息到键值对完全匹配的队列中

RPC 远程过程调用 消息队列本身并不具备回调的功能,即发出一个消息后,生产者并不知道消费者返回的消息(能够知道是否消费,通过 ch.ack(msg) ),通过RPC能够返回消费者的消息。其原理在于新建一个replyQueue,消费者在之前订阅该队列

思考:在HTTP1.1的情况下,server 接收到前端响应提交消息,与接收到replyQueue的消息是两个独立的事件,没办法在前者的响应中加上后者返回的信息。因此只能通过ws协议实现推送。

Node下的RabbitMQ应用

# 一个最简单的生产者与消费者建立过程
const URL = 'amqp://guest:Sunshine@localhost:5672'
// 创建一个连接
const connection = amqp.connect(URL)
// 创建生产者
connection.then(conn => {
  // 创建一个通道
  return conn.createChannel()
}).then(ch => {
  // 指定一个消息队列如果不存在则新建
  return ch.assertQueue('node').then(ok => {
    // 消息只能发送二进制
    ch.sendToQueue('node', Buffer.from('始终相信美好的事情即将发生'))
    ch.sendToQueue('node', Buffer.from('用心就能看见'))
    ch.sendToQueue('node', Buffer.from('从陌生的脸看到明天'))
  })
}).catch(err => {
  console.error(err)
})

// 创建消费者
connection.then(conn => {
  // 创建一个通道
  return conn.createChannel()
}).then(ch => {
  ch.assertQueue('node').then(ok => {
    ch.consume('node', msg => {
      if (msg) {
        console.log(chalk.green(msg.content.toString()))
        ch.ack(msg)
      }
    })
  })
}).catch(err => {
  console.error(err)
})
# Koa下RabbitMQ的应用,省略部分koa的代码
// 定义一个 Promise 避免每次都重复调用(利用Promise的状态不可逆)
const CHANNEL = rabbit.getChannel()
const Queue = 'node'
const ex = 'logs'
const key = 'anonymous.info'
// 普通的消息队列  WorkQueues
router.post('/send', async ctx => {
  const ch = await CHANNEL
  // 定义durable属性使得信道具有持久性
  ch.assertQueue(Queue, { durable: true })

  if (typeof ctx.request.body.content !== 'string') {
    ctx.status = 400
    ctx.body = {
      status: 'faild'
    }
    return
  }

  ch.sendToQueue(Queue, Buffer.from(ctx.request.body.content))
  ctx.body = {
    result: 'success'
  }
  log(chalk.magenta`[x] Sent${ctx.request.body.content}`)
})

// 使用 exchange 的消息队列 
router.post('/exchange', async ctx => {
  const ch = await CHANNEL
  // 定义类型 fanout,所有订阅的队列都会收到接受消息
  ch.assertExchange(ex, 'fanout', { durable: false })
  try {
    // 默认为空的名字
    ch.publish(ex, '', Buffer.from(ctx.request.body.content))
    ctx.body = {
      result: 'success'
    }
  } catch (error) {
    console.error(chalk.red`${error.toString()}`)
    ctx.body = error
  }
})

// 带routing的exchange
router.post('/routing', async ctx => {
  const ch = await CHANNEL
  // 设置 exchange 的名字 以及分发方法
  ch.assertExchange('direct_logs', 'direct', { durable: false })
  try {
    // 向管道分发 info 级的方法
    ch.publish('direct_logs', 'info', Buffer.from(ctx.request.body.content))
    ctx.body = {
      result: 'success'
    }
  } catch (error) {
    console.error(chalk.red`${error.toString()}`)
    ctx.body = error
  }
})

// topic的exchange
router.post('/topic', async ctx => {
  const ch = await CHANNEL
  ch.assertExchange(ex, 'topic', { durable: false })
  ch.publish(ex, key, Buffer.from(ctx.request.body.content))
})
#订阅 fanout 类型的队列
rabbit.getChannel().then(ch => {
  // 将通道绑定到名为 ex 路由方法为 fanout(监听所有)的 exchange 上
  ch.assertExchange(ex, 'fanout', { durable: false })
  // 获取队列
  ch.assertQueue('', { exclusive: false }).then(queue => {
    log(chalk.blue`[*] Waiting for messages in ${queue.queue}. To exit press CTRL+C`)
    // 将通道绑定到具体的队里上
    ch.bindQueue(queue.queue, ex, '')
    // 消费
    ch.consume(queue.queue, msg => {
      log(chalk.green`[x] Received${msg.content.toString()}`)
      ch.ack(msg)
    }, { noAck: false })
  })
})

#订阅 direct 类型的队列
rabbit.getChannel().then(ch => {
  ch.assertExchange(ex, 'direct', { durable: false })
  ch.assertQueue('', { exclusive: true }).then(q => {
    log(chalk.blue`[*] Waiting for messages in ${q.queue}. To exit press CTRL+C`)
    ch.bindQueue(q.queue, ex, 'info')

    ch.consume(q.queue, msg => {
      log(chalk.green`[x] Received${msg.content.toString()}`)
      ch.ack(msg)
    }, { noAck: true })
  })
})

#设置prefetch的消费者
rabbit.getChannel().then(ch => {
  // 将该消费者设定到 Queue 队列中
  ch.assertQueue(Queue, { durable: true })
  // 该消费者最多同时消费两条信息
  ch.prefetch(2)
  // 进行消费
  ch.consume(Queue, (msg) => {
    // 模拟一个消耗时间较长的任务
    global.setTimeout(() => {
      log(chalk.green`[x] Received ${msg.content.toString()}`)
      // 消息被消费后一定要调用 channel 的 ack 方法确认消费,否则会一直停在队列中
      ch.ack(msg)
    }, 2000)
  }, { noAck: false })
})

P.S 参考资料RabbitMQ基础概念详细介绍

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.04.07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 相关代码
  • RabbitMQ 的一些名词定义
  • Node下的RabbitMQ应用
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档