做龙虾的二次开发,是不是常被这些问题卡壳?生命周期不好扩展、用户指令和自然语言混在一起难维护、多用户并发乱序、中断后状态丢了、启动速度慢等等。
结合上一节分层架构和钩子模式介绍,本节主要围绕:
多级命令、并发控制、断点恢复、懒加载等模式,详细拆解其核心方法和设计逻辑,简单好懂、直接能用。少走弯路,轻松搞定 Agent 开发。
业务需求:用户有时不仅输入普通的任务,还会有命令的需求,比如查看配置,改变模型等
方案:用户输入以 / 开头时,被解析为命令对象,路由到对应的命令处理器。
业务逻辑为:
实现方式:
代码定位到 nanobot/command 文件夹下。首先通过 builtin.py 文件定义预设的内建命令和对应操作,比如内置 help 命令:
async def cmd_help(ctx: CommandContext) -> OutboundMessage:
"""Return available slash commands."""
return OutboundMessage(
channel=ctx.msg.channel,
chat_id=ctx.msg.chat_id,
content=build_help_text(),
metadata={**dict(ctx.msg.metadata or {}), "render_as": "text"},
)并且添加注册函数,注册到CommandRouter中对应的属性字典中。共有四种的类型 priority,exact,prefix,intercept。
def register_builtin_commands(router: CommandRouter) -> None:
"""Register the default set of slash commands."""
router.priority("/stop", cmd_stop)
router.priority("/restart", cmd_restart)
router.priority("/status", cmd_status)
router.exact("/new", cmd_new)
router.exact("/status", cmd_status)
router.exact("/dream", cmd_dream)
router.exact("/dream-log", cmd_dream_log)
router.prefix("/dream-log ", cmd_dream_log)
router.exact("/dream-restore", cmd_dream_restore)
router.prefix("/dream-restore ", cmd_dream_restore)
router.exact("/help", cmd_help)核心设计思想:将"紧急程度"和"匹配粒度"两个维度正交分解,通过四层链式责任链实现灵活的路由策略。
Priority(优先级路由)— 无锁紧急命令
定义:精确匹配,在获取 dispatch lock 之前立即执行。
核心特征:
dispatch lock)router.priority("/stop", cmd_stop) 注册为什么需要无锁? 假设 Agent 正在执行一个耗时的 LLM 调用,可能持续 30 秒,此时用户发送 /stop。如果 /stop 需要等锁,它就必须排在当前任务后面,违背了"紧急停止"的意图。Priority 让 /stop 插队到最前面。
Exact(精确匹配)— 标准命令
定义:字符串完全相等匹配,在分发锁内部执行。
核心特征:
/new 才能命中,/new xxx 不会命中dispatch lock 保护,保证同一时间只有一个命令在执行router.exact("/new", cmd_new) 注册核心的实现逻辑是在命令路由类 CommandRouter
Prefix(前缀匹配)— 带参数的命令
定义:检查输入是否以某个前缀开头,最长前缀优先匹配。
核心特征:
ctx.argsInterceptors(拦截器)— 条件性后备处理
定义:一组谓词函数,当前面三层都没命中时依次调用,任何一个返回非 None 就视为已处理。
核心特征:
设计意图:team-mode active check — 当 team mode 激活时,普通消息应该被 team router 接管而非发给 LLM。
层级 | 匹配方式 | 锁状态 | 典型用途 | 示例 |
|---|---|---|---|---|
priority | 精确匹配 | 无锁 | 紧急操作 | /stop, /restart |
exact | 精确匹配 | 有锁 | 标准命令(无参) | /new, /help, /dream |
prefix | 最长前缀 | 有锁 | 带参数命令 | /dream-log <sha> |
interceptors | 条件谓词 | 有锁 | 模式切换/默认行为 | team mode 拦截 |
业务需求:同一用户在处理中的消息不会被同优先级的消息中断,需要会话串行。有多个任务时,需要进行限流。
实现策略:
控制层级 | 实现 | 目的 |
|---|---|---|
会话级串行 | asyncio.Lock per session | 同一用户的消息顺序处理 |
全局并发限制 | asyncio.Semaphore(默认3) | 防止同时过多请求压垮 LLM |
任务追踪 | dict[session_key, list[Task]] | 支持 /stop 取消 |
任务流程举例:
时间轴 →
UserA 发送 msg1 ──────────────────────→ [Lock A 获取] ──→ [Gate] ──→ 处理中...
UserA 发送 msg2 ──→ [Lock A 等待...] ──────────────────────────────────→ 处理完msg1后获取
UserB 发送 msg1 ──────────────────────→ [Lock B 获取] ──→ [Gate] ──→ 处理中...
UserC 发送 msg1 ──────────────────────→ [Lock C 获取] ──→ [Gate] ──→ 处理中...
UserD 发送 msg1 ──────────────────────→ [Lock D 获取] ──→ [Gate 等待...] ──→ 有空位后处理
↑ ↑ ↑
每个会话独立锁 默认最多3个并发
每个用户独立的锁实现
self._session_locks: dict[str, asyncio.Lock] = {}
# NANOBOT_MAX_CONCURRENT_REQUESTS: <=0 means unlimited; default 3.
_max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3"))
self._concurrency_gate: asyncio.Semaphore | None = (
asyncio.Semaphore(_max) if _max > 0 else None)使用方式
# _dispatch 中的并发控制
async def _dispatch(self, msg: InboundMessage) -> None:
lock = self._session_locks.setdefault(msg.session_key, asyncio.Lock())
gate = self._concurrency_gate or nullcontext()
async with lock, gate: # 同时获取两个!
# lock: 同一会话的消息排队等待
# gate: 最多 N 个请求并行(跨所有会话)
response = await self._process_message(...)业务需求:当 Agent 正在执行工具调用时如果被中断(如进程重启、超时),下次收到新消息时会恢复中断状态。
Agent 在工具调用中途崩溃 / 中断 → 保存现场 → 下次消息来时自动恢复状态 → 重新继续执行
不丢上下文、不重跑、不混乱。
具体场景:
正常流程:
User:"帮我分析这个文件"
↓
LLM → read_file("data.csv") ← 中断!进程崩溃
↓
[保存断点到 Session.metadata]
{"assistant_message":{...},"completed_tool_results":[...],"pending_tool_calls":[...]}
下次请求:
User:"继续" (或任何新消息)
↓
_restore_runtime_checkpoint(session) ← 检测到未完成的断点
↓
取出 pending_tool_calls 中的工具,并在 session 中标记为错误
恢复 assistant_message + completed_tool_results 到历史
↓
LLM 收到上下文后可以重新决策
具体实现
def _set_runtime_checkpoint(self, session: Session, payload: dict) -> None:
"""保存当前进行中的状态(由 runner.checkpoint_callback 调用)"""
session.metadata[self._RUNTIME_CHECKPOINT_KEY] = payload
self.sessions.save(session) # 持久化!
def_restore_runtime_checkpoint(self, session: Session) -> bool:
"""恢复中断的回合"""
checkpoint = session.metadata.get("runtime_checkpoint")
ifnotisinstance(checkpoint, dict):
returnFalse# 无断点,正常处理
# 1. 恢复助手消息
restored_messages.append(checkpoint["assistant_message"])
# 2. 恢复已完成的工具结果
for msg in checkpoint["completed_tool_results"]:
restored_messages.append(msg)
# 3. 在对话中,将未完成的工具标记为错误
for tc in checkpoint["pending_tool_calls"]:
restored_messages.append({
"role": "tool",
"content": "Error: Task interrupted before this tool finished.",
...
})
# 4. 去重后写入历史
session.messages.extend(restored_messages[overlap:])
self._clear_runtime_checkpoint(session) # 清除断点
returnTrue
Agent 在工具调用过程中异常中断时,会将未执行完成的工具调用(pending_tool_calls)保存到会话断点。
下次收到用户消息时,框架会自动检测断点:
pending_tool_calls,则认为上次执行失败首先介绍两种连接模式:
生产推荐混合模式:基础懒加载 + 核心服务预连接 + 空闲自动释放,兼顾成本与性能
维度 | 延迟连接(Lazy) | 预连接(Pre) |
|---|---|---|
连接时机 | 第一条消息到达时 | 服务启动 / 预热阶段 |
首次响应 | 高延迟(连接耗时) | 零延迟(直接可用) |
资源占用 | 极低,空闲为 0 | 高,常驻连接 |
服务启动 | 极快 | 慢(需要预热) |
成本 | 最低 | 较高 |
适合业务 | 低频、成本敏感 | 高频、实时敏感 |
业务需求:MCP 的连接是高成本的,如何在用到的时候再进行连接。
MCP (Model Context Protocol) 服务器连接成本高(启动子进程、握手协议),所以在第一条消息到达时才建立连接,而不是启动时就连接:
启动时刻:
AgentLoop.__init__()
→ self._mcp_connected = False
→ self._mcp_servers = config # 仅存储配置,不连接
第一条消息到达:
_process_message()
→ _connect_mcp() ← 首次调用
→ 检查: notself._mcp_connected? → 连接
→ AsyncExitStack 管理 MCP 服务器生命周期
后续消息:
→ _connect_mcp()
→ 检查: self._mcp_connected == True? → 跳过(直接返回)
关闭时:
close_mcp() → aclose() 清理所有 MCP 连接
关键连接代码
async def_connect_mcp(self) -> None:
"""延迟连接 MCP 服务器(首次消息触发)"""
# 三重保护防止重复连接
ifself._mcp_connected orself._mcp_connecting ornotself._mcp_servers:
return
self._mcp_connecting = True# 正在连接标志(防并发)
try:
self._mcp_stack = AsyncExitStack() # 自动资源管理
awaitself._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
self._mcp_connected = True# 连接成功标志
except BaseException as e:
logger.error("MCP 连接失败(下次消息会重试): {}", e)
# 清理失败的连接,不设置 _connected,下次重试
finally:
self._mcp_connecting = False# 无论成败都释放连接锁
asyncdefclose_mcp(self) -> None:
"""优雅关闭:先等待后台任务完成,再关闭 MCP"""
ifself._background_tasks:
await asyncio.gather(*self._background_tasks, return_exceptions=True)
ifself._mcp_stack:
awaitself._mcp_stack.aclose() # AsyncExitStack 自动关闭所有 MCP
调用时机
# 主循环 run() 中:启动时立即尝试连接(预热)
asyncdefrun(self):
self._running = True
awaitself._connect_mcp() # 启动时连接(可选,非阻塞后续运行)
...
# process_direct() 中:直接调用也触发连接
asyncdefprocess_direct(self, ...):
awaitself._connect_mcp() # CLI 直接模式也需要
...
# _process_message() 中:通过 _run_agent_loop 间接依赖
# (runner 可能调用 MCP 工具,但连接已在入口处保证)
五种模式对比总结
模式 | 解决的问题 | 核心类/方法 | 设计原则 |
|---|---|---|---|
Hook | Agent 生命周期可扩展 | AgentHook → _LoopHook → CompositeHook | 观察者 + 组合模式 |
命令 | 用户指令与自然语言分离 | CommandRouter + CommandContext | 命令模式 + 策略模式 |
并发控制 | 多用户多消息有序可控 | asyncio.Lock + Semaphore | 信号量模式 |
断点恢复 | 中断后状态一致性 | _runtime_checkpoint + _restore_* | 快照/备忘录模式 |
懒加载 | 启动速度 + 按需连接 | _connect_mcp() + _mcp_connected | 延迟初始化模式 |