前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用RabbitMQ实现RPC(pyth

利用RabbitMQ实现RPC(pyth

作者头像
py3study
发布2020-01-06 10:38:57
7910
发布2020-01-06 10:38:57
举报
文章被收录于专栏:python3

    RPC——远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式。RabbitMQ是一个消息队列系统,可以在程序之间收发消息。利用RabbitMQ可以实现RPC。本文所有操作都是在CentOS7.3上进行的,示例代码语言为Python。

RabbiMQ以及pika模块安装

yum install rabbitmq-server python-pika -ysystemctl    start rabbitmq-server

RPC的基本实现

RPC的服务端代码如下:

#!/usr/bin/env   pythonimport pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue') def fun(n):    return 2*n def on_request(ch, method, props, body):    n = int(body)    response = fun(n)    ch.basic_publish(exchange='',        routing_key=props.reply_to,        properties=pika.BasicProperties(correlation_id = props.correlation_id),        body=str(response))    ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")channel.start_consuming()

以上代码中,首先与RabbitMQ服务建立连接,然后定义了一个函数fun(),fun()功能很简单,输入一个数然后返回该数的两倍,这个函数就是我们要远程调用的函数。on_request()是一个回调函数,它作为参数传递给了basic_consume(),当basic_consume()在队列中消费1条消息时,on_request()就会被调用,on_request()从消息内容body中获取数字,并传给fun()进行计算,并将返回值作为消息内容发给调用方指定的接收队列,队列名称保存在变量props.reply_to中。

RPC的客户端代码如下:

#!/usr/bin/env   pythonimport pikaimport uuid class RpcClient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))         self.channel = self.connection.channel()         result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue         self.channel.basic_consume(self.on_response, no_ack=True,                                   queue=self.callback_queue)     def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:            self.response = body     def call(self,n):        self.response = None        self.corr_id = str(uuid.uuid4())        self.channel.basic_publish(exchange='',                                     routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                           reply_to = self.callback_queue,                                           correlation_id = self.corr_id,                                         ),                                   body=str(n))        while self.response is None:            self.connection.process_data_events()        return str(self.response) rpc = RpcClient() print(" [x] Requesting")response = rpc.call(2)print(" [.] Got %r" % response)

代码开始也是连接RabbitMQ,然后开始消费消息队列callback_queue中的消息,该队列的名字通过Request的属性reply_to传递给服务端,就是在上面介绍服务端代码时提到过的props.reply_to,作用是告诉服务端把结果发到这个队列。 basic_consume()的回调函数变成了on_response(),这个函数从callback_queue的消息内容中获取返回结果。

函数call实际发起请求,把数字n发给服务端程序,当response不为空时,返回response值。

下面看运行效果,先启动服务端:

image.png
image.png

在另一个窗口中运行客户端:

image.png
image.png

成功调用了服务端的fun()并得到了正确结果(fun(2)结果为4)。

总结:RPC的实现过程可以用下图来表示(图片来自RabbitMQ官网):

image.png
image.png

当客户端启动时,它将创建一个callback queue用于接收服务端的返回消息Reply,名称由RabbitMQ自动生成,如上图中的amq.gen-Xa2..。同一个客户端可能会发出多个Request,这些Request的Reply都由callback queue接收,为了互相区分,就引入了correlation_id属性,每个请求的correlation_id值唯一。这样,客户端发起的Request就带由2个关键属性:reply_to告诉服务端向哪个队列返回结果;correlation_id用来区分是哪个Request的返回。

稍微复杂点的RPC

如果服务端定义了多个函数供远程调用怎么办?有两种思路,一种是利用Request的属性app_id传递函数名,另一种是把函数名通过消息内容发送给服务端。

1.我们先实现第一种,服务端代码如下:

#!/usr/bin/env   pythonimport pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue') def a():    return "a" def b():    return "b" def on_request(ch, method, props, body):    funname = props.app_id    if funname == "a":        response = a()    elif funname == "b":        response = b()     ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id = \                                                           props.correlation_id),                     body=str(response))    ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests")channel.start_consuming()

这次我们定义了2个不同函数a()和b(),分别打印不同字符串,根据接收到的app_id来决定调用哪一个。

客户端代码:

#!/usr/bin/env   pythonimport pikaimport uuid class RpcClient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))         self.channel = self.connection.channel()         result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue         self.channel.basic_consume(self.on_response, no_ack=True,                                   queue=self.callback_queue)     def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:            self.response = body     def call(self,name):        self.response = None        self.corr_id = str(uuid.uuid4())        self.channel.basic_publish(exchange='',                                     routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                           reply_to = self.callback_queue,                                           correlation_id = self.corr_id,                                           app_id = str(name),                                         ),                                   body="request")        while self.response is None:            self.connection.process_data_events()        return str(self.response) rpc = RpcClient() print(" [x] Requesting")response = rpc.call("b")print(" [.] Got %r" % response)

函数call()接收参数name作为被调用的远程函数的名字,通过app_id传给服务端程序,这段代码里我们选择调用服务端的函数b(),rpc.call(“b”)。

执行结果:

image.png
image.png
image.png
image.png

结果显示成功调用了函数b,如果改成rpc.call(“a”),执行结果就会变成:

image.png
image.png

2.第二种实现方法,服务端代码:

#!/usr/bin/env   pythonimport pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue') def a():    return "a" def b():    return "b" def on_request(ch, method, props, body):    funname = str(body)    if funname == "a":        response = a()    elif funname == "b":        response = b()     ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id = \                                                           props.correlation_id),                     body=str(response))    ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests")channel.start_consuming()

客户端代码:

#!/usr/bin/env   pythonimport pikaimport uuid class RpcClient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))         self.channel = self.connection.channel()         result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue         self.channel.basic_consume(self.on_response, no_ack=True,                                   queue=self.callback_queue)     def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:            self.response = body     def call(self,name):        self.response = None        self.corr_id = str(uuid.uuid4())        self.channel.basic_publish(exchange='',                                     routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                           reply_to = self.callback_queue,                                           correlation_id = self.corr_id,                                         ),                                   body=str(name))        while self.response is None:            self.connection.process_data_events()        return str(self.response) rpc = RpcClient() print(" [x] Requesting")response = rpc.call("b")print(" [.] Got %r" % response)

与第一种实现方法的区别就是没有使用属性app_id,而是把要调用的函数名放在消息内容body中,执行结果跟第一种方法一样。

一个简单的实际应用案例

下面我们将编写一个小程序,用于收集多台KVM宿主机上的虚拟机数量和剩余可使用的资源。程序由两部分组成,运行在每台宿主机上的脚本agent.py和管理机上收集信息的脚本collect.py。从RPC的角度,agent.py是服务端,collect.py是客户端。

agent.py代码如下:

#!/usr/bin/pythonimport pikaimport libvirtimport psutilimport jsonimport socketimport osimport sysfrom xml.dom import minidom #配置RabbitMQ地址RabbitMQServer=x.x.x.x #连接libvirt,libvirt是一个虚拟机、容器管理程序。def get_conn():    conn = libvirt.open("qemu:///system")    if conn == None:        print '--Failed to open connection to   QEMU/KVM--'        sys.exit(2)    else:        return conn #获取虚拟机数量def getVMcount():    conn = get_conn()    domainIDs = conn.listDomainsID()    return len(domainIDs) #获取分配给所有虚拟机的内存之和def getMemoryused():    conn = get_conn()    domainIDs = conn.listDomainsID()    used_mem = 0     for id in domainIDs:        dom = conn.lookupByID(id)        used_mem += dom.maxMemory()/(1024*1024)    return used_mem #获取分配给所有虚拟机的vcpu之和def getCPUused():    conn = get_conn()    domainIDs = conn.listDomainsID()    used_cpu = 0     for id in domainIDs:        dom = conn.lookupByID(id)        used_cpu += dom.maxVcpus()    return used_cpu #获取所有虚拟机磁盘文件大小之和def getDiskused():    conn = get_conn()    domainIDs = conn.listDomainsID()    diskused = 0     for id in domainIDs:        dom = conn.lookupByID(id)        xml = dom.XMLDesc(0)        doc = minidom.parseString(xml)        disks = doc.getElementsByTagName('disk')        for disk in disks:            if disk.getAttribute('device') == 'disk':                diskfile = disk.getElementsByTagName('source')[0].getAttribute('file')                diskused += dom.blockInfo(diskfile,0)[0]/(1024**3)    return diskused #使agent.py进入守护进程模式def daemonize(stdin='/dev/null',stdout='/dev/null',stderr='/dev/null'):    try:        pid = os.fork()        if pid > 0:            sys.exit(0)    except OSError,e:        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno,e.strerror))        sys.exit(1)    os.chdir("/")    os.umask(0)    os.setsid()    try:        pid = os.fork()        if pid > 0:            sys.exit(0)    except OSError,e:        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno,e.strerror))        sys.exit(1)    for f in sys.stdout,sys.stderr,: f.flush()    si = file(stdin,'r')    so = file(stdout,'a+',0)    se = file(stderr,'a+',0)    os.dup2(si.fileno(),sys.stdin.fileno())    os.dup2(so.fileno(),sys.stdout.fileno())    os.dup2(se.fileno(),sys.stderr.fileno()) daemonize('/dev/null','/root/kvm/agent.log','/root/kvm/agent.log') #连接RabbitMQconnection = pika.BlockingConnection(pika.ConnectionParameters(host= RabbitMQServer))channel = connection.channel()channel.exchange_declare(exchange='kvm',type='fanout')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuechannel.queue_bind(exchange='kvm',queue=queue_name) def on_request(ch,method,props,body):    sys.stdout.write(body+'\n')    sys.stdout.flush()    mem_total = psutil.virtual_memory()[0]/(1024*1024*1024)    cpu_total = psutil.cpu_count()    statvfs = os.statvfs('/datapool')    disk_total = (statvfs.f_frsize * statvfs.f_blocks)/(1024**3)    mem_unused = mem_total - getMemoryused()    cpu_unused = cpu_total - getCPUused()    disk_unused = disk_total - getDiskused()data = {            'hostname':socket.gethostname(),#宿主机名            'vm':getVMcount(),#虚拟机数量            'available memory':mem_unused,#可用内存            'available cpu':cpu_unused,#可用cpu核数            'available disk':disk_unused#可用磁盘空间            }    json_str = json.dumps(data)    ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id=props.correlation_id),                     body=json_str                     )    ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request,queue=queue_name)sys.stdout.write(" [x] Awaiting RPC requests\n")sys.stdout.flush()channel.start_consuming()

collect.py代码如下:

#!/usr/bin/pythonimport pikaimport uuidimport jsonimport datetime #配置RabbitMQ地址RabbitMQServer=x.x.x.xclass RpcClient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitMQServer))        self.channel = self.connection.channel()        self.channel.exchange_declare(exchange='kvm',type='fanout')        result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue        self.channel.basic_consume(self.on_responses,no_ack=True,queue=self.callback_queue)        self.responses = []     def on_responses(self,ch,method,props,body):        if self.corr_id == props.correlation_id:            self.responses.append(body)     def call(self):        timestamp = datetime.datetime.strftime(datetime.datetime.now(),'%Y-%m-%dT%H:%M:%SZ')        self.corr_id = str(uuid.uuid4())        self.channel.basic_publish(exchange='kvm',                                     routing_key='',                                   properties=pika.BasicProperties(                                         reply_to = self.callback_queue,                                       correlation_id = self.corr_id,                                       ),                                   body='%s: receive a request' % timestamp                                   )#定义超时回调函数       def outoftime():            self.channel.stop_consuming()        self.connection.add_timeout(30,outoftime)        self.channel.start_consuming()        return self.responses rpc = RpcClient()responses = rpc.call()for i in responses:    response = json.loads(i)    print(" [.] Got %r" % response)

  本文在前面演示的RPC都是只有一个服务端的情况,客户端发起请求后是用一个while循环来阻塞程序以等待返回结果的,当self.response不为None,就退出循环。

  如果在多服务端的情况下照搬过来就会出问题,实际情况中我们可能有几十台宿主机,每台上面都运行了一个agent.py,当collect.py向几十个agent.py发起请求时,收到第一个宿主机的返回结果后就会退出上述while循环,导致后续其他宿主机的返回结果被丢弃。这里我选择定义了一个超时回调函数outoftime()来替代之前的while循环,超时时间设为30秒。collect.py发起请求后阻塞30秒来等待所有宿主机的回应。如果宿主机数量特别多,可以再调大超时时间。

  脚本运行需要使用的模块pika和psutil安装过程:

yum install -y python-pip python-develpip install pikawget --no-check-certificate https://pypi.python.org/packages/source/p/psutil/psutil-2.1.3.tar.gztar zxvf psutil-2.1.3.tar.gzcd psutil-2.1.3/ && python setup.py install

  脚本运行效果演示:

image.png
image.png
image.png
image.png
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/09/21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbiMQ以及pika模块安装
  • RPC的基本实现
  • 稍微复杂点的RPC
  • 一个简单的实际应用案例
相关产品与服务
专用宿主机
专用宿主机(CVM Dedicated Host,CDH)提供用户独享的物理服务器资源,满足您资源独享、资源物理隔离、安全、合规需求。专用宿主机搭载了腾讯云虚拟化系统,购买之后,您可在其上灵活创建、管理多个自定义规格的云服务器实例,自主规划物理资源的使用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档