
作者:HOS(安全风信子) 日期:2026-01-01 来源平台:GitHub 摘要: Planner / Executor 模型是一种高级 Agent 架构设计模式,通过将规划和执行分离,显著提高 Agent 处理复杂任务的能力。本文深入剖析 MCP v2.0 框架下 MCP + Planner / Executor 模型的设计与实现,从架构设计、状态转移到具体实现,全面覆盖 Planner / Executor 模型的核心机制。通过真实代码示例、Mermaid 流程图和多维度对比表,展示 MCP v2.0 如何实现高效的任务分解、规划生成和执行管理,为构建智能 Agent 系统提供实战指南。
在 AI Agent 领域,随着任务复杂度的增加,传统的单一推理模式已无法满足需求。Planner / Executor 模型通过将规划和执行分离,具有以下关键优势:
随着 MCP v2.0 的发布,Planner / Executor 模型与标准化工具调用的结合成为 AI Agent 领域的重要发展方向。
根据 GitHub 最新趋势和 AI 工具生态的发展,Planner / Executor 模型正朝着以下方向演进:
这些趋势都凸显了 Planner / Executor 模型与标准化工具调用结合的重要性和必要性。
MCP v2.0 与 Planner / Executor 模型的结合,为构建智能 Agent 系统带来了以下核心价值:
理解 MCP + Planner / Executor 模型的设计和实现,对于构建高性能、高可用、可扩展的智能 Agent 系统至关重要。
Planner / Executor 模型的核心是将 Agent 的功能分为两个主要组件:
新要素 1:分离的规划与执行组件
这种分离设计使 Agent 能够更高效地处理复杂任务,提高了系统的可靠性和可维护性。
在 Planner / Executor 模型中,MCP 扮演着重要角色,主要体现在:
新要素 2:MCP 作为执行层核心
通过 MCP 作为执行层核心,Planner / Executor 模型可以轻松集成多种工具,提高系统的扩展性和兼容性。
Planner / Executor 模型涉及复杂的状态转移,包括计划状态、执行状态和任务状态等。
新要素 3:完整的状态转移机制
完整的状态转移机制确保了 Planner / Executor 模型的可靠运行,便于监控和调试。
MCP + Planner / Executor 模型包含以下核心组件:
Mermaid 架构图:MCP + Planner / Executor 模型的核心组件

