专栏首页share ai happiness消息中间件工作队列 — RabbitMQ

消息中间件工作队列 — RabbitMQ

工作队列

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

RabbitMQ分发策略:轮询和公平分发。

轮询分发:

如果现在有两个消费者,生产者产生的消息会轮流分发给两个消费者。

公平分发:

比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,在同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

channel.basic_qos(prefetch_count=1)

关于队列大小

如果所有的工作者都处理繁忙状态,你的队列就会被填满。你需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

点对点进行发布:

生产者代码:

import pika
import json

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'mq-test', durable=True)

for i in range(10):
    message=json.dumps({'CID':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
    channel.basic_publish(exchange = '',routing_key = 'mq-test',body = message)
    print(message)
connection.close()

消费者代码:

import pika

credentials = pika.PlainCredentials('xuan', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'mq-test', durable = True)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

# 告诉rabbitmq,用callback来接收消息
channel.basic_consume('mq-test',callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()

一对多进行发布:

生产者代码:

#!/usr/bin/env python
import pika
import sys

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

消费者代码:

#!/usr/bin/env python
import pika
import time

credentials = pika.PlainCredentials('xuan', '123456')  # mq用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '210.30.97.163',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

# 当工作者(worker)完成了任务,就发送一个响应。
#下面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

#可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

本文分享自微信公众号 - 1001次重燃(smile765999),作者:木野归郎

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-11-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • MNIST数据集 — 前期准备

    前面都是基础零碎的知识,需要通过一个栗子来为大家把整个流程走一遍,从整体上对TensorFlow进行一个把握,大概分为四篇文章来说明吧(前期准备、前馈计算、模型...

    木野归郎
  • 消息中间件初识和安装 — RabbitMQ

    这两天有个功能实现需要用到rabbitmq,之前做大数据的时候用过kafka,对rabbitmq了解的比较少,这里进行学习总结。

    木野归郎
  • Spark程序开发调优(前奏)

    Spark 性能优化的第一步,就是要在开发 Spark 作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些 Spark 基本开发原...

    木野归郎
  • RabbitMQ 消息队列

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    py3study
  • 使用dbms_metadata生成建表语句(r2笔记97天)

    有时候在工作中,可以使用exp/imp得到表的创建语句。 如果想得到关于table,index,constraint的语句,可以考虑使用dbms_metadat...

    jeanron100
  • 云监控最佳实践:在SCF中上报自定义监控数据

    目前内测阶段免费使用,无需审核,开通服务即用。诚邀您点击 申请页面 参与内测体验!

    腾讯云监控团队
  • 深度学习系列(2):前向传播和后向传播算法

    深度学习系列(2):前向传播和后向传播算法 前言 讲真,之前学吴恩达的机器学习课时,还手写实现过后向传播算法,但如今忘得也一干二净。总结两个原因:1. 理解不够...

    用户1147447
  • 【LeetCode】(No.012&013) 整数与罗马数字的相互转换

    好几天没有更新LeetCode的刷题了,12和13题是整数与罗马数字的相互转换,今天用一篇文章对这个问题做一下解析。

    PM小王
  • Jenkins在kubernetes上的落地实践

    创建pv/pvc对象,这里我们要注意nfs提供给jenkins的存储目录的权限问题,否则服务因为权限无法写入数据:

    公众号: 云原生生态圈
  • 拇指记者打探事件分发机制背后的秘密(上)

    聊到事件分发,很多朋友就会想到view的dispatchTouchEvent,其实在此之前,Android还做了很多工作。

    码上积木

扫码关注云+社区

领取腾讯云代金券