前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ在Python中的使用详解

RabbitMQ在Python中的使用详解

作者头像
Autooooooo
发布2020-11-09 09:59:51
4.3K0
发布2020-11-09 09:59:51
举报
文章被收录于专栏:Coxhuang

RabbitMQ

关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下的线程间或者父进程与子进程之间进行队列通讯,并不能进行程序与程序之间的信息交换,这时候我们就需要一个中间件,来实现程序之间的通讯。 Mac安装RabbitMQ ??? https://cloud.tencent.com/developer/article/1744695 Python队列Queue使用 ??? https://cloud.tencent.com/developer/article/1744693

#0 GitHub

https://github.com/Coxhuang/python-rabbitmq

#1 环境

代码语言:javascript
复制
Python3.7.3
pika==1.0.1 # pika的版本不同,提供方法的参数名有变化

#2 开始

#2.1 轮询模式

此模式下,发送队列的一方把消息存入mq的指定队列后,若有消费者端联入相应队列,即会获取到消息,并且队列中的消息会被消费掉。 若有多个消费端同时连接着队列,则会已轮询的方式将队列中的消息消费掉。 生产者

代码语言:javascript
复制
import pika
credentials = pika.PlainCredentials(
    username = 'guest',
    password = 'guest',
)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host = '127.0.0.1', # MQ地址(本机)
        port = 5672, # 端口号,注意是5672,不是15672
        virtual_host = '/', # 虚拟主机
        credentials = credentials, # 用户名/密码
    )
)
channel = connection.channel()
channel.queue_declare(
    queue='queue_name_test', # 队列名
    durable=True, # 使队列持久化
)
channel.basic_publish(
    exchange='',
    routing_key='queue_name_test', # 告诉rabbitmq将消息发送到 queue_name_test 队列中
    body='Hello RabbitMQ!', # 发送消息的内容
    properties=pika.BasicProperties( delivery_mode=2,) # 消息持久化
)
print("发送...")
connection.close()

消费者

代码语言:javascript
复制
import pika
import time

auth = pika.PlainCredentials(
    username='guest',
    password='guest',
) # 用户名 / 密码
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        '127.0.0.1', # RabbitMQ 地址
        5672, # 端口号
        '/', # 虚拟主机
        auth, # 验证
    )
) # 链接RabbitMQ
channel = connection.channel() # 创建RabbitMQ通道

channel.queue_declare(
    queue='queue_name_test', # 消费对列名
    durable=True, # 持久化
)


def callback(ch, method, properties, body):
    print("消费者:  %r" % body)

channel.basic_consume(
    queue='queue_name_test', # 对列名
    auto_ack=True, # 自动回应
    on_message_callback=callback, # 回调消息
)
time.sleep(5) # 模拟消费时间
print("消费者")
channel.start_consuming()

运行生产者


# 模拟轮询模式

生产者生产5个任务到队列中

代码语言:javascript
复制
import pika
credentials = pika.PlainCredentials(
    username = 'guest',
    password = 'guest',
)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host = '127.0.0.1', # MQ地址(本机)
        port = 5672, # 端口号,注意是5672,不是15672
        virtual_host = '/', # 虚拟主机
        credentials = credentials, # 用户名/密码
    )
)

channel = connection.channel()

channel.queue_declare(
    queue='queue_name_test', # 队列名
    durable=True, # 使队列持久化
)

channel.basic_publish(
    exchange='',
    routing_key='queue_name_test', # 告诉rabbitmq将消息发送到 queue_name_test 队列中
    body="Hello RabbitMQ, I'm first task ", # 发送消息的内容
    properties=pika.BasicProperties( delivery_mode=2,) # 消息持久化
)

channel.basic_publish(
    exchange='',
    routing_key='queue_name_test', # 告诉rabbitmq将消息发送到 queue_name_test 队列中
    body="Hello RabbitMQ, I'm second task ", # 发送消息的内容
    properties=pika.BasicProperties( delivery_mode=2,) # 消息持久化
)

channel.basic_publish(
    exchange='',
    routing_key='queue_name_test', # 告诉rabbitmq将消息发送到 queue_name_test 队列中
    body="Hello RabbitMQ, I'm third task ", # 发送消息的内容
    properties=pika.BasicProperties( delivery_mode=2,) # 消息持久化
)

