父进程与子进程间,同一父继承可以用multiprocess的Manager模块来实现数据互访。
作用:RabbitMQ是为了实现相互独立的两个进程数据互访。
应用场景:不需要立即操作的数据。比如:发消息,发通知,发红包等。其它常见场景包括最终一致性、广播、错峰流控等等。
同类产品有:ActiveMQ、RabbitMQ、Kafka 、 ZeroMQ等。
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
reply_to
以及correlation_id
两个属性的信息
basic_consume()consumer_callback:回调函数名
queue:接收的队列名
no_ack:自动消毁队列
exclusive:
consumer_tag:
argument:basic_ack()delivery_tag:默认0
该消息的index
mutiple:是否批量,默认False
更改为True,一次性ack比delivery_tag小的queue当consume的no_ack属性是False时,通知rabbitmq删除queue回调函数参数属性和方法channel包含channel的一切属性和方法methodconsumer_tag:
delivery_tag:
exchange:
redelivered:
routing_key:propertiesbasic_publish通过properties传入的参数bodybasic_publish发送的消息每种方式Queue是必须有的,因为只有Queue才能读取数据。
轮询方式接收:publisher端直接send到Routing Key。RECV端直接从queue读取。
fanout:publisher端Send到Exchange。Consumer端声明Queue,绑定到Exchange,从Exchange读取。
direct:publisher端send到Exchange,并进行通过Routing_key匹配关键字。Consumer端声明Queue,绑定到Exchange,读取时写上完整的关键字,只读取关键字匹配内容。
topic:和direct模式一样,不过Routing_key支持模糊匹配。
# 发送端
import pika
credential = pika.PlainCredentials('host_admin','111111') # 设置连接到Vhost的用户名和密码
connect = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.1.104',
5672, # web管理端口是15672,程序存取消息端口是5672
'host1', # vhost名字。
credential))
channel = connect.channel()
channel.queue_declare(queue = "say", durable = True) # durable消息持久化。
channel.basic_publish(exchange = '',
routing_key = 'say',
body = 'D1',
properties=pika.BasicProperties(delivery_mode=2) # delivery_mode=2 消息持久化,但是阅后即消。
)
# 发送第第二条数据,只是为了更明显的看出来试验结果。
channel.basic_publish(exchange = '',
routing_key = 'say',
body = 'D2',
properties=pika.BasicProperties(delivery_mode=2)
)
channel.close()
# 接收端
credential = pika.PlainCredentials('host_admin','111111')
connect = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.104',
5672,
'host1',
credential))
def callback(ch,method,properties,body):
print(method.queue)
channel = connect.channel()
channel.queue_declare(queue = "say",durable = True)
channel.basic_comsume(callback,
queue='say',
no_ack=False)
channel.start_consume()
忽略routing_key,客户端接收时,只需绑定exchange就可以,所有客户端同时接到信息。
import pika
# 发送端
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost')) # 本地方式连接,不用验证
channel = connect.channel()
channel.exchange_declare(exchange='fan_test',exchange_type='fanout')
channel.basick_publish(exchange='fantest',
routing_key='',
body='this is test!'
)
channel.close()
# 接收端
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost'))
channel = connect.channel()
result=channel.queue_declare(exclusive=True) # 随机生成一个queue名字
q_name=result.method.queue # 获取随机生成的queue名字
channel.queue_bind(queue=q_name, # 把queue名字绑定到exchange
exchange='fantest')
def callback(ch,method,properties,body):
print(body)
channel.basic_consume(callback,
queue=q_name,
no_ack=True)
channel.start_consume()
经过routing,只发送给完全匹配routing_key和queue
# 发送端
import pika
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='ex_test',exchange_type='direct')
channel.basic_publish(exchange='ex_test',
routing_key='R1',
queue='',
body='this is test of the direct to R1!')
channel.basic_publish(exchange='ex_test',
routing_key='R2', # 更改了routing_key,客户端routing_key,R1接收不到
queue='',
body='this is a test of direct to R2!')
channel.close()
# 接收端
connect = pika.BlockingConnection(pika.Connectionparameters('localhost'))
channel = connect.channel()
result = channel.queue_declare(exclusive = True)
q_name = result.method.queue
channel.queue_bind(queue=q_name,
exchange='ex_test')
def callback(ch,method,properties,body):
print(method.routing_key,body)
channel.basic_consume(callback,
queue=q_name,
routing_key='R1')
channel.start_consume()
# 发送端
import pika
routing_key = ["a.a","b.a","c.b.a","d.c","d.c.b"]
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='ex_topic',exchange_type='topic')
for i in routing_key:
message = "routingkey:{}".format(i)
channel.basic_publish(exchange="ex_topic",
routing_key = i,
body = message)
channel.close()
# 接收端:
import pika
routing = ["*.a","#.a"]
connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connect.channel()
def callback(ch,method,propreties,body):
print(method.queue,body)
channel.exchange_declare(exchange="ex_topic",exchange_type="topic")
result = channel.queue_declare(exclusive= True)
q_name = result.method.queue
for i in routing:
channel.queue_bind(exchange="ex_topic",routing_key=i,queue=q_name)
channel.basic_consume(callcack,
queue=q_name,
no_ack=True)
channel.start_consuming()