前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Python模块】rabbitMQ

【Python模块】rabbitMQ

作者头像
py3study
发布2020-01-07 16:32:06
8010
发布2020-01-07 16:32:06
举报
文章被收录于专栏:python3python3
  • RabbitMQ介绍:

父进程与子进程间,同一父继承可以用multiprocess的Manager模块来实现数据互访。

作用:RabbitMQ是为了实现相互独立的两个进程数据互访。

应用场景:不需要立即操作的数据。比如:发消息,发通知,发红包等。其它常见场景包括最终一致性、广播、错峰流控等等。

同类产品有:ActiveMQ、RabbitMQ、Kafka 、 ZeroMQ等。

  • RabbitMQ特点:

        RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

  • AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
  • RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
  • 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
  • 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
  • 高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
  • 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
  • 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
  • RabbitMQ 中的概念模型
  • 消息模型: 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
  • RabbitMQ 基本概念  上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
  1. Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  2. Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  3. Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  4. Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  5. Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  6. Connection 网络连接,比如一个TCP连接。
  7. Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  8. Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  9. Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  10. Broker 表示消息队列服务器实体。
  • AMQP 中的消息路由
  1. AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
5015984-7fd73af768f28704.png
5015984-7fd73af768f28704.png
  • Exchange 类型 Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
  1. direct 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
direct.png
direct.png
  1. fanout 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
fanout.png
fanout.png
  1. topic topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*只匹配一个单词。 #.a 可以匹配b.c.d.e.a      , *.a只以可匹配b.a或c.a或e.a,而不能匹配a.b.a
