首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >智能体 | Nanobot 核心模式的业务逻辑

智能体 | Nanobot 核心模式的业务逻辑

作者头像
AI老马
发布2026-05-20 14:00:28
发布2026-05-20 14:00:28
1050
举报
文章被收录于专栏:AI前沿技术AI前沿技术

做龙虾的二次开发,是不是常被这些问题卡壳?生命周期不好扩展、用户指令和自然语言混在一起难维护、多用户并发乱序、中断后状态丢了、启动速度慢等等。

结合上一节分层架构和钩子模式介绍,本节主要围绕:

多级命令、并发控制、断点恢复、懒加载等模式,详细拆解其核心方法和设计逻辑,简单好懂、直接能用。少走弯路,轻松搞定 Agent 开发。

1,多级命令模式

1.1,业务逻辑

业务需求:用户有时不仅输入普通的任务,还会有命令的需求,比如查看配置,改变模型等

方案:用户输入以 / 开头时,被解析为命令对象,路由到对应的命令处理器。

业务逻辑为:

  • • 斜杠命令处理优于普通消息,并且不走大模型处理
  • • 高优先级命令(如 /stop)在主循环层拦截,不创建任务

实现方式:

代码定位到 nanobot/command 文件夹下。首先通过 builtin.py 文件定义预设的内建命令和对应操作,比如内置 help 命令:

代码语言:javascript
复制
async def cmd_help(ctx: CommandContext) -> OutboundMessage:
    """Return available slash commands."""
    return OutboundMessage(
        channel=ctx.msg.channel,
        chat_id=ctx.msg.chat_id,
        content=build_help_text(),
        metadata={**dict(ctx.msg.metadata or {}), "render_as": "text"},
    )

并且添加注册函数,注册到CommandRouter中对应的属性字典中。共有四种的类型 priority,exact,prefix,intercept。

代码语言:javascript
复制
def register_builtin_commands(router: CommandRouter) -> None:
    """Register the default set of slash commands."""
    router.priority("/stop", cmd_stop)
    router.priority("/restart", cmd_restart)
    router.priority("/status", cmd_status)
    router.exact("/new", cmd_new)
    router.exact("/status", cmd_status)
    router.exact("/dream", cmd_dream)
    router.exact("/dream-log", cmd_dream_log)
    router.prefix("/dream-log ", cmd_dream_log)
    router.exact("/dream-restore", cmd_dream_restore)
    router.prefix("/dream-restore ", cmd_dream_restore)
    router.exact("/help", cmd_help)

1.2,命令分级介绍

核心设计思想:将"紧急程度"和"匹配粒度"两个维度正交分解,通过四层链式责任链实现灵活的路由策略。

Priority(优先级路由)— 无锁紧急命令

定义:精确匹配,在获取 dispatch lock 之前立即执行。

