RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,可用于在分布式系统中存储转发消息,主要有以下的技术亮点:
RabbitMQ主要用于系统间的双向解耦,当生产者(productor)产生大量的数据时,消费者(consumer)无法快速的消费信息,那么就需要一个类似于中间件的代理服务器,用来处理和保存这些数据,RabbitMQ就扮演了这个角色。
用来处理数据的消息队列服务器实体
由RabbitMQ服务器创建的虚拟消息主机,拥有自己的权限机制,一个broker里可以开设多个vhost,用于不同用户的权限隔离,vhost之间是也完全隔离的。
产生用于消息通信的数据
消息通道,在AMQP中可以建立多个channel,每个channel代表一个会话任务。
(1)接受消息,转发消息到绑定的队列,总共有四种类型的交换器:direct,fanout,topic,headers。
(2).交换器在RabbitMQ中是一个存在的实体,不能改变,如有需要只能删除重建。
(3).topic类型的交换器利用匹配规则分析消息的routing-key属性。
(4).属性
(1).队列是RabbitMQ的内部对象,存储消息
(2).可以动态的增加消费者,队列将接受到的消息以轮询(round-robin)的方式均匀的分配给多个消费者
(3).队列的属性
channel.queue_publish(exchange=exchange_name,
routing-key="rabbitmq",
body="openstack")
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing-key="rabbitmq")
表示交换机和队列之间的关系,在进行绑定时,带有一个额外的参数binding-key,来和routing-key相匹配。
监听消息队列来进行消息数据的读取
(1).在consumer处理完消息后,会发送消息ACK,通知通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK,则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
channel.basicConsume(queuename, noAck=false, consumer);
(2).消息和队列的持久化
(3).镜像队列,实现不同节点之间的元数据和消息同步
基于RabbitMQ的RPC消息通信是neutron中跨模块进行方法调用的很重要的一种方式,根据上面的描述,要组成一个完整的RPC通信结构,需要信息的生产者和消费者。
在dhcp_agent、l3_agent、metadata_agent,metering_agent的main函数中都存在一段创建一个rpc服务端的代码,下面以dhcp_agent为例。
def main():
register_options(cfg.CONF)
common_config.init(sys.argv[1:])
config.setup_logging()
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
service.launch(cfg.CONF, server).wait()
最核心的,也是跟rpc相关的部分包括两部分,首先是创建rpc服务端。
server = neutron_service.Service.create(
binary='neutron-dhcp-agent',
topic=topics.DHCP_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
manager='neutron.agent.dhcp.agent.DhcpAgentWithStateReport')
该代码实际上创建了一个rpc服务端,监听指定的topic并运行manager上的tasks。
create()方法返回一个neutron.service.Service对象,neutron.service.Service继承自neutron.common.rpc.Service类。
首先看neutron.common.rpc.Service类,该类定义了start方法,该方法主要完成两件事情:一件事情是将manager添加到endpoints中;一件是创建rpc的consumer,分别监听topic的队列消息。
而在neutron.service.Service类中,初始化中生成了一个manager实例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport);并为start方法添加了周期性执行report_state方法和periodic_tasks方法。report_state方法没有具体实现,periodic_tasks方法则调用manager的periodic_tasks方法。
manager实例(即neutron.agent.dhcp_agent.DhcpAgentWithStateReport)在初始化的时候首先创建一个rpc的client端,通过代码
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
该client端实际上定义了report_state方法,可以状态以rpc消息的方式发送给plugin。
manager在初始化后,还会指定周期性运行_report_state方法,实际上就是调用client端的report_state方法。
至此,对rpc服务端的创建算是完成了,之后执行代码。
service.launch(server).wait()
service.launch(server)方法首先会将server放到协程组中,并调用server的start方法来启动server。
主要对ML2Plugin进行分析,包括两个类:RpcCallbacks和AgentNotifierApi。
def start_rpc_listeners(self):
"""RpcCallbacks中实现的方法:Start the RPC loop to let the plugin communicate with agents."""
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
return self.conn.consume_in_threads()
创建一个通知rpc的客户端,用于向OVS的agent发出通知。所有plugin都需要有这样一个发出通知消息的客户端,创建了一个OVS agent的通知rpc客户端。之后,创建两个跟service agent相关的consumer,分别监听topics.PLUGIN
ovs_neutron_agent也会创建RPC的consumer,用来监听topics.UPDATE、topics.DELETE等操作。
def setup_rpc(self):
self.agent_id = 'ovs-agent-%s' % self.conf.host
self.topic = topics.AGENT
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.endpoints = [self]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE],
[topics.NETWORK, topics.UPDATE]]
这个rpc服务端主要通过neutron.server中主函数中代码执行
neutron_rpc = service.serve_rpc()
方法的实现代码(目录:neutron/neutron/service.py)如下
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
service_plugins = (
manager.NeutronManager.get_service_plugins().values())
if cfg.CONF.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1)
if not plugin.rpc_workers_supported():
LOG.debug("Active plugin doesn't implement start_rpc_listeners")
if 0 < cfg.CONF.rpc_workers:
LOG.error(_LE("'rpc_workers = %d' ignored because "
"start_rpc_listeners is not implemented."),
cfg.CONF.rpc_workers)
raise NotImplementedError()
try:
rpc = RpcWorker(service_plugins)
LOG.debug('using launcher for rpc, workers=%s', cfg.CONF.rpc_workers)
session.dispose()
launcher = common_service.ProcessLauncher(cfg.CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()):
rpc_state_rep = RpcReportsWorker([plugin])
LOG.debug('using launcher for state reports rpc, workers=%s',
cfg.CONF.rpc_state_report_workers)
launcher.launch_service(
rpc_state_rep, workers=cfg.CONF.rpc_state_report_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
'details.'))
其中,RpcWorker(plugin)主要通过调用plugin的方法来创建rpc服务端,最重要的工作是调用plugin的start_rpc_listeners来监听消息队列:
self._servers = self._plugin.start_rpc_listeners()
该方法在大多数plugin中并未被实现,目前ml2支持该方法。
在neutron.plugin.ml2.plugin.ML2Plugin类中,该方法创建了一个topic为topics.PLUGIN的消费rpc。
def start_rpc_listeners(self):
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
agents_db.AgentExtRpcCallback()]
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()
在Openstack中,每一个Nova服务初始化时会创建两个队列,一个名为“NODE-TYPE.NODE-ID”,另一个名为“NODE-TYPE”,NODE-TYPE是指服务的类型,NODE-ID指节点名称。
以nova启动虚拟机的过程为例,详细介绍RPC通信过程。
RPC.CAST缺少了系统消息响应流程。一个Topic消息生产者发送系统请求消息到Topic交换器,Topic交换器根据消息的Routing Key将消息转发至共享消息队列,与共享消息队列相连的所有Topic消费者接收该系统请求消息,并把它传递给响应的Worker进行处理,其调用流程如图所示:
source: //chyufly.github.io/blog/2016/04/13/rabbitmq-introduction