topic.png
topic.png
  • RabbitMQ安装:
  1.  Windows7:
    1. 安装erlang(32bit or 64bit)。
    2. 设置erlang系统环境变量:添加系统变量ERLANG_HOME,路径:erlang安装路径。追加path -->路径%ERLANG_HOME%\bin(注:eshell版本:V10.0.1)
    3. 安装rabbitMQ。
    4. 设置rabbitMQ系统环境变量:添加系统变量RABBITMQ_HOME,路径:erlang安装路径。追加path -->路径%RABBITMQ_HOME%\sbin
    5. 开启rabbitmq服务:在cmd下运行,net start rabbitMQ
    6. 检查rabbitMQ是否安装成功:cmd下运行 rabbitctl list_users,没有报错,并显示用户名,成功
    7. 添加rabbitMQ的WEB管理插件:cmd下运行 rabbitmq-plusins enable rabbitmq_management
    8. 浏览器里输入:http://localhost:15672,访问WEB管理。 注:设置系统环境变量的目的在于,在任何目录下,都可以运行bat或exe文件。否则需要进入程序相应目录。
  2. LINUX(在线安装): 当前linux版本--Centos 7,erlang版本19.0,rabbitmq版本3.6.6,(erlang与rabbitmq版本对应表:http://www.rabbitmq.com/which-erlang.html):
    1. 安装前需要安装gcc
    2. 安装前需要安装perl
    3. 安装wget:rpm -y install wget (wget是执行网络下载)
    4. 下载erlang: wget 地址:http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
    5. 安装erlang。rpm -ivh 包名
    6. 安装socat包,yum -y install socat
    7. 下载并安装rabbitmq。地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el6.noarch.rpm
    8. 安装rabbitmq。rpm -ivh 包名
    9. 设置环境变量,并指向rabbitmq的bin目录,可以不需要进入程序目录,并直接运行程序。 vi ~/.user_profile  ,添加 export PATH=$PATH:/usr/lib/rabbitmq/bin
    10. 启动rabbitmq服务。service rabbitmq-server start
    11. 使用教程:https://blog.csdn.net/qq_22075041/article/details/78855708
  • RabbitMQ简单配置:
    1. 查看用户: 默认只有guest用户,一般供本地测试用。把guest从配置文件中的红色列表中删除[{rabbit, [{loopback_users, []}]}]可改变guest为外部用户。
    2. 查看当前rabbitmq用户: rabbitmqctl list_users
    3. 添加用户: rabbitmqctl add_user david david1
    4. 添加用户标识: rabbitmqctl set_user_tags david administrator
    5. 添加用户权限: rabbitmqctl set_permissions -p / david ".*" ".*" ".*" / 是默认的vhost名,权限<conf> <write> <read> 用正则表达式来匹配特定的资源,如'^(amq\.gen.*|amq\.default)$'可以匹配server生成的和默认的exchange,'^$'不匹配任何资源
    6. 添加一个新的vhost rabbitmqctl add_vhost host1 外部连接都是需要指定一个vhost,所以要访问需要按上面步骤,给一个用户配置权限
    7. 重启rabbitmq服务 service rabbitmq-server restart
    8. 查看服务状态 service rabbitmq-server status  python中RabbitMQ相关的方法。  对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。 方法或属性参数作用示例pika.PlianCredentials(name,pw) name:rabbitmq的用户名 pw:rabbitmq的密码 创建rabbitmq登陆凭证 credential=pika.PlianCredentials('david','123456') pika.ConnectionParameters(host,port,virtual_host,credential) host:rabbitmq服务器地址 prot:端口,5672 virtual_host:指定虚拟host credential:登陆rabbitmq凭证 连接到rabbitmq时的参数设置 如果是本地的rabbitmq用下面语句: pika.ConnectionParameters('localhost') parameter=pika.ConnectionParameters('192.168.0.4',5672,'/',credential) rabbitmq是本地只写地址: parameter=pika.ConnnectionParameters('localhost') pika.BlockingConnection(parameters) parameter: 阻塞式连接到rabbitmq connect=pika.BlockingConnection(parameter) pika.channel() 建立通道 channel = connect.channel() delivery_tag:该消息的index requeue:默认True 更改为False,不把拒收的消息重新放入queue单条消息拒收basic_nack()delivery_tag:该消息的index mutiple:是否批量,默认False 更改为True,一次性ack比delivery_tag小的queue requeue:默认True 更改为False,不把拒收消息重新放入队列消息拒收,可以多条basic_qos()prefetch_size: prefetch_count:默认0 值越高,处理数据条数越大,适合高性能电脑 all_channels:改变均衡处理信息的条数。queue_declare(queue,passive,durable,exclusive,auto_delete,arguments)queue:队列名 passive: durable:True、开启持久化 exclusive:True、开启生成随机队列名 auto_delete: argument:声明一个队列。如果已存在不同属性的队列报错。普通声明: queue_declare(queue='test') 带持久化: queue_declare(queue='test',durable=True) 自动生成queue名: queue_declare(exclusive=True)exchange_declare()exchange:名字 exchange_type:默认direct、 可选类型:topic\fanout\direct\ passive: druable: True、持久化 auto_delete: internal: arguments:声明一个exchange,如果存在不同属性的exchange会报错默认direct类型: exchange_declare(exchange='test1') 持久化: exchange_declare(exchange='test2',durable=True)queue_delete()queue:队列名 if_unused: if_empty:删除已声明的queue queue_delete(queue='test')delete_exchange()exchange: if_unused:删除已声明的exchange queue_delete(exchange='test1')method.queue获取随机生成的queue名字声明随机queue: result = channel.queue_declare(exclusive = True) 获取queue name: q_name = result.method.queuequeue_bind()queue:队列名 exchange:要绑定的exchange routing_key:要绑定的routing_key arguments:把声明的随机queue绑定到指定的exchange或者routing_key上把上面的随机Q绑定到exchange和使用routing_key过滤: channel.queue_bind(queue=q_name,exchange='test1',routing_key='a')basic_publish() exchange:目标exchange routing_key:目标routing_key body:消息正文 properties:发送的消息属性 mandatory:默认False 更改为True,服务器没有对应的queue,那么会调用basic.return方法将消息返还给生产者。 immediate:默认False 更改为True,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。 把信息发送到rabbitmqchannel=basic_publish(exchange='test1',routing_key = 'a',body='hello word!') basic_publish的properties: pika.BasicProperties() content_type: content_encoding: headers: delivery_mode:声明信息持久化 priority: correlation_id:指定为回调函数参数props中correlation_id reply_to:回调队列名 expiration: meddage_id: timestamp: type: user_id: app_id: cluster_id: 指定发送的信息属性 1、使信息持久化,需要声明queue持久化和delivery_mode=2信息持久化 2、要实现回调,客户端至少发送带有reply_to以及correlation_id两个属性的信息 basic_consume()consumer_callback:回调函数名 queue:接收的队列名 no_ack:自动消毁队列 exclusive: consumer_tag: argument:basic_ack()delivery_tag:默认0 该消息的index mutiple:是否批量,默认False 更改为True,一次性ack比delivery_tag小的queue当consume的no_ack属性是False时,通知rabbitmq删除queue回调函数参数属性和方法channel包含channel的一切属性和方法methodconsumer_tag: delivery_tag: exchange: redelivered: routing_key:propertiesbasic_publish通过properties传入的参数bodybasic_publish发送的消息

每种方式Queue是必须有的,因为只有Queue才能读取数据。

轮询方式接收:publisher端直接send到Routing Key。RECV端直接从queue读取。

fanout:publisher端Send到Exchange。Consumer端声明Queue,绑定到Exchange,从Exchange读取。

direct:publisher端send到Exchange,并进行通过Routing_key匹配关键字。Consumer端声明Queue,绑定到Exchange,读取时写上完整的关键字,只读取关键字匹配内容。

topic:和direct模式一样,不过Routing_key支持模糊匹配。

  • Python中的RabbitMQ实例:
    • 默认轮询方式:
    • 生产者把生产的消息放入queue,多个消费者依次取出不同的消息。Q1,Q2两个消费者,生产者放入信息D1,D2时,Q1只能接收到D1,Q2只能接收到D2.
代码语言:javascript
复制
# 发送端
import pika

credential = pika.PlainCredentials('host_admin','111111')        # 设置连接到Vhost的用户名和密码
connect = pika.BlockingConnection(pika.ConnectionParameters(
                                                            '192.168.1.104',
                                                            5672,                    # web管理端口是15672,程序存取消息端口是5672
                                                            'host1',                 # vhost名字。
                                                            credential))
                                                            
channel = connect.channel()
channel.queue_declare(queue = "say", durable = True)                                 # durable消息持久化。

channel.basic_publish(exchange = '',
                      routing_key = 'say',
                      body = 'D1',
                      properties=pika.BasicProperties(delivery_mode=2)               # delivery_mode=2 消息持久化,但是阅后即消。
                      )
# 发送第第二条数据,只是为了更明显的看出来试验结果。
channel.basic_publish(exchange = '',
                      routing_key = 'say',
                      body = 'D2',
                      properties=pika.BasicProperties(delivery_mode=2)              
                      )
channel.close()


# 接收端
credential = pika.PlainCredentials('host_admin','111111')
connect = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.104',
                                                            5672,
                                                            'host1',
                                                            credential))
