首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RabbitMQ-案例(虚拟机创建流程)

RabbitMQ-案例(虚拟机创建流程)

作者头像
运维小路
发布2025-07-08 18:13:18
发布2025-07-08 18:13:18
11200
代码可运行
举报
文章被收录于专栏:运维小路运维小路
运行总次数:0
代码可运行

作者介绍:简历上没有一个精通的运维工程师,下面的思维导图也是预计更新的内容和当前进度(不定时更新)

中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:

Web服务器

代理服务器

ZooKeeper

Kafka

RabbitMQ(本章节)

在前几年很火的云平台:OpenStack。在里面创建虚拟主机,大概会经过下面几个步骤,申请网络资源,申请磁盘资源等。而提供这2个资源的组件是不相同的,他们就会通过RabbitMQ来来实现解耦和异步通信。

下面是一个使用Python和RabbitMQ模拟虚拟机创建流程的示例。这个实现包含两个程序。

请求发起方:发起创建虚拟主机的请求,然后他需要去申请网络资源,他会把这个请求发送到MQ的交换机里面,并且监听另外一个队列,从这个队列里面获取IP信息。

服务提供方:当监听到RabbitMQ里面有消息的时候,就进行消费,生成IP信息,返回到另外一个交换机。

当然这里只模拟了最简单的一个过程,实际的真实环境比这个复杂很多倍,只是通过这个方式更好的理解RabbitMQ业务流程。

文件结构

代码语言:javascript
代码运行次数:0
运行
复制
vm_creation_simulation/
├── vm_creator.py       # 发送创建请求并等待响应
├── network_manager.py  # 监听请求并返回网络配置

流程

1. vm_creator.py 虚拟机创建客户端

代码语言:javascript
代码运行次数:0
运行
复制
import pika
import uuid
import json
import time

class VMCreator:
    def __init__(self):
        credentials = pika.PlainCredentials('admin', 'Admin@123')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        # 声明请求交换机(确保与提供方相同)
        self.channel.exchange_declare(
            exchange='vm_requests',
            exchange_type='direct',
            durable=True  # 持久化交换机
        )

        # 声明响应交换机
        self.channel.exchange_declare(
            exchange='ip_responses',
            exchange_type='direct',
            durable=True
        )

        # 创建临时回调队列
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # 绑定回调队列到响应交换机(使用队列名作为路由键)
        self.channel.queue_bind(
            exchange='ip_responses',
            queue=self.callback_queue,
            routing_key=self.callback_queue
        )

        # 设置消费回调队列
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def create_vm(self, vm_name):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 发送虚拟机创建请求(添加消息持久化)
        self.channel.basic_publish(
            exchange='vm_requests',
            routing_key='vm_creation',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
                delivery_mode=2,  # 持久化消息
            ),
            body=json.dumps({'vm_name': vm_name})
        )
        print(f" [x] 请求创建虚拟机: {vm_name} (CorrID: {self.corr_id})")

        # 等待响应(添加超时机制)
        start_time = time.time()
        while self.response is None:
            self.connection.process_data_events()
            if time.time() - start_time > 10:  # 10秒超时
                print(f" [!] 超时: 未收到虚拟机 '{vm_name}' 的响应")
                return {'error': 'timeout'}

        return self.response

if __name__ == "__main__":
    print("=== 虚拟机创建客户端 ===")
    try:
        creator = VMCreator()

        # 创建3台虚拟机
        for i in range(1, 4):
            vm_name = f"server-{i}"
            response = creator.create_vm(vm_name)
            if 'ip_address' in response:
                print(f" [✓] 虚拟机 '{vm_name}' 创建完成! 分配IP: {response['ip_address']}")
            else:
                print(f" [x] 创建失败: {response.get('error', '未知错误')}")

    except pika.exceptions.AMQPConnectionError:
        print(" [!] 无法连接到RabbitMQ服务器")
    except KeyboardInterrupt:
        print(" [x] 用户中断")
    finally:
        if 'creator' in locals():
            creator.connection.close()
        print("程序退出")
