作者介绍:简历上没有一个精通的运维工程师,下面的思维导图也是预计更新的内容和当前进度(不定时更新)
中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:
Web服务器
代理服务器
ZooKeeper
Kafka
RabbitMQ(本章节)
在前几年很火的云平台:OpenStack。在里面创建虚拟主机,大概会经过下面几个步骤,申请网络资源,申请磁盘资源等。而提供这2个资源的组件是不相同的,他们就会通过RabbitMQ来来实现解耦和异步通信。
下面是一个使用Python和RabbitMQ模拟虚拟机创建流程的示例。这个实现包含两个程序。
请求发起方:发起创建虚拟主机的请求,然后他需要去申请网络资源,他会把这个请求发送到MQ的交换机里面,并且监听另外一个队列,从这个队列里面获取IP信息。
服务提供方:当监听到RabbitMQ里面有消息的时候,就进行消费,生成IP信息,返回到另外一个交换机。
当然这里只模拟了最简单的一个过程,实际的真实环境比这个复杂很多倍,只是通过这个方式更好的理解RabbitMQ业务流程。
vm_creation_simulation/
├── vm_creator.py # 发送创建请求并等待响应
├── network_manager.py # 监听请求并返回网络配置
流程
1. vm_creator.py 虚拟机创建客户端
import pika
import uuid
import json
import time
class VMCreator:
def __init__(self):
credentials = pika.PlainCredentials('admin', 'Admin@123')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
# 声明请求交换机(确保与提供方相同)
self.channel.exchange_declare(
exchange='vm_requests',
exchange_type='direct',
durable=True # 持久化交换机
)
# 声明响应交换机
self.channel.exchange_declare(
exchange='ip_responses',
exchange_type='direct',
durable=True
)
# 创建临时回调队列
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
# 绑定回调队列到响应交换机(使用队列名作为路由键)
self.channel.queue_bind(
exchange='ip_responses',
queue=self.callback_queue,
routing_key=self.callback_queue
)
# 设置消费回调队列
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True
)
self.response = None
self.corr_id = None
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = json.loads(body)
def create_vm(self, vm_name):
self.response = None
self.corr_id = str(uuid.uuid4())
# 发送虚拟机创建请求(添加消息持久化)
self.channel.basic_publish(
exchange='vm_requests',
routing_key='vm_creation',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
delivery_mode=2, # 持久化消息
),
body=json.dumps({'vm_name': vm_name})
)
print(f" [x] 请求创建虚拟机: {vm_name} (CorrID: {self.corr_id})")
# 等待响应(添加超时机制)
start_time = time.time()
while self.response is None:
self.connection.process_data_events()
if time.time() - start_time > 10: # 10秒超时
print(f" [!] 超时: 未收到虚拟机 '{vm_name}' 的响应")
return {'error': 'timeout'}
return self.response
if __name__ == "__main__":
print("=== 虚拟机创建客户端 ===")
try:
creator = VMCreator()
# 创建3台虚拟机
for i in range(1, 4):
vm_name = f"server-{i}"
response = creator.create_vm(vm_name)
if 'ip_address' in response:
print(f" [✓] 虚拟机 '{vm_name}' 创建完成! 分配IP: {response['ip_address']}")
else:
print(f" [x] 创建失败: {response.get('error', '未知错误')}")
except pika.exceptions.AMQPConnectionError:
print(" [!] 无法连接到RabbitMQ服务器")
except KeyboardInterrupt:
print(" [x] 用户中断")
finally:
if 'creator' in locals():
creator.connection.close()
print("程序退出")
[root@rabbitmq01 ~]# python3 vm_creator.py
=== 虚拟机创建客户端 ===
[x] 请求创建虚拟机: server-1 (CorrID: 1ff8242b-ca8b-4281-b56d-73143e438914)
[✓] 虚拟机 'server-1' 创建完成! 分配IP: 10.0.211.233
[x] 请求创建虚拟机: server-2 (CorrID: 4e6aa965-d626-420b-a350-6b5b3d6a10c1)
[✓] 虚拟机 'server-2' 创建完成! 分配IP: 10.0.215.23
[x] 请求创建虚拟机: server-3 (CorrID: 0f0668f2-2649-4772-9efe-f2ca0b3620ae)
[✓] 虚拟机 'server-3' 创建完成! 分配IP: 10.0.205.240
程序退出
2. network_manager.py 网络配置服务
import pika
import json
import random
import time
class IPProvider:
def __init__(self):
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
# 声明请求交换机(确保与请求方相同)
self.channel.exchange_declare(
exchange='vm_requests',
exchange_type='direct',
durable=True
)
# 声明请求队列(持久化)
self.channel.queue_declare(
queue='vm_creation_queue',
durable=True
)
# 绑定队列到交换机(路由键必须匹配)
self.channel.queue_bind(
exchange='vm_requests',
queue='vm_creation_queue',
routing_key='vm_creation' # 关键:必须与请求方发送的路由键相同
)
# 声明响应交换机
self.channel.exchange_declare(
exchange='ip_responses',
exchange_type='direct',
durable=True
)
print(' [*] 等待虚拟机创建请求...')
print(f' [*] 队列绑定: vm_creation_queue -> vm_requests[vm_creation]')
def generate_ip(self):
"""生成随机IP地址"""
return f"10.0.{random.randint(1, 254)}.{random.randint(1, 254)}"
def on_request(self, ch, method, props, body):
try:
# 解析请求
request = json.loads(body)
vm_name = request['vm_name']
print(f" [.] 收到请求: 为虚拟机 '{vm_name}' 分配网络资源 (CorrID: {props.correlation_id})")
# 模拟处理时间
time.sleep(random.uniform(0.5, 2.0))
# 生成IP地址
ip_address = self.generate_ip()
response = {'ip_address': ip_address, 'vm_name': vm_name}
# 发送响应到响应交换机
ch.basic_publish(
exchange='ip_responses',
routing_key=props.reply_to, # 使用回调队列作为路由键
properties=pika.BasicProperties(
correlation_id=props.correlation_id
),
body=json.dumps(response)
)
print(f" [✓] 已为 '{vm_name}' 分配IP: {ip_address}")
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f" [!] 处理错误: {str(e)}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
def start_consuming(self):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue='vm_creation_queue',
on_message_callback=self.on_request
)
try:
self.channel.start_consuming()
except KeyboardInterrupt:
print(" [x] 用户中断")
self.connection.close()
if __name__ == "__main__":
print("=== IP资源提供服务 ===")
try:
provider = IPProvider()
provider.start_consuming()
except pika.exceptions.AMQPConnectionError:
print(" [!] 无法连接到RabbitMQ服务器")
finally:
print("服务停止")
[root@rabbitmq01 ~]# python3 network_manager.py
=== IP资源提供服务 ===
[*] 等待虚拟机创建请求...
[*] 队列绑定: vm_creation_queue -> vm_requests[vm_creation]
[.] 收到请求: 为虚拟机 'server-1' 分配网络资源 (CorrID: 1ff8242b-ca8b-4281-b56d-73143e438914)
[✓] 已为 'server-1' 分配IP: 10.0.211.233
[.] 收到请求: 为虚拟机 'server-2' 分配网络资源 (CorrID: 4e6aa965-d626-420b-a350-6b5b3d6a10c1)
[✓] 已为 'server-2' 分配IP: 10.0.215.23
[.] 收到请求: 为虚拟机 'server-3' 分配网络资源 (CorrID: 0f0668f2-2649-4772-9efe-f2ca0b3620ae)
[✓] 已为 'server-3' 分配IP: 10.0.205.240
^C [x] 用户中断
服务停止