channel.basic_publish(
    exchange='',
    routing_key='queue_name_test', # 告诉rabbitmq将消息发送到 queue_name_test 队列中
    body="Hello RabbitMQ, I'm fourth task ", # 发送消息的内容
    properties=pika.BasicProperties( delivery_mode=2,) # 消息持久化
)

channel.basic_publish(
    exchange='',
    routing_key='queue_name_test', # 告诉rabbitmq将消息发送到 queue_name_test 队列中
    body="Hello RabbitMQ, I'm fifth task ", # 发送消息的内容
    properties=pika.BasicProperties( delivery_mode=2,) # 消息持久化
)

print("派送任务到队列中...")
connection.close()

消费者(新建3个消费者py文件,分别运行消费者程序)

代码语言:javascript
复制
# 消费者1
import pika
import time

auth = pika.PlainCredentials(
    username='guest',
    password='guest',
) # 用户名 / 密码
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        '127.0.0.1', # RabbitMQ 地址
        5672, # 端口号
        '/', # 虚拟主机
        auth, # 验证
    )
) # 链接RabbitMQ
channel = connection.channel() # 创建RabbitMQ通道

channel.queue_declare(
    queue='queue_name_test', # 消费对列名
    durable=True, # 持久化
)


def callback(ch, method, properties, body):
    print("消费者:  %r" % body)
    time.sleep(5) # 模拟消费时间

channel.basic_consume(
    queue='queue_name_test', # 对列名
    auto_ack=True, # 自动回应
    on_message_callback=callback, # 回调消息
)

print("我是消费者1")
channel.start_consuming()
代码语言:javascript
复制
# 消费者2 
... # 同消费者1 
... # 同消费者1 
def callback(ch, method, properties, body):
    print("消费者:  %r" % body)
    time.sleep(5) # 模拟消费时间
代码语言:javascript
复制
# 消费者3 
... # 同消费者1 
... # 同消费者1 
def callback(ch, method, properties, body):
    print("消费者:  %r" % body)
    time.sleep(5) # 模拟消费时间





轮询模式:公平分配任务给消费者,不考虑消费者的消费能力

#2.2 广播模式

在多consumer的情况下,默认rabbitmq是轮询发送消息的,但有的consumer消费速度快,有的消费速度慢,为了资源使用更平衡,引入ack确认机制。consumer消费完消息后会给rabbit发送ack,一旦未ack的消息数量超过指定允许的数量,则不再往该consumer发送,改为发送给其他consumer。

# 模拟广播模式
  • 生产者代码不变
  • 消费者
代码语言:javascript
复制
# 插入如下代码
...
def callback(ch, method, properties, body):
    print("消费者2:  %r" % body)
    time.sleep(10) # 模拟消费者消费能力
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
...

消费者1消费能力: 1秒一个 消费者2消费能力: 10秒一个 消费者3消费能力: 15秒一个





# 报错

#1

代码语言:javascript
复制
# error
pika.exceptions.AMQPConnectionError
代码语言:javascript
复制
# RabbitMQ地址写错

#2

代码语言:javascript
复制
# error
TypeError: basic_consume() got multiple values for argument 
代码语言:javascript
复制
pika版本不同
当前pika==1.0.1
basic_consume的参数名发生改变,应该为如下格式:
channel.basic_consume(
    queue='queue_name_test', # 对列名
    auto_ack=False, # 自动回应
    on_message_callback=callback, # 回调消息
)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/05/03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitMQ
  • #0 GitHub
  • #1 环境
  • #2 开始
    • #2.1 轮询模式
      • # 模拟轮询模式
    • #2.2 广播模式
      • # 模拟广播模式
  • # 报错
    • #1
      • #2
      相关产品与服务
      轻量应用服务器
      轻量应用服务器(TencentCloud Lighthouse)是新一代开箱即用、面向轻量应用场景的云服务器产品,助力中小企业和开发者便捷高效的在云端构建网站、Web应用、小程序/小游戏、游戏服、电商应用、云盘/图床和开发测试环境,相比普通云服务器更加简单易用且更贴近应用,以套餐形式整体售卖云资源并提供高带宽流量包,将热门软件打包实现一键构建应用,提供极简上云体验。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档