代码语言:javascript
代码运行次数:0
运行
复制
[root@rabbitmq01 ~]# python3 vm_creator.py 
=== 虚拟机创建客户端 ===
 [x] 请求创建虚拟机: server-1 (CorrID: 1ff8242b-ca8b-4281-b56d-73143e438914)
 [✓] 虚拟机 'server-1' 创建完成! 分配IP: 10.0.211.233
 [x] 请求创建虚拟机: server-2 (CorrID: 4e6aa965-d626-420b-a350-6b5b3d6a10c1)
 [✓] 虚拟机 'server-2' 创建完成! 分配IP: 10.0.215.23
 [x] 请求创建虚拟机: server-3 (CorrID: 0f0668f2-2649-4772-9efe-f2ca0b3620ae)
 [✓] 虚拟机 'server-3' 创建完成! 分配IP: 10.0.205.240
程序退出

2. network_manager.py 网络配置服务

代码语言:javascript
代码运行次数:0
运行
复制
import pika
import json
import random
import time

class IPProvider:
    def __init__(self):
        credentials = pika.PlainCredentials('guest', 'guest')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        # 声明请求交换机(确保与请求方相同)
        self.channel.exchange_declare(
            exchange='vm_requests',
            exchange_type='direct',
            durable=True
        )

        # 声明请求队列(持久化)
        self.channel.queue_declare(
            queue='vm_creation_queue',
            durable=True
        )

        # 绑定队列到交换机(路由键必须匹配)
        self.channel.queue_bind(
            exchange='vm_requests',
            queue='vm_creation_queue',
            routing_key='vm_creation'  # 关键:必须与请求方发送的路由键相同
        )

        # 声明响应交换机
        self.channel.exchange_declare(
            exchange='ip_responses',
            exchange_type='direct',
            durable=True
        )

        print(' [*] 等待虚拟机创建请求...')
        print(f' [*] 队列绑定: vm_creation_queue -> vm_requests[vm_creation]')

    def generate_ip(self):
        """生成随机IP地址"""
        return f"10.0.{random.randint(1, 254)}.{random.randint(1, 254)}"

    def on_request(self, ch, method, props, body):
        try:
            # 解析请求
            request = json.loads(body)
            vm_name = request['vm_name']
            print(f" [.] 收到请求: 为虚拟机 '{vm_name}' 分配网络资源 (CorrID: {props.correlation_id})")

            # 模拟处理时间
            time.sleep(random.uniform(0.5, 2.0))

            # 生成IP地址
            ip_address = self.generate_ip()
            response = {'ip_address': ip_address, 'vm_name': vm_name}

            # 发送响应到响应交换机
            ch.basic_publish(
                exchange='ip_responses',
                routing_key=props.reply_to,  # 使用回调队列作为路由键
                properties=pika.BasicProperties(
                    correlation_id=props.correlation_id
                ),
                body=json.dumps(response)
            )
            print(f" [✓] 已为 '{vm_name}' 分配IP: {ip_address}")
            ch.basic_ack(delivery_tag=method.delivery_tag)

        except Exception as e:
            print(f" [!] 处理错误: {str(e)}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    def start_consuming(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='vm_creation_queue',
            on_message_callback=self.on_request
        )
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            print(" [x] 用户中断")
            self.connection.close()

if __name__ == "__main__":
    print("=== IP资源提供服务 ===")
    try:
        provider = IPProvider()
        provider.start_consuming()
    except pika.exceptions.AMQPConnectionError:
        print(" [!] 无法连接到RabbitMQ服务器")
    finally:
        print("服务停止")
代码语言:javascript
代码运行次数:0
运行
复制
[root@rabbitmq01 ~]# python3 network_manager.py 
=== IP资源提供服务 ===
 [*] 等待虚拟机创建请求...
 [*] 队列绑定: vm_creation_queue -> vm_requests[vm_creation]
 [.] 收到请求: 为虚拟机 'server-1' 分配网络资源 (CorrID: 1ff8242b-ca8b-4281-b56d-73143e438914)
 [✓] 已为 'server-1' 分配IP: 10.0.211.233
 [.] 收到请求: 为虚拟机 'server-2' 分配网络资源 (CorrID: 4e6aa965-d626-420b-a350-6b5b3d6a10c1)
 [✓] 已为 'server-2' 分配IP: 10.0.215.23
 [.] 收到请求: 为虚拟机 'server-3' 分配网络资源 (CorrID: 0f0668f2-2649-4772-9efe-f2ca0b3620ae)
 [✓] 已为 'server-3' 分配IP: 10.0.205.240
^C [x] 用户中断
服务停止
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-06-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 运维小路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文件结构
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档