核心特征:

  • • 不需要等待分发锁(dispatch lock
  • • 适用于必须即时响应的操作,不能被其他命令排队阻塞
  • • 通过 router.priority("/stop", cmd_stop) 注册

为什么需要无锁? 假设 Agent 正在执行一个耗时的 LLM 调用,可能持续 30 秒,此时用户发送 /stop。如果 /stop 需要等锁,它就必须排在当前任务后面,违背了"紧急停止"的意图。Priority 让 /stop 插队到最前面。

Exact(精确匹配)— 标准命令

定义:字符串完全相等匹配,在分发锁内部执行。

核心特征:

  • • 必须完整匹配,如输入 /new 才能命中,/new xxx 不会命中
  • • 受 dispatch lock 保护,保证同一时间只有一个命令在执行
  • • 通过 router.exact("/new", cmd_new) 注册

核心的实现逻辑是在命令路由类 CommandRouter

Prefix(前缀匹配)— 带参数的命令

定义:检查输入是否以某个前缀开头,最长前缀优先匹配。

核心特征:

  • • 自动提取前缀后面的内容作为 ctx.args
  • • 按前缀长度降序排列(最长优先),避免短前缀"抢走"长前缀的匹配
  • • 同样在分发锁内执行

Interceptors(拦截器)— 条件性后备处理

定义:一组谓词函数,当前面三层都没命中时依次调用,任何一个返回非 None 就视为已处理。

核心特征:

  • • 不是基于文本匹配,而是运行时条件判断
  • • 可以修改/拦截正常的命令流转
  • • 典型用途:模式切换、权限检查、默认行为注入

设计意图team-mode active check — 当 team mode 激活时,普通消息应该被 team router 接管而非发给 LLM。

层级

匹配方式

锁状态

典型用途

示例

priority

精确匹配

无锁

紧急操作

/stop, /restart

exact

精确匹配

有锁

标准命令(无参)

/new, /help, /dream

prefix

最长前缀

有锁

带参数命令

/dream-log <sha>

interceptors

条件谓词

有锁

模式切换/默认行为

team mode 拦截

2,并发控制

业务需求:同一用户在处理中的消息不会被同优先级的消息中断,需要会话串行。有多个任务时,需要进行限流。

实现策略:

控制层级

实现

目的

会话级串行

asyncio.Lock per session

同一用户的消息顺序处理

全局并发限制

asyncio.Semaphore(默认3)

防止同时过多请求压垮 LLM

任务追踪

dict[session_key, list[Task]]

支持 /stop 取消

任务流程举例:

代码语言:javascript
复制
时间轴 →
UserA 发送 msg1 ──────────────────────→ [Lock A 获取] ──→ [Gate] ──→ 处理中...
UserA 发送 msg2 ──→ [Lock A 等待...] ──────────────────────────────────→ 处理完msg1后获取
UserB 发送 msg1 ──────────────────────→ [Lock B 获取] ──→ [Gate] ──→ 处理中...
UserC 发送 msg1 ──────────────────────→ [Lock C 获取] ──→ [Gate] ──→ 处理中...
UserD 发送 msg1 ──────────────────────→ [Lock D 获取] ──→ [Gate 等待...] ──→ 有空位后处理
                  ↑                      ↑               ↑
             每个会话独立锁          默认最多3个并发

每个用户独立的锁实现

代码语言:javascript
复制
self._session_locks: dict[str, asyncio.Lock] = {}
# NANOBOT_MAX_CONCURRENT_REQUESTS: <=0 means unlimited; default 3.
_max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3"))
self._concurrency_gate: asyncio.Semaphore | None = (
    asyncio.Semaphore(_max) if _max > 0 else None)

使用方式

代码语言:javascript
复制
# _dispatch 中的并发控制
async def _dispatch(self, msg: InboundMessage) -> None:
    lock = self._session_locks.setdefault(msg.session_key, asyncio.Lock())
    gate = self._concurrency_gate or nullcontext()
    
    async with lock, gate:  # 同时获取两个!
        # lock: 同一会话的消息排队等待
        # gate: 最多 N 个请求并行(跨所有会话)
        response = await self._process_message(...)

3,断点恢复

业务需求:当 Agent 正在执行工具调用时如果被中断(如进程重启、超时),下次收到新消息时会恢复中断状态。

Agent 在工具调用中途崩溃 / 中断 → 保存现场 → 下次消息来时自动恢复状态 → 重新继续执行

不丢上下文、不重跑、不混乱。

具体场景:

代码语言:javascript
复制
正常流程:
  User:"帮我分析这个文件"
    ↓
  LLM → read_file("data.csv")  ← 中断!进程崩溃
    ↓
[保存断点到 Session.metadata]
    {"assistant_message":{...},"completed_tool_results":[...],"pending_tool_calls":[...]}

下次请求:
  User:"继续" (或任何新消息)
    ↓
  _restore_runtime_checkpoint(session)  ← 检测到未完成的断点
    ↓
  取出 pending_tool_calls 中的工具,并在 session 中标记为错误
  恢复 assistant_message + completed_tool_results 到历史
    ↓
  LLM 收到上下文后可以重新决策

具体实现

代码语言:javascript
复制
def _set_runtime_checkpoint(self, session: Session, payload: dict) -> None:
    """保存当前进行中的状态(由 runner.checkpoint_callback 调用)"""
    session.metadata[self._RUNTIME_CHECKPOINT_KEY] = payload
    self.sessions.save(session)  # 持久化!

def_restore_runtime_checkpoint(self, session: Session) -> bool:
    """恢复中断的回合"""
    checkpoint = session.metadata.get("runtime_checkpoint")
    ifnotisinstance(checkpoint, dict):
        returnFalse# 无断点,正常处理
    
    # 1. 恢复助手消息
    restored_messages.append(checkpoint["assistant_message"])
    
    # 2. 恢复已完成的工具结果
    for msg in checkpoint["completed_tool_results"]:
        restored_messages.append(msg)
    
    # 3. 在对话中,将未完成的工具标记为错误
    for tc in checkpoint["pending_tool_calls"]:
        restored_messages.append({
            "role": "tool",
            "content": "Error: Task interrupted before this tool finished.",
            ...
        })
    
    # 4. 去重后写入历史
    session.messages.extend(restored_messages[overlap:])
    
    self._clear_runtime_checkpoint(session)  # 清除断点
    returnTrue

Agent 在工具调用过程中异常中断时,会将未执行完成的工具调用(pending_tool_calls)保存到会话断点。

下次收到用户消息时,框架会自动检测断点:

  • • 若存在 pending_tool_calls,则认为上次执行失败
  • • 自动生成工具执行失败的消息回填到对话历史
  • • LLM 根据失败上下文重新决策(重试 / 跳过 / 报错)
  • • 实现无感知续跑

4,懒加载-MCP 连接延迟初始化

首先介绍两种连接模式:

  • • 延迟连接:首消息才建连,省资源、启动快,适合低频 / 成本敏感业务
  • • 预连接:启动就建连,低延迟、体验好,适合实时 / 高频业务

生产推荐混合模式:基础懒加载 + 核心服务预连接 + 空闲自动释放,兼顾成本与性能

维度

延迟连接(Lazy)

预连接(Pre)

连接时机

第一条消息到达时

服务启动 / 预热阶段

首次响应

高延迟(连接耗时)

零延迟(直接可用)

资源占用

极低,空闲为 0

高,常驻连接

服务启动

极快

慢(需要预热)

成本

最低

较高

适合业务

低频、成本敏感

高频、实时敏感

业务需求:MCP 的连接是高成本的,如何在用到的时候再进行连接。

MCP (Model Context Protocol) 服务器连接成本高(启动子进程、握手协议),所以在第一条消息到达时才建立连接,而不是启动时就连接:

代码语言:javascript
复制
启动时刻:
  AgentLoop.__init__()
  → self._mcp_connected = False
  → self._mcp_servers = config  # 仅存储配置,不连接

第一条消息到达:
  _process_message() 
  → _connect_mcp()  ← 首次调用
  → 检查: notself._mcp_connected? → 连接
  → AsyncExitStack 管理 MCP 服务器生命周期

后续消息:
  → _connect_mcp() 
  → 检查: self._mcp_connected == True? → 跳过(直接返回)

关闭时:
  close_mcp() → aclose() 清理所有 MCP 连接

关键连接代码

代码语言:javascript
复制
async def_connect_mcp(self) -> None:
    """延迟连接 MCP 服务器(首次消息触发)"""
    # 三重保护防止重复连接
    ifself._mcp_connected orself._mcp_connecting ornotself._mcp_servers:
        return
    
    self._mcp_connecting = True# 正在连接标志(防并发)
    try:
        self._mcp_stack = AsyncExitStack()  # 自动资源管理
        awaitself._mcp_stack.__aenter__()
        await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
        self._mcp_connected = True# 连接成功标志
    except BaseException as e:
        logger.error("MCP 连接失败(下次消息会重试): {}", e)
        # 清理失败的连接,不设置 _connected,下次重试
    finally:
        self._mcp_connecting = False# 无论成败都释放连接锁

asyncdefclose_mcp(self) -> None:
    """优雅关闭:先等待后台任务完成,再关闭 MCP"""
    ifself._background_tasks:
        await asyncio.gather(*self._background_tasks, return_exceptions=True)
    ifself._mcp_stack:
        awaitself._mcp_stack.aclose()  # AsyncExitStack 自动关闭所有 MCP

调用时机

代码语言:javascript
复制
# 主循环 run() 中:启动时立即尝试连接(预热)
asyncdefrun(self):
    self._running = True
    awaitself._connect_mcp()  # 启动时连接(可选,非阻塞后续运行)
    ...

# process_direct() 中:直接调用也触发连接
asyncdefprocess_direct(self, ...):
    awaitself._connect_mcp()  # CLI 直接模式也需要
    ...

# _process_message() 中:通过 _run_agent_loop 间接依赖
# (runner 可能调用 MCP 工具,但连接已在入口处保证)

总结:

五种模式对比总结

模式

解决的问题

核心类/方法

设计原则

Hook

Agent 生命周期可扩展

AgentHook → _LoopHook → CompositeHook

观察者 + 组合模式

命令

用户指令与自然语言分离

CommandRouter + CommandContext

命令模式 + 策略模式

并发控制

多用户多消息有序可控

asyncio.Lock + Semaphore

信号量模式

断点恢复

中断后状态一致性

_runtime_checkpoint + _restore_*

快照/备忘录模式

懒加载

启动速度 + 按需连接

_connect_mcp() + _mcp_connected

延迟初始化模式

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-05-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 AI老马啊 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1,多级命令模式
    • 1.1,业务逻辑
    • 1.2,命令分级介绍
  • 2,并发控制
  • 3,断点恢复
  • 4,懒加载-MCP 连接延迟初始化
    • 总结:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档