def callback(ch,method,properties,body):
    print(method.queue)
                                                          
channel = connect.channel()
channel.queue_declare(queue = "say",durable = True)
channel.basic_comsume(callback,
                      queue='say',
                      no_ack=False)
channel.start_consume()
  •  fanout 广播方式

        忽略routing_key,客户端接收时,只需绑定exchange就可以,所有客户端同时接到信息。

代码语言:javascript
复制
import pika
# 发送端
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost'))         # 本地方式连接,不用验证
channel = connect.channel()

channel.exchange_declare(exchange='fan_test',exchange_type='fanout')                
channel.basick_publish(exchange='fantest',
                       routing_key='',
                       body='this is test!'
                       )
channel.close()       
                       

# 接收端
connect = pika.BlockingConnections(pika.ConnectionParameters('localhost'))
channel = connect.channel()
result=channel.queue_declare(exclusive=True)    # 随机生成一个queue名字
q_name=result.method.queue                      # 获取随机生成的queue名字
channel.queue_bind(queue=q_name,                # 把queue名字绑定到exchange
                   exchange='fantest')

def callback(ch,method,properties,body):
    print(body)

channel.basic_consume(callback,
                      queue=q_name,
                      no_ack=True)
                 
channel.start_consume()
  •  direct方式

        经过routing,只发送给完全匹配routing_key和queue

代码语言:javascript
复制
# 发送端
import pika

connect = pika.BlockingConnections(pika.ConnectionParameters('localhost'))

channel = connect.channel()

channel.exchange_declare(exchange='ex_test',exchange_type='direct')

channel.basic_publish(exchange='ex_test',
                      routing_key='R1',
                      queue='',
                      body='this is test of the direct to R1!')
                      
channel.basic_publish(exchange='ex_test',
                      routing_key='R2',                                # 更改了routing_key,客户端routing_key,R1接收不到
                      queue='',
                      body='this is a test of direct to R2!')

channel.close()


# 接收端

connect = pika.BlockingConnection(pika.Connectionparameters('localhost'))

channel = connect.channel()

result = channel.queue_declare(exclusive = True)
q_name = result.method.queue

channel.queue_bind(queue=q_name,
                   exchange='ex_test')

                   
def callback(ch,method,properties,body):
    print(method.routing_key,body)
                   
channel.basic_consume(callback,
                      queue=q_name,
                      routing_key='R1')

channel.start_consume()
  • topic方式: 可以匹配不同名字的routing_key
代码语言:javascript
复制
# 发送端
import pika

routing_key = ["a.a","b.a","c.b.a","d.c","d.c.b"]

connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connect.channel()

channel.exchange_declare(exchange='ex_topic',exchange_type='topic')

for i in routing_key:
    message = "routingkey:{}".format(i)
    channel.basic_publish(exchange="ex_topic",
                          routing_key = i,
                          body = message)

channel.close()



# 接收端:

import pika

routing = ["*.a","#.a"]

connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel = connect.channel()

def callback(ch,method,propreties,body):
    print(method.queue,body)

channel.exchange_declare(exchange="ex_topic",exchange_type="topic")
result = channel.queue_declare(exclusive= True)
q_name = result.method.queue

for i in routing:
    channel.queue_bind(exchange="ex_topic",routing_key=i,queue=q_name)

channel.basic_consume(callcack,
                      queue=q_name,
                      no_ack=True)

channel.start_consuming()

参考文章:https://www.jianshu.com/p/79ca08116d57

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档