MCP + Planner / Executor 模型的完整流程包括以下步骤:
Mermaid 流程图:MCP + Planner / Executor 模型的完整流程
渲染错误: Mermaid 渲染失败: Parse error on line 54: ... AC->>MM: 存储任务记忆 ----------------------^ Expecting 'SPACE', 'NEWLINE', 'create', 'box', 'end', 'autonumber', 'activate', 'deactivate', 'title', 'legacy_title', 'acc_title', 'acc_descr', 'acc_descr_multiline_value', 'loop', 'rect', 'opt', 'alt', 'par', 'par_over', 'critical', 'break', 'participant', 'participant_actor', 'destroy', 'note', 'links', 'link', 'properties', 'details', 'ACTOR', got '1'
Planner 组件负责生成详细的执行计划,其核心功能包括:
代码示例 1:Planner 组件核心实现
from typing import Dict, Any, List, Optional
import asyncio
from memory_manager import MemoryManager
from llm_client import LLMClient
class PlannerComponent:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.llm_client = LLMClient(config.get("llm_config", {}))
self.memory_manager = MemoryManager(config.get("memory_config", {}))
self.max_plan_steps = config.get("max_plan_steps", 20)
async def initialize(self):
"""初始化 Planner 组件"""
await self.llm_client.initialize()
await self.memory_manager.initialize()
async def generate_plan(self, task: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""生成执行计划"""
# 1. 检索相关记忆
relevant_memories = await self.memory_manager.retrieve_memories(task)
# 2. 构建规划上下文
planning_context = {
"task": task,
"context": context or {},
"relevant_memories": relevant_memories,
"max_plan_steps": self.max_plan_steps,
"available_tools": await self._get_available_tools()
}
# 3. 调用 LLM 生成执行计划
model_response = await self.llm_client.generate(planning_context)
# 4. 解析模型响应,提取执行计划
plan = self._parse_plan_response(model_response)
# 5. 验证和优化计划
plan = self._validate_and_optimize_plan(plan)
return plan
async def adjust_plan(self, original_plan: Dict[str, Any], execution_feedback: List[Dict[str, Any]]) -> Dict[str, Any]:
"""根据执行反馈调整计划"""
# 1. 构建调整上下文
adjustment_context = {
"original_plan": original_plan,
"execution_feedback": execution_feedback,
"max_plan_steps": self.max_plan_steps,
"available_tools": await self._get_available_tools()
}
# 2. 调用 LLM 调整计划
model_response = await self.llm_client.generate(adjustment_context)
# 3. 解析模型响应,提取调整后的计划
adjusted_plan = self._parse_plan_response(model_response)
# 4. 验证和优化调整后的计划
adjusted_plan = self._validate_and_optimize_plan(adjusted_plan)
return adjusted_plan
async def _get_available_tools(self) -> List[Dict[str, Any]]:
"""获取可用工具列表(简化实现,实际应从 MCP Server 获取)"""
# 简化实现,返回示例工具列表
return [
{
"name": "weather_api",
"description": "获取指定城市和日期的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string"},
"date": {"type": "string"}
},
"required": ["city", "date"]
}
},
{
"name": "calculator",
"description": "执行简单的数学计算",
"parameters": {
"type": "object",
"properties": {
"expression": {"type": "string"}
},
"required": ["expression"]
}
}
]
def _parse_plan_response(self, model_response: str) -> Dict[str, Any]:
"""解析模型响应,提取执行计划"""
import json
try:
return json.loads(model_response)
except Exception as e:
print(f"解析计划响应失败: {e}")
# 返回默认计划
return {
"plan_id": f"plan_{asyncio.get_event_loop().time()}",
"steps": [],
"status": "generated",
"created_at": asyncio.get_event_loop().time()
}
def _validate_and_optimize_plan(self, plan: Dict[str, Any]) -> Dict[str, Any]:
"""验证和优化执行计划"""
# 1. 验证计划结构
if "steps" not in plan:
plan["steps"] = []
if "status" not in plan:
plan["status"] = "generated"
if "plan_id" not in plan:
plan["plan_id"] = f"plan_{asyncio.get_event_loop().time()}"
# 2. 限制计划步骤数量
if len(plan["steps"]) > self.max_plan_steps:
plan["steps"] = plan["steps"][:self.max_plan_steps]
# 3. 优化计划,如添加依赖关系、并行执行标记等
# 简化实现,实际应包含更复杂的优化逻辑
for i, step in enumerate(plan["steps"]):
if "step_id" not in step:
step["step_id"] = f"step_{i}"
if "status" not in step:
step["status"] = "pending"
return plan
async def close(self):
"""关闭 Planner 组件,清理资源"""
await self.llm_client.close()
await self.memory_manager.close()
# 使用示例
async def main():
# 配置
config = {
"llm_config": {
"model_name": "gpt-4o",
"temperature": 0.1
},
"memory_config": {
"url": "http://localhost:3000"
},
"max_plan_steps": 10
}
# 创建 Planner 组件
planner = PlannerComponent(config)
try:
# 初始化
await planner.initialize()
# 生成执行计划
task = "计算北京今天的天气温度的平方"
plan = await planner.generate_plan(task)
print(f"生成的执行计划: {json.dumps(plan, indent=2, ensure_ascii=False)}")
finally:
# 关闭组件
await planner.close()
if __name__ == "__main__":
import json
asyncio.run(main())代码解析:
Executor 组件负责执行计划,其核心功能包括:
代码示例 2:Executor 组件核心实现
from typing import Dict, Any, List, Optional
import asyncio
from mcp_client import MCPAgentClient
from memory_manager import MemoryManager
class ExecutorComponent:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.mcp_client = MCPAgentClient(config.get("mcp_server_url", "http://localhost:8000/mcp"))
self.memory_manager = MemoryManager(config.get("memory_config", {}))
self.max_retries = config.get("max_retries", 3)
async def initialize(self):
"""初始化 Executor 组件"""
await self.mcp_client.initialize()
await self.memory_manager.initialize()
async def execute_step(self, step: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""执行单个计划步骤"""
execution_result = {
"step_id": step["step_id"],
"status": "executing",
"start_time": asyncio.get_event_loop().time(),
"end_time": None,
"result": None,
"error": None,
"retries": 0
}
try:
# 1. 解析步骤,提取工具调用请求
tool_call = step.get("tool_call")
if tool_call:
# 2. 执行工具调用
tool_result = await self._execute_tool_call(tool_call, context)
execution_result["result"] = tool_result
execution_result["status"] = "completed"
else:
# 3. 执行其他类型的步骤(如计算、决策等)
execution_result["result"] = await self._execute_other_step(step, context)
execution_result["status"] = "completed"
except Exception as e:
execution_result["error"] = str(e)
execution_result["status"] = "failed"
finally:
execution_result["end_time"] = asyncio.get_event_loop().time()
# 4. 存储执行记录
await self.memory_manager.store_execution_record(execution_result)
return execution_result
async def execute_plan(self, plan: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""执行完整的执行计划"""
execution_results = []
for step in plan.get("steps", []):
# 执行单个步骤
result = await self.execute_step(step, context)
execution_results.append(result)
# 如果步骤失败,尝试重试
if result["status"] == "failed" and result["retries"] < self.max_retries:
for retry in range(self.max_retries - result["retries"]):
result["retries"] += 1
result["status"] = "retrying"
# 重试执行步骤
retry_result = await self.execute_step(step, context)
retry_result["retries"] = result["retries"]
execution_results.append(retry_result)
if retry_result["status"] == "completed":
break
return execution_results
async def _execute_tool_call(self, tool_call: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""执行工具调用"""
# 调用 MCP Client 执行工具
return await self.mcp_client.call_tool(
tool_name=tool_call["name"],
arguments=tool_call["arguments"],
context=context
)
async def _execute_other_step(self, step: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""执行其他类型的步骤(简化实现)"""
# 简化实现,返回示例结果
return {
"message": f"执行步骤 {step['step_id']} 成功",
"step_type": step.get("type", "unknown"),
"context": context
}
async def close(self):
"""关闭 Executor 组件,清理资源"""
await self.mcp_client.close()
await self.memory_manager.close()
# 使用示例
async def main():
# 配置
config = {
"mcp_server_url": "http://localhost:8000/mcp",
"memory_config": {
"url": "http://localhost:3000"
},
"max_retries": 3
}
# 创建 Executor 组件
executor = ExecutorComponent(config)
try:
# 初始化
await executor.initialize()
# 示例执行步骤
step = {
"step_id": "step_0",
"type": "tool_call",
"tool_call": {
"name": "weather_api",
"arguments": {
"city": "北京",
"date": "2026-01-01"
}
},
"status": "pending"
}
# 执行步骤
result = await executor.execute_step(step)
print(f"执行结果: {json.dumps(result, indent=2, ensure_ascii=False)}")
finally:
# 关闭组件
await executor.close()
if __name__ == "__main__":
import json
asyncio.run(main())代码解析:
计划管理器负责管理执行计划,其核心功能包括:
代码示例 3:计划管理器核心实现
from typing import Dict, Any, List, Optional
import asyncio
class PlanManager:
def __init__(self, planner, executor):
self.planner = planner
self.executor = executor
self.plans = {} # 存储所有计划,按计划 ID 组织
self.execution_results = {} # 存储执行结果,按计划 ID 组织
async def submit_plan(self, plan: Dict[str, Any]) -> str:
"""提交执行计划"""
plan_id = plan.get("plan_id", f"plan_{asyncio.get_event_loop().time()}")
plan["plan_id"] = plan_id
plan["status"] = "submitted"
plan["submitted_at"] = asyncio.get_event_loop().time()
# 存储计划
self.plans[plan_id] = plan
self.execution_results[plan_id] = []
return plan_id
async def execute_plan(self, plan_id: str, context: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""执行指定的执行计划"""
if plan_id not in self.plans:
raise ValueError(f"计划 {plan_id} 不存在")
plan = self.plans[plan_id]
plan["status"] = "executing"
plan["started_at"] = asyncio.get_event_loop().time()
# 执行计划
results = await self.executor.execute_plan(plan, context)
self.execution_results[plan_id].extend(results)
# 检查是否需要调整计划
if self._need_plan_adjustment(results):
# 调整计划
adjusted_plan = await self.planner.adjust_plan(plan, results)
adjusted_plan_id = await self.submit_plan(adjusted_plan)
# 执行调整后的计划
adjusted_results = await self.execute_plan(adjusted_plan_id, context)
self.execution_results[plan_id].extend(adjusted_results)
# 更新原计划状态
plan["status"] = "adjusted"
plan["adjusted_plan_id"] = adjusted_plan_id
else:
# 计划执行完成
plan["status"] = "completed"
plan["completed_at"] = asyncio.get_event_loop().time()
return self.execution_results[plan_id]
async def get_plan(self, plan_id: str) -> Optional[Dict[str, Any]]:
"""获取指定的执行计划"""
return self.plans.get(plan_id)
async def get_execution_results(self, plan_id: str) -> List[Dict[str, Any]]:
"""获取指定计划的执行结果"""
return self.execution_results.get(plan_id, [])
def _need_plan_adjustment(self, results: List[Dict[str, Any]]) -> bool:
"""判断是否需要调整计划"""
# 简化实现,实际应包含更复杂的判断逻辑
# 例如:如果有多个步骤失败,或者关键步骤失败,需要调整计划
failed_steps = [r for r in results if r["status"] == "failed"]
return len(failed_steps) > 0
async def close(self):
"""关闭计划管理器,清理资源"""
await self.planner.close()
await self.executor.close()
# 使用示例
async def main():
from planner_component import PlannerComponent
from executor_component import ExecutorComponent
# 配置
config = {
"llm_config": {
"model_name": "gpt-4o",
"temperature": 0.1
},
"memory_config": {
"url": "http://localhost:3000"
},
"mcp_server_url": "http://localhost:8000/mcp",
"max_plan_steps": 10,
"max_retries": 3
}
# 创建组件
planner = PlannerComponent(config)
executor = ExecutorComponent(config)
plan_manager = PlanManager(planner, executor)
try:
# 初始化组件
await planner.initialize()
await executor.initialize()
# 生成执行计划
task = "计算北京今天的天气温度的平方"
plan = await planner.generate_plan(task)
# 提交执行计划
plan_id = await plan_manager.submit_plan(plan)
print(f"提交的计划 ID: {plan_id}")
# 执行计划
results = await plan_manager.execute_plan(plan_id)
print(f"计划执行结果: {json.dumps(results, indent=2, ensure_ascii=False)}")
finally:
# 关闭组件
await plan_manager.close()
if __name__ == "__main__":
import json
asyncio.run(main())代码解析:
以下是一个完整的 MCP + Planner / Executor 模型集成示例,展示如何在实际应用中使用该模型:
代码示例 4:MCP + Planner / Executor 模型的集成示例
import asyncio
import json
from planner_component import PlannerComponent
from executor_component import ExecutorComponent
from plan_manager import PlanManager
from agent_controller import AgentController
async def main():
# 1. 配置初始化
config = {
"llm_config": {
"type": "openai",
"model_name": "gpt-4o",
"api_key": "${OPENAI_API_KEY}",
"temperature": 0.1,
"max_tokens": 2048
},
"memory_config": {
"type": "vector_db",
"url": "http://localhost:3000",
"similarity_threshold": 0.8
},
"mcp": {
"server_url": "http://localhost:8000/mcp"
},
"max_plan_steps": 10,
"max_retries": 3
}
# 2. 创建组件
planner = PlannerComponent(config)
executor = ExecutorComponent(config)
plan_manager = PlanManager(planner, executor)
agent_controller = AgentController(config, plan_manager)
try:
# 3. 初始化组件
await planner.initialize()
await executor.initialize()
await agent_controller.initialize()
# 4. 示例 1:简单任务
print("=== 示例 1:简单任务 ===")
user_request1 = "计算北京今天的天气温度"
response1 = await agent_controller.handle_request(user_request1)
print(f"用户请求: {user_request1}")
print(f"响应: {response1}")
print()
# 5. 示例 2:复杂任务
print("=== 示例 2:复杂任务 ===")
user_request2 = "计算北京今天的天气温度的平方,然后加上上海今天的天气温度"
response2 = await agent_controller.handle_request(user_request2)
print(f"用户请求: {user_request2}")
print(f"响应: {response2}")
print()
print("MCP + Planner / Executor 模型集成示例完成!")
finally:
# 6. 关闭组件
await plan_manager.close()
await agent_controller.close()
if __name__ == "__main__":
asyncio.run(main())代码解析:
完整的状态转移机制是 Planner / Executor 模型可靠运行的关键。以下是一个状态转移机制的实现示例:
代码示例 5:状态转移机制实现
class StateTransitionManager:
def __init__(self):
# 状态转移规则
self.transition_rules = {
"plan": {
"submitted": ["executing"],
"executing": ["completed", "failed", "adjusted"],
"completed": [],
"failed": ["executing"],
"adjusted": ["executing", "completed", "failed"]
},
"step": {
"pending": ["executing"],
"executing": ["completed", "failed"],
"completed": [],
"failed": ["pending", "retrying"],
"retrying": ["executing", "failed"],
"terminated": []
},
"task": {
"pending": ["processing"],
"processing": ["completed", "failed", "terminated"],
"completed": [],
"failed": ["processing"],
"terminated": []
}
}
def can_transition(self, entity_type: str, current_state: str, target_state: str) -> bool:
"""判断是否可以从当前状态转移到目标状态"""
if entity_type not in self.transition_rules:
return False
rules = self.transition_rules[entity_type]
if current_state not in rules:
return False
return target_state in rules[current_state]
def transition_state(self, entity: Dict[str, Any], target_state: str) -> Dict[str, Any]:
"""执行状态转移"""
entity_type = entity.get("type", "task")
current_state = entity.get("status", "pending")
if not self.can_transition(entity_type, current_state, target_state):
raise ValueError(f"无法从状态 {current_state} 转移到状态 {target_state}")
# 更新状态
entity["status"] = target_state
entity[f"{target_state}_at"] = time.time()
return entity
def get_allowed_transitions(self, entity_type: str, current_state: str) -> List[str]:
"""获取允许的状态转移列表"""
if entity_type not in self.transition_rules:
return []
rules = self.transition_rules[entity_type]
if current_state not in rules:
return []
return rules[current_state]
# 使用示例
import time
# 创建状态转移管理器
state_manager = StateTransitionManager()
# 示例 1:计划状态转移
plan = {
"type": "plan",
"status": "submitted"
}
print(f"计划当前状态: {plan['status']}")
print(f"允许的状态转移: {state_manager.get_allowed_transitions('plan', plan['status'])}")
# 执行状态转移
state_manager.transition_state(plan, "executing")
print(f"计划新状态: {plan['status']}")
# 示例 2:步骤状态转移
step = {
"type": "step",
"status": "pending"
}
print(f"\n步骤当前状态: {step['status']}")
print(f"允许的状态转移: {state_manager.get_allowed_transitions('step', step['status'])}")
# 执行状态转移
state_manager.transition_state(step, "executing")
print(f"步骤新状态: {step['status']}")代码解析:
对比维度 | MCP + Planner / Executor | ReAct 架构 | 单一推理架构 | 传统多智能体架构 |
|---|---|---|---|---|
架构复杂度 | 较高,包含多个组件和复杂的交互 | 中等,主要包含 Reason-Act 循环 | 较低,单一推理组件 | 较高,包含多个智能体和复杂的通信 |
任务处理能力 | 强大,能够处理复杂的多步任务 | 较强,能够处理需要多步推理的任务 | 有限,主要处理简单任务 | 强大,能够处理复杂的分布式任务 |
可靠性 | 高,规划和执行分离,便于错误恢复 | 中等,依赖 Reason-Act 循环的可靠性 | 较低,单一组件故障影响整个系统 | 高,多智能体协作,具有容错能力 |
可维护性 | 高,组件分离,便于单独优化和维护 | 中等,Reason-Act 循环相对简单 | 低,单一组件,修改影响整个系统 | 中等,多智能体协作复杂度高 |
扩展性 | 高,便于添加新的规划算法和执行策略 | 中等,便于添加新的工具和推理逻辑 | 低,扩展需要修改核心代码 | 高,便于添加新的智能体 |
工具集成 | 强,通过 MCP 标准化工具调用,支持多种工具 | 强,支持多种工具集成 | 弱,工具集成困难 | 强,支持多种工具和服务集成 |
执行效率 | 高,支持并行执行和计划优化 | 中等,依赖 Reason-Act 循环的效率 | 高,单一推理,响应时间短 | 中等,多智能体通信开销大 |
开发难度 | 较高,需要设计和实现多个组件 | 中等,主要实现 Reason-Act 循环 | 低,实现简单的推理逻辑 | 高,需要设计和实现多智能体通信和协作 |
对比维度 | MCP + Planner / Executor | 传统计划执行系统 |
|---|---|---|
计划生成 | 基于 LLM 的智能计划生成,支持复杂任务 | 基于预定义规则或简单算法,对复杂任务支持有限 |
工具集成 | 通过 MCP 标准化工具调用,支持多种工具 | 通常是自定义工具集成,兼容性差 |
动态调整 | 支持根据执行反馈动态调整计划 | 通常是静态计划,调整困难 |
智能程度 | 基于 LLM 的智能系统,能够处理不确定性 | 基于规则的系统,对不确定性处理能力有限 |
可扩展性 | 模块化设计,便于添加新的规划算法和执行策略 | 扩展困难,往往需要修改核心代码 |
易用性 | 提供简洁的 API,便于使用和集成 | 通常需要复杂的配置和开发 |
成本 | 依赖 LLM,运行成本较高 | 运行成本较低,但开发成本较高 |
适用场景 | 复杂、不确定的任务场景 | 简单、确定的任务场景 |
通过与主流方案和传统系统的对比,可以看出 MCP + Planner / Executor 模型的主要优势:
MCP + Planner / Executor 模型在实际工程中具有重要意义,主要体现在以下几个方面:
尽管 MCP + Planner / Executor 模型具有很多优势,但在实际应用中也面临一些潜在风险和挑战:
MCP + Planner / Executor 模型在某些场景下也存在局限性:
基于当前技术发展和社区动态,我预测 MCP + Planner / Executor 模型的发展将呈现以下趋势:
MCP + Planner / Executor 模型的广泛应用将对 AI Agent 生态产生深远影响:
对于正在或计划构建基于 MCP + Planner / Executor 模型的开发者,我提出以下建议:
以下是一个推荐的计划格式设计:
{
"plan_id": "plan_123",
"task": "计算北京今天的天气温度的平方",
"steps": [
{
"step_id": "step_0",
"type": "tool_call",
"name": "获取北京今天的天气",
"tool_call": {
"name": "weather_api",
"arguments": {
"city": "北京",
"date": "2026-01-01"
}
},
"status": "pending",
"dependencies": [],
"outputs": ["temperature"]
},
{
"step_id": "step_1",
"type": "calculation",
"name": "计算温度的平方",
"calculation": {
"expression": "temperature * temperature",
"inputs": ["temperature"]
},
"status": "pending",
"dependencies": ["step_0"],
"outputs": ["result"]
}
],
"status": "generated",
"created_at": 1735708800,
"metadata": {
"max_retries": 3,
"timeout": 300
}
}以下是一个推荐的执行结果格式设计:
{
"execution_id": "exec_123",
"plan_id": "plan_123",
"step_id": "step_0",
"status": "completed",
"start_time": 1735708800,
"end_time": 1735708810,
"result": {
"temperature": 22,
"humidity": 45,
"condition": "晴"
},
"error": null,
"retries": 0,
"outputs": {
"temperature": 22
},
"metadata": {
"tool_name": "weather_api",
"tool_version": "1.0"
}
}MCP v2.0, Planner / Executor 模型, 高级 Agent 架构, 状态转移, 任务分解, 计划生成, 执行管理, 异步通信