1
什么是fanout模式?
这种模式下,传递到 Exchange 的消息将会转发到所有与其绑定的 Queue 上。
2
代码逻辑
producter_fanout.py文件内容如下:
import json
import pika
import datetime
#生成消息入口处
def get_message():
for i in range(10):
message=json.dumps({'id': "10000%s" % i, "amount": 100 * i,"name":"tony","createtime":str(datetime.datetime.now())})
producter_fanout(message)
def producter_fanout(messages):
# 获取与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('guest', 'guest')))
# 创建一个 AMQP 信道(Channel)
channel = connection.channel()
# 声明exchange名为tony_test的交换机,如不存在,则创建。type=fanout表示所有消息都可以送达到所有的queue中.durable = True 代表exchange持久化存储
channel.exchange_declare(exchange='tony_test',exchange_type='fanout',durable=True)
# 向exchange名为tony_test的交换机, routing_key 不需要配置,body是要处理的消息,delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。
print(messages)
channel.basic_publish(exchange ='tony_test', routing_key='', body=messages,properties=pika.BasicProperties(delivery_mode=2))
# 关闭与rabbitmq的连接
connection.close()
if __name__=="__main__":
get_message()
consumer_fanout.py文件内容如下:
import pika
import random
#接收消息,并写入文件
def write_file(message):
with open("msg00.txt","a+") as f:
print(message)
f.write(message)
def consumer_fanout():
# 获取与rabbitmq 服务的连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=pika.PlainCredentials('guest', 'guest')))
# 创建一个 AMQP 信道(Channel)
channel = connection.channel()
# 声明exchange名为tony_test的交换机,如不存在,则创建。type=fanout表示所有消息都可以送达到所有的queue中.durable = True 代表exchange持久化存储
channel.exchange_declare(exchange='tony_test', exchange_type='fanout', durable=True)
# 随机创建一个队列名称
queuename="tester"+str(random.randrange(10,1000))
result=channel.queue_declare(queue=queuename)
# 将exchange 与queue 进行绑定
channel.queue_bind(exchange='tony_test', queue=queuename)
# 定义回调处理消息的函数
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
print(body.decode)
write_file(body.decode())
#告诉rabbitmq,用callback来接收并处理消息
channel.basic_consume(result.method.queue,callback,False)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()
if __name__=="__main__":
consumer_fanout()
可以将consumer_fanout.py文件复制别名启动多个,然后再将producter_fanout.py里面的for 循环加大,可以看到Consumer在做消息的处理。
3
Rabbitmq界面的Exchange & Queue的展示
总结:fanout这种模式下,发送给Exchange的消息将会转发到所有与其绑定的Queue 上,确实是这样。有些Queue为0表示消息都被处理完成,有些没有处理是因为Queue在创建时成功但处理失败,程序重启后Queue就变了,所以消息会一直在那里存在,除非手动处理一下未处理消息的queue。
友情提示:“无量测试之道”原创著作,欢迎关注交流,禁止第三方不显示文章来源时转载。