前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python之Rabbitmq的fanout模式

Python之Rabbitmq的fanout模式

作者头像
Wu_Candy
发布2022-07-04 16:55:25
3660
发布2022-07-04 16:55:25
举报
文章被收录于专栏:无量测试之道

1

什么是fanout模式?


这种模式下,传递到 Exchange 的消息将会转发到所有与其绑定的 Queue 上。

  • 不需要指定 routing_key ,即使指定了也是无效。
  • 需要提前将 Exchange 和 Queue 绑定,一个 Exchange 可以绑定多个 Queue,一个Queue可以绑定多个Exchange。
  • 需要先启动订阅者,此模式下的队列是 Consumer 随机生成的,发布者仅仅发布消息到 Exchange ,由Exchange转发消息至Queue。

2

代码逻辑


producter_fanout.py文件内容如下:

代码语言:javascript
复制
import json
import pika
import datetime

#生成消息入口处
def get_message():
    for i in range(10):
        message=json.dumps({'id': "10000%s" % i, "amount": 100 * i,"name":"tony","createtime":str(datetime.datetime.now())})
        producter_fanout(message)


def producter_fanout(messages):
    # 获取与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('guest', 'guest')))
    # 创建一个 AMQP 信道(Channel)
    channel = connection.channel()
    # 声明exchange名为tony_test的交换机,如不存在,则创建。type=fanout表示所有消息都可以送达到所有的queue中.durable = True 代表exchange持久化存储
    channel.exchange_declare(exchange='tony_test',exchange_type='fanout',durable=True)
    # 向exchange名为tony_test的交换机, routing_key 不需要配置,body是要处理的消息,delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。
    print(messages)
    channel.basic_publish(exchange ='tony_test', routing_key='', body=messages,properties=pika.BasicProperties(delivery_mode=2))
    # 关闭与rabbitmq的连接
    connection.close()
if __name__=="__main__":
    get_message()

consumer_fanout.py文件内容如下:

代码语言:javascript
复制
import pika
import random
#接收消息,并写入文件
def write_file(message):
    with open("msg00.txt","a+") as f:
        print(message)
        f.write(message)

def consumer_fanout():
    # 获取与rabbitmq 服务的连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('guest', 'guest')))
    # 创建一个 AMQP 信道(Channel)
    channel = connection.channel()
    # 声明exchange名为tony_test的交换机,如不存在,则创建。type=fanout表示所有消息都可以送达到所有的queue中.durable = True 代表exchange持久化存储
    channel.exchange_declare(exchange='tony_test', exchange_type='fanout', durable=True)
    # 随机创建一个队列名称
    queuename="tester"+str(random.randrange(10,1000))
    result=channel.queue_declare(queue=queuename)
    # 将exchange 与queue 进行绑定
    channel.queue_bind(exchange='tony_test', queue=queuename)
    # 定义回调处理消息的函数
    def callback(ch, method, properties, body):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(body.decode)
        write_file(body.decode())
    #告诉rabbitmq,用callback来接收并处理消息
    channel.basic_consume(result.method.queue,callback,False)
    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()

if __name__=="__main__":
    consumer_fanout()

可以将consumer_fanout.py文件复制别名启动多个,然后再将producter_fanout.py里面的for 循环加大,可以看到Consumer在做消息的处理。

3

Rabbitmq界面的Exchange & Queue的展示


总结:fanout这种模式下,发送给Exchange的消息将会转发到所有与其绑定的Queue 上,确实是这样。有些Queue为0表示消息都被处理完成,有些没有处理是因为Queue在创建时成功但处理失败,程序重启后Queue就变了,所以消息会一直在那里存在,除非手动处理一下未处理消息的queue。

友情提示:“无量测试之道”原创著作,欢迎关注交流,禁止第三方不显示文章来源时转载。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 无量测试之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档