前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >AI智能体(五)

AI智能体(五)

作者头像
算法之名
发布2025-02-06 21:25:13
发布2025-02-06 21:25:13
11000
代码可运行
举报
文章被收录于专栏:算法之名
运行总次数:0
代码可运行

AI智能体(四)

MetaGPT

环境装配

metagpt下载地址:https://github.com/geekan/MetaGPT

代码语言:javascript
代码运行次数:0
复制
conda create -n metagpt python=3.9
conda activate metagpt
pip install metagpt

解压我们下载的代码,进入Meta-GPT-main主目录

代码语言:javascript
代码运行次数:0
复制
pip install -r requirements.txt

进入Meta-GPT-main/config目录,修改config2.yaml的内容为

代码语言:javascript
代码运行次数:0
复制
llm:
  api_type: "zhipuai"  # or azure / ollama / groq etc.
  model: "glm-4"  # or gpt-3.5-turbo
  api_key: "******"

测试:

在终端命令行中进入Meta-GPT-main目录中执行

代码语言:javascript
代码运行次数:0
复制
metagpt "Write a cli snake game"

单动作智能体

  • 定义动作
代码语言:javascript
代码运行次数:0
复制
import re
from metagpt.actions import Action


class SimpleWriteCode(Action):
    prompt_template: str = '''Write a python function that can {instruction} and
    provide two runnable test cases.
    Return ```python your_code_here``` with No other texts,
    your code:
    '''

    def __init__(self, name='SimpleWriteCode', context=None, llm=None):
        super().__init__(name=name, context=context, llm=llm)

    async def run(self, instruction: str):
        prompt = self.prompt_template.format(instruction=instruction)
        rsp = await self._aask(prompt)
        code_text = SimpleWriteCode.parse_code(rsp)
        return code_text

    @staticmethod
    def parse_code(rsp):
        patten = r'```python(.*)```'
        match = re.search(patten, rsp, re.DOTALL)
        code_text = match.group(1) if match else rsp
        return code_text
  • 定义角色
代码语言:javascript
代码运行次数:0
复制
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.logs import logger
from action import SimpleWriteCode

class SimpleCoder(Role):
    def __init__(self, name: str = 'Alice', profile: str = 'SimpleCoder', **kwargs):
        super().__init__(name=name, profile=profile, **kwargs)
        self.set_actions([SimpleWriteCode])

    async def _act(self) -> Message:
        logger.info(f'{self._setting}: ready to {self.rc.todo}')
        todo = self.rc.todo
        msg = self.get_memories(k=1)[0]
        code_text = await todo.run(msg.content)
        msg = Message(content=code_text, role=self.profile, cause_by=type(todo))
        return msg
  • 执行
代码语言:javascript
代码运行次数:0
复制
from metagpt.logs import logger
from role import SimpleCoder
import asyncio

async def main():
    msg = 'write a function that calculates the sum of a list'
    role = SimpleCoder()
    logger.info(msg)
    result = await role.run(msg)
    logger.info(result)

if __name__ == '__main__':

    asyncio.run(main())

运行结果

代码语言:javascript
代码运行次数:0
复制
2024-12-03 13:06:09.350 | INFO     | metagpt.const:get_metagpt_package_- Package root set to /home/ubuntu/Downloads/MetaGPT-main
2024-12-03 13:06:11.006 | INFO     | __main__:main:9 - write a function that calculates the sum of a list
2024-12-03 13:06:11.006 | INFO     | role:_act:12 - Alice(SimpleCoder): ready to SimpleWriteCode
```python

def create_sum_function():

代码语言:txt
复制
def sum_of_list(lst):
代码语言:txt
复制
    return sum(lst)
代码语言:txt
复制
return sum_of_list

Test cases

sum_function = create_sum_function()

Test case 1

assert sum_function(1, 2, 3, 4, 5) == 15

print("Test case 1 passed.")

Test case 2

assert sum_function(-1, 0, 1, 2, 3) == 5

print("Test case 2 passed.")

代码语言:txt
复制
2024-12-03 13:06:14.974 | INFO     | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.002 | Max budget: $10.000 | Current cost: $0.002, prompt_tokens: 63, completion_tokens: 112
2024-12-03 13:06:14.974 | INFO     | __main__:main:11 - SimpleCoder: 
def create_sum_function():
    def sum_of_list(lst):
        return sum(lst)
    return sum_of_list

# Test cases
sum_function = create_sum_function()

# Test case 1
assert sum_function([1, 2, 3, 4, 5]) == 15
print("Test case 1 passed.")

# Test case 2
assert sum_function([-1, 0, 1, 2, 3]) == 5
print("Test case 2 passed.")

多动作智能体

代码语言:javascript
代码运行次数:0
复制
import asyncio
import sys
import subprocess
from metagpt.llm import LLM
from metagpt.actions import Action
from metagpt.roles import Role
from metagpt.logs import logger
from action import SimpleWriteCode
from metagpt.schema import Message


class SimpleRunCode(Action):
    def __init__(self, name: str = 'SimpleRunCode', context=None, llm: LLM=None):
        super().__init__(name=name, context=context, llm=llm)

    async def run(self, code_text: str):
        result = subprocess.run([sys.executable, '-c', code_text], capture_output=True, text=True)
        code_result = result.stdout
        logger.info(f'{code_result}')
        return code_result

class RunnableCoder(Role):
    def __init__(self, name: str = 'Alice', profile: str = 'RunnableCoder', **kwargs):
        super().__init__(name=name, profile=profile, **kwargs)
        self.set_actions([SimpleWriteCode, SimpleRunCode])
        self._set_react_mode(react_mode='by_order')

    async def _act(self) -> Message:
        logger.info(f'{self._setting}: ready to {self.rc.todo}')
        todo = self.rc.todo
        msg = self.get_memories(k=1)[0]
        result = await todo.run(msg.content)
        msg = Message(content=result, role=self.profile, cause_by=type(todo))
        self.rc.memory.add(msg)
        return msg

async def main():
    msg = 'write a function that calculates the sum of a list'
    role = RunnableCoder()
    logger.info(msg)
    result = await role.run(msg)
    logger.info(result)

if __name__ == '__main__':

    asyncio.run(main())

运行结果

代码语言:javascript
代码运行次数:0
复制
2024-12-03 16:12:15.281 | INFO     | metagpt.const:get_metagpt_package_- Package root set to /home/ubuntu/Downloads/MetaGPT-main
2024-12-03 16:12:16.926 | INFO     | __main__:main:40 - write a function that calculates the sum of a list
2024-12-03 16:12:16.926 | INFO     | __main__:_act:29 - Alice(RunnableCoder): ready to SimpleWriteCode
```python

def generate_sum_function():

代码语言:txt
复制
def sum_of_list(lst):
代码语言:txt
复制
    return sum(lst)
代码语言:txt
复制
return sum_of_list

Test cases

sum_function = generate_sum_function()

Test case 1

assert sum_function(1, 2, 3, 4, 5) == 15

Test case 2

assert sum_function(-1, 0, 1, 2, -3) == -1

The function below can be used to test the generated sum_of_list function

def test_sum_function():

代码语言:txt
复制
test_lists = [
代码语言:txt
复制
    [1, 2, 3, 4, 5],
代码语言:txt
复制
    [-1, 0, 1, 2, -3]
代码语言:txt
复制
]
代码语言:txt
复制
for lst in test_lists:
代码语言:txt
复制
    result = sum_function(lst)
代码语言:txt
复制
    print(f"The sum of {lst} is {result}")

Running the test

test_sum_function()

代码语言:txt
复制
2024-12-03 16:12:24.351 | INFO     | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.004 | Max budget: $10.000 | Current cost: $0.004, prompt_tokens: 63, completion_tokens: 192
2024-12-03 16:12:24.352 | INFO     | __main__:_act:29 - Alice(RunnableCoder): ready to SimpleRunCode
2024-12-03 16:12:24.361 | INFO     | __main__:run:19 - The sum of [1, 2, 3, 4, 5] is 15
The sum of [-1, 0, 1, 2, -3] is -1

2024-12-03 16:12:24.361 | INFO     | __main__:main:42 - RunnableCoder: The sum of [1, 2, 3, 4, 5] is 15
The sum of [-1, 0, 1, 2, -3] is -1

编写开发需求文档

test_pm.py

代码语言:javascript
代码运行次数:0
复制
import asyncio
from metagpt.context import Context
from metagpt.roles.product_manager import ProductManager
from metagpt.logs import logger

async def main():
    msg = '写一个格斗游戏的产品需求文档'
    # 显示创建会话Context对象,Role对象会隐式的自动将它共享给自己的Action对象
    context = Context()
    role = ProductManager(context=context)
    while msg:
        msg = await role.run(msg)
        logger.info(str(msg))

if __name__ == '__main__':
    asyncio.run(main())

在终端界面进入Meta-GPT-main目录中执行

代码语言:javascript
代码运行次数:0
复制
python test_pm.py

运行结果

代码语言:javascript
代码运行次数:0
复制
2024-12-12 21:33:12.149 | INFO     | metagpt.const:get_metagpt_package_- Package root set to /home/ubuntu/Downloads/MetaGPT-main
2024-12-12 21:33:13.490 | INFO     | metagpt.roles.role:_act:403 - Alice(Product Manager): to do PrepareDocuments(PrepareDocuments)
2024-12-12 21:33:13.515 | INFO     | metagpt.utils.file_repository:save:57 - save to: /home/ubuntu/Downloads/MetaGPT-main/workspace/20241212213313/docs/requirement.txt
2024-12-12 21:33:13.515 | INFO     | metagpt.roles.role:_act:403 - Alice(Product Manager): to do WritePRD(WritePRD)
2024-12-12 21:33:13.515 | INFO     | metagpt.actions.write_prd:run:86 - New requirement detected: 写一个格斗游戏的产品需求文档
[CONTENT]
{
    "Language": "zh_cn",
    "Programming Language": "C#",
    "Original Requirements": "编写一个格斗游戏的产品需求文档",
    "Project Name": "fighting_game",
    "Product Goals": [
        "提供沉浸式的用户体验",
        "确保游戏可访问性,具有良好的响应性",
        "设计精美的用户界面"
    ],
    "User Stories": [
        "作为玩家,我希望能选择不同的难度级别",
        "作为玩家,我希望能在一局游戏结束后查看我的得分",
        "作为玩家,当我失败时,我希望能有重新开始按钮",
        "作为玩家,我希望能看到让我感觉良好的精美界面",
        "作为玩家,我希望能通过手机玩游戏"
    ],
    "Competitive Analysis": [
        "格斗游戏A:界面简单,缺乏响应性功能",
        "格斗游戏B:界面美观且响应迅速,但得分系统不透明",
        "格斗游戏C:界面响应迅速,得分系统清晰,但广告过多"
    ],
    "Competitive Quadrant Chart": "quadrantChart\n    title \"游戏的覆盖范围和参与度\"\n    x-axis \"低覆盖\" --> \"高覆盖\"\n    y-axis \"低参与\" --> \"高参与\"\n    quadrant-1 \"需要拓展\"\n    quadrant-2 \"需要推广\"\n    quadrant-3 \"需要评估\"\n    quadrant-4 \"可以改进\"\n    \"游戏A\": [0.2, 0.4]\n    \"游戏B\": [0.6, 0.8]\n    \"游戏C\": [0.3, 0.6]\n    \"我们的目标产品\": [0.5, 0.7]",
    "Requirement Analysis": "对用户需求进行分析,确保游戏设计满足市场趋势和用户期望",
    "Requirement Pool": [
        [
            "P0",
            "游戏核心玩法实现"
        ],
        [
            "P1",
            "用户界面设计与实现"
        ],
        [
            "P0",
            "游戏难度级别的设定"
        ],
        [
            "P2",
            "得分和排行榜系统"
        ],
        [
            "P1",
            "移动端适配"
        ]
    ],
    "UI Design draft": "包含游戏菜单、角色选择、战斗界面等基础功能,风格简洁,布局合理。",
    "Anything UNCLEAR": "暂无,所有需求均已明确。"
}
[/CONTENT]
2024-12-12 21:33:31.337 | INFO     | metagpt.utils.cost_manager:update_cost:57 - Total running cost: $0.021 | Max budget: $10.000 | Current cost: $0.021, prompt_tokens: 944, completion_tokens: 521
2024-12-12 21:33:31.342 | INFO     | metagpt.utils.git_repository:rename_root:219 - Rename directory /home/ubuntu/Downloads/MetaGPT-main/workspace/20241212213313 to /home/ubuntu/Downloads/MetaGPT-main/workspace/fighting_game
2024-12-12 21:33:31.343 | INFO     | metagpt.utils.file_repository:save:57 - save to: /home/ubuntu/Downloads/MetaGPT-main/workspace/fighting_game/docs/prd/20241212213331.json
no mermaid
2024-12-12 21:33:31.344 | WARNING  | metagpt.utils.mermaid:mermaid_to_file:35 - RUN `npm install -g @mermaid-js/mermaid-cli` to install mmdc,or consider changing engine to `playwright`, `pyppeteer`, or `ink`.
2024-12-12 21:33:31.344 | INFO     | metagpt.utils.file_repository:save:57 - save to: /home/ubuntu/Downloads/MetaGPT-main/workspace/fighting_game/resources/prd/20241212213331.md
2024-12-12 21:33:31.345 | INFO     | __main__:main:12 - Alice(Product Manager): {'docs': {'20241212213331.json': {'root_path': 'docs/prd', 'filename': '20241212213331.json', 'content': '{"Language":"zh_cn","Programming Language":"C#","Original Requirements":"编写一个格斗游戏的产品需求文档","Project Name":"fighting_game","Product Goals":["提供沉浸式的用户体验","确保游戏可访问性,具有良好的响应性","设计精美的用户界面"],"User Stories":["作为玩家,我希望能选择不同的难度级别","作为玩家,我希望能在一局游戏结束后查看我的得分","作为玩家,当我失败时,我希望能有重新开始按钮","作为玩家,我希望能看到让我感觉良好的精美界面","作为玩家,我希望能通过手机玩游戏"],"Competitive Analysis":["格斗游戏A:界面简单,缺乏响应性功能","格斗游戏B:界面美观且响应迅速,但得分系统不透明","格斗游戏C:界面响应迅速,得分系统清晰,但广告过多"],"Competitive Quadrant Chart":"quadrantChart\\n    title \\"游戏的覆盖范围和参与度\\"\\n    x-axis \\"低覆盖\\" --> \\"高覆盖\\"\\n    y-axis \\"低参与\\" --> \\"高参与\\"\\n    quadrant-1 \\"需要拓展\\"\\n    quadrant-2 \\"需要推广\\"\\n    quadrant-3 \\"需要评估\\"\\n    quadrant-4 \\"可以改进\\"\\n    \\"游戏A\\": [0.2, 0.4]\\n    \\"游戏B\\": [0.6, 0.8]\\n    \\"游戏C\\": [0.3, 0.6]\\n    \\"我们的目标产品\\": [0.5, 0.7]","Requirement Analysis":"对用户需求进行分析,确保游戏设计满足市场趋势和用户期望","Requirement Pool":[["P0","游戏核心玩法实现"],["P1","用户界面设计与实现"],["P0","游戏难度级别的设定"],["P2","得分和排行榜系统"],["P1","移动端适配"]],"UI Design draft":"包含游戏菜单、角色选择、战斗界面等基础功能,风格简洁,布局合理。","Anything UNCLEAR":"暂无,所有需求均已明确。"}'}}}
2024-12-12 21:33:31.345 | INFO     | __main__:main:12 - None

现在我们来看一下ProductManager这个角色类

代码语言:javascript
代码运行次数:0
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2023/5/11 14:43
@Author  : alexanderwu
@File    : product_manager.py
@Modified By: mashenquan, 2023/11/27. Add `PrepareDocuments` action according to Section 2.2.3.5.1 of RFC 135.
"""

# 导入必要的模块和类
from metagpt.actions import UserRequirement, WritePRD
from metagpt.actions.prepare_documents import PrepareDocuments
from metagpt.roles.role import Role, RoleReactMode
from metagpt.utils.common import any_to_name


class ProductManager(Role):
    """
    Represents a Product Manager role responsible for product development and management.

    Attributes:
        name (str): Name of the product manager.
        profile (str): Role profile, default is 'Product Manager'.
        goal (str): Goal of the product manager.
        constraints (str): Constraints or limitations for the product manager.
        todo_action (str): The next action planned by the product manager.
    """

    # 类属性定义了产品管理者的默认名称、角色描述、目标和约束条件
    name: str = "Alice"
    profile: str = "Product Manager"
    goal: str = "efficiently create a successful product that meets market demands and user expectations"
    constraints: str = "utilize the same language as the user requirements for seamless communication"
    todo_action: str = ""  # 初始化为空字符串,表示当前没有待办动作

    def __init__(self, **kwargs) -> None:
        """
        Initializes a ProductManager instance with specific actions and monitoring.

        Args:
            kwargs: Additional keyword arguments passed to the parent class constructor.
        """
        super().__init__(**kwargs)  # 调用父类构造函数初始化基础属性

        # 设置产品管理者可以执行的动作列表——准备文档,书写项目需求文档
        self.set_actions([PrepareDocuments, WritePRD])

        # 监控特定的动作类型,以便在这些动作发生时做出反应
        self._watch([UserRequirement, PrepareDocuments])

        # 设置角色的反应模式为按顺序反应(BY_ORDER)
        self.rc.react_mode = RoleReactMode.BY_ORDER

        # 将下一步计划的动作设置为写产品需求文档(WritePRD),通过 any_to_name 函数将类名转换为字符串
        self.todo_action = any_to_name(WritePRD)

    async def _observe(self, ignore_memory=False) -> int:
        """
        Overrides the observe method from the parent class to customize observation behavior.

        Args:
            ignore_memory (bool): A flag indicating whether to ignore memory during observation.

        Returns:
            int: An integer result from the observation process.
        """
        # 调用父类的方法并忽略内存,确保观察过程中不考虑之前的记忆
        return await super()._observe(ignore_memory=True)

现在我们来看一下PrepareDocuments这个动作类

代码语言:javascript
代码运行次数:0
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2023/11/20
@Author  : mashenquan
@File    : prepare_documents.py
@Desc: PrepareDocuments Action: initialize project folder and add new requirements to docs/requirements.txt.
        RFC 135 2.2.3.5.1.
"""

import shutil
from pathlib import Path
from typing import Optional

from metagpt.actions import Action, ActionOutput
from metagpt.const import REQUIREMENT_FILENAME
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.git_repository import GitRepository
from metagpt.utils.project_repo import ProjectRepo


class PrepareDocuments(Action):
    """PrepareDocuments Action: 初始化项目文件夹并将新需求添加到 docs/requirements.txt 中。

    Attributes:
        name (str): 动作名称。
        i_context (Optional[str]): 内部上下文信息(如果需要)。
    """

    name: str = "PrepareDocuments"
    i_context: Optional[str] = None

    @property
    def config(self):
        """获取当前动作的配置对象。

        Returns:
            Config: 配置对象。
        """
        return self.context.config

    def _init_repo(self):
        """初始化 Git 环境和项目文件夹。

        这个方法会创建一个新的项目文件夹,并根据配置初始化一个 Git 仓库。
        如果项目路径已存在且不是增量更新,则删除现有文件夹并重新创建。
        """
        if not self.config.project_path:
            # 如果没有指定项目路径,则使用默认名称创建一个新的文件夹
            name = self.config.project_name or FileRepository.new_filename()
            path = Path(self.config.workspace.path) / name
        else:
            # 使用指定的项目路径
            path = Path(self.config.project_path)

        if path.exists() and not self.config.inc:
            # 如果文件夹已经存在且不是增量更新,则删除旧文件夹
            shutil.rmtree(path)

        # 设置新的项目路径
        self.config.project_path = path

        # 初始化 Git 仓库
        self.context.git_repo = GitRepository(local_path=path, auto_init=True)
        self.context.repo = ProjectRepo(self.context.git_repo)

    async def run(self, with_messages, **kwargs):
        """执行 PrepareDocuments 动作,创建和初始化工作空间文件夹以及 Git 环境。

        Args:
            with_messages: 包含要处理的消息列表。
            **kwargs: 其他关键字参数。

        Returns:
            ActionOutput: 包含文档内容和指令内容的动作输出对象。
        """
        self._init_repo()

        # 将主参数 idea 中的新需求写入 `docs/requirements.txt`
        doc = await self.repo.docs.save(filename=REQUIREMENT_FILENAME, content=with_messages[0].content)

        # 向 WritePRD 动作发送消息通知,指示它使用 `docs/requirements.txt` 和 `docs/prd/` 来处理需求
        return ActionOutput(content=doc.content, instruct_content=doc)

现在我们来看一下WritePRD这个动作类

代码语言:javascript
代码运行次数:0
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2023/5/11 17:45
@Author  : alexanderwu
@File    : write_prd.py
@Modified By: mashenquan, 2023/11/27.
            1. According to Section 2.2.3.1 of RFC 135, replace file data in the message with the file name.
            2. According to the design in Section 2.2.3.5.2 of RFC 135, add incremental iteration functionality.
            3. Move the document storage operations related to WritePRD from the save operation of WriteDesign.
@Modified By: mashenquan, 2023/12/5. Move the generation logic of the project name to WritePRD.
"""

# 导入 Python 未来的特性,以确保代码向前兼容。
from __future__ import annotations

# 导入必要的标准库模块。
import json
from pathlib import Path

# 导入 metagpt 库中的相关类和常量。
from metagpt.actions import Action, ActionOutput
from metagpt.actions.action_node import ActionNode
from metagpt.actions.fix_bug import FixBug
from metagpt.actions.write_prd_an import (
    COMPETITIVE_QUADRANT_CHART,
    PROJECT_NAME,
    REFINED_PRD_NODE,
    WP_IS_RELATIVE_NODE,
    WP_ISSUE_TYPE_NODE,
    WRITE_PRD_NODE,
)
from metagpt.const import (
    BUGFIX_FILENAME,
    COMPETITIVE_ANALYSIS_FILE_REPO,
    REQUIREMENT_FILENAME,
)
from metagpt.logs import logger
from metagpt.schema import BugFixContext, Document, Documents, Message
from metagpt.utils.common import CodeParser
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.mermaid import mermaid_to_file


# 定义上下文模板,用于生成 PRD 文档的开头部分。
CONTEXT_TEMPLATE = """
### Project Name
{project_name}

### Original Requirements
{requirements}

### Search Information
-
"""

# 定义新需求模板,用于更新已有 PRD 文档时合并旧内容和新需求。
NEW_REQ_TEMPLATE = """
### Legacy Content
{old_prd}

### New Requirements
{requirements}
"""


class WritePRD(Action):
    """WritePRD 处理以下几种情况:
    1. Bugfix: 如果需求是修复错误,则生成 bugfix 文档。
    2. 新需求: 如果需求是新的,则生成 PRD 文档。
    3. 需求更新: 如果需求是对现有需求的更新,则更新 PRD 文档。
    """

    async def run(self, with_messages, *args, **kwargs) -> ActionOutput | Message:
        """运行 WritePRD 动作的主要逻辑。

        Args:
            with_messages: 包含要处理的消息列表。
            *args: 可变参数列表。
            **kwargs: 关键字参数字典。

        Returns:
            ActionOutput 或 Message 对象,具体取决于处理的结果。
        """
        req: Document = await self.repo.requirement  # 获取当前的需求文档。
        docs: list[Document] = await self.repo.docs.prd.get_all()  # 获取所有已有的 PRD 文档。
        if not req:
            raise FileNotFoundError("未找到需求文档。")  # 如果没有找到需求文档,则抛出文件未找到异常。

        # 检查是否为 bugfix,并处理。
        if await self._is_bugfix(req.content):  # 判断需求内容是否为 bugfix。
            logger.info(f"检测到 bugfix: {req.content}")  # 记录日志信息。
            return await self._handle_bugfix(req)  # 调用处理 bugfix 的方法并返回结果。

        # 移除上一轮的 bugfix 文件以防冲突。
        await self.repo.docs.delete(filename=BUGFIX_FILENAME)

        # 根据需求是否与现有文档相关来决定是更新还是创建新文档。
        if related_docs := await self.get_related_docs(req, docs):  # 获取与当前需求相关的 PRD 文档。
            logger.info(f"检测到需求更新: {req.content}")  # 记录日志信息。
            return await self._handle_requirement_update(req, related_docs)  # 调用处理需求更新的方法并返回结果。
        else:
            logger.info(f"检测到新需求: {req.content}")  # 记录日志信息。
            return await self._handle_new_requirement(req)  # 调用处理新需求的方法并返回结果。

    async def _handle_bugfix(self, req: Document) -> Message:
        """处理 bugfix 需求,生成并保存 bugfix 文档。

        Args:
            req (Document): 需求文档对象。

        Returns:
            Message: 包含 bugfix 上下文信息的消息对象。
        """
        await self.repo.docs.save(filename=BUGFIX_FILENAME, content=req.content)  # 保存 bugfix 内容到指定文件。
        await self.repo.docs.save(filename=REQUIREMENT_FILENAME, content="")  # 清空需求文件的内容。
        bug_fix = BugFixContext(filename=BUGFIX_FILENAME)  # 创建 BugFixContext 实例。
        return Message(
            content=bug_fix.model_dump_json(),  # 将 bugfix 上下文序列化为 JSON 字符串。
            instruct_content=bug_fix,  # 设置指令内容。
            role="",  # 角色为空字符串。
            cause_by=FixBug,  # 指定引起此动作的原因是 FixBug。
            sent_from=self,  # 发送方是当前实例。
            send_to="Alex",  # 接收方是工程师的名字 "Alex"。
        )

    async def _handle_new_requirement(self, req: Document) -> ActionOutput:
        """处理新需求,生成并保存新的 PRD 文档。

        Args:
            req (Document): 需求文档对象。

        Returns:
            ActionOutput: 包含新生成的 PRD 文档的动作输出对象。
        """
        project_name = self.project_name  # 获取项目名称。
        context = CONTEXT_TEMPLATE.format(requirements=req.content, project_name=project_name)  # 使用模板格式化需求内容和项目名称。
        exclude = [PROJECT_NAME.key] if project_name else []  # 如果有项目名称,则排除该项目名称字段。
        node = await WRITE_PRD_NODE.fill(context=context, llm=self.llm, exclude=exclude)  # 填充 PRD 节点。
        await self._rename_workspace(node)  # 重命名工作区。
        new_prd_doc = await self.repo.docs.prd.save(  # 保存新的 PRD 文档。
            filename=FileRepository.new_filename() + ".json",  # 生成一个新的文件名。
            content=node.instruct_content.model_dump_json()  # 将节点内容序列化为 JSON 字符串。
        )
        await self._save_competitive_analysis(new_prd_doc)  # 保存竞争分析图表。
        await self.repo.resources.prd.save_pdf(doc=new_prd_doc)  # 保存 PRD 文档为 PDF。
        return Documents.from_iterable(documents=[new_prd_doc]).to_action_output()  # 返回包含新 PRD 文档的动作输出。

    async def _handle_requirement_update(self, req: Document, related_docs: list[Document]) -> ActionOutput:
        """处理需求更新,更新相关的 PRD 文档。

        Args:
            req (Document): 当前需求文档对象。
            related_docs (list[Document]): 相关的 PRD 文档列表。

        Returns:
            ActionOutput: 包含更新后的 PRD 文档的动作输出对象。
        """
        for doc in related_docs:
            await self._update_prd(req, doc)  # 更新每个相关的 PRD 文档。
        return Documents.from_iterable(documents=related_docs).to_action_output()  # 返回包含更新后 PRD 文档的动作输出。

    async def _is_bugfix(self, context: str) -> bool:
        """判断给定的需求内容是否为 bugfix。

        Args:
            context (str): 需求内容。

        Returns:
            bool: 如果是 bugfix 返回 True,否则返回 False。
        """
        if not self.repo.code_files_exists():  # 如果没有代码文件存在,则不可能是 bugfix。
            return False
        node = await WP_ISSUE_TYPE_NODE.fill(context, self.llm)  # 使用 LLM 判断需求类型。
        return node.get("issue_type") == "BUG"  # 如果问题是 bug,则返回 True。

    async def get_related_docs(self, req: Document, docs: list[Document]) -> list[Document]:
        """获取与当前需求相关的 PRD 文档。

        Args:
            req (Document): 当前需求文档对象。
            docs (list[Document]): 所有 PRD 文档列表。

        Returns:
            list[Document]: 与当前需求相关的 PRD 文档列表。
        """
        return [doc for doc in docs if await self._is_related(req, doc)]  # 过滤出与当前需求相关的 PRD 文档。

    async def _is_related(self, req: Document, old_prd: Document) -> bool:
        """判断两个文档是否相关。

        Args:
            req (Document): 当前需求文档对象。
            old_prd (Document): 现有的 PRD 文档对象。

        Returns:
            bool: 如果相关返回 True,否则返回 False。
        """
        context = NEW_REQ_TEMPLATE.format(old_prd=old_prd.content, requirements=req.content)  # 使用模板格式化旧内容和新需求。
        node = await WP_IS_RELATIVE_NODE.fill(context, self.llm)  # 使用 LLM 判断两个文档是否相关。
        return node.get("is_relative") == "YES"  # 如果文档相关则返回 True。

    async def _merge(self, req: Document, related_doc: Document) -> Document:
        """合并新需求和已有 PRD 文档的内容。

        Args:
            req (Document): 当前需求文档对象。
            related_doc (Document): 现有的 PRD 文档对象。

        Returns:
            Document: 合并后的新 PRD 文档对象。
        """
        if not self.project_name:
            self.project_name = Path(self.project_path).name  # 如果没有项目名称,则使用项目路径的名称作为项目名称。
        prompt = NEW_REQ_TEMPLATE.format(requirements=req.content, old_prd=related_doc.content)  # 使用模板格式化旧内容和新需求。
        node = await REFINED_PRD_NODE.fill(context=prompt, llm=self.llm, schema=self.prompt_schema)  # 使用 LLM 细化 PRD 节点。
        related_doc.content = node.instruct_content.model_dump_json()  # 更新 PRD 文档的内容。
        await self._rename_workspace(node)  # 重命名工作区。
        return related_doc  # 返回合并后的新 PRD 文档对象。

    async def _update_prd(self, req: Document, prd_doc: Document) -> Document:
        """更新 PRD 文档。

        Args:
            req (Document): 当前需求文档对象。
            prd_doc (Document): 现有的 PRD 文档对象。

        Returns:
            Document: 更新后的 PRD 文档对象。
        """
        new_prd_doc: Document = await self._merge(req, prd_doc)  # 合并新需求和已有 PRD 文档。
        await self.repo.docs.prd.save_doc(doc=new_prd_doc)  # 保存更新后的 PRD 文档。
        await self._save_competitive_analysis(new_prd_doc)  # 保存竞争分析图表。
        await self.repo.resources.prd.save_pdf(doc=new_prd_doc)  # 保存 PRD 文档为 PDF。
        return new_prd_doc  # 返回更新后的 PRD 文档对象。

    async def _save_competitive_analysis(self, prd_doc: Document):
        """保存竞争分析图表。

        Args:
            prd_doc (Document): PRD 文档对象。
        """
        m = json.loads(prd_doc.content)  # 解析 PRD 文档内容为 JSON。
        quadrant_chart = m.get(COMPETITIVE_QUADRANT_CHART.key)  # 获取竞争分析图表数据。
        if not quadrant_chart:
            return  # 如果没有图表数据,则直接返回。
        pathname = self.repo.workdir / COMPETITIVE_ANALYSIS_FILE_REPO / Path(prd_doc.filename).stem  # 构建图表文件路径。
        pathname.parent.mkdir(parents=True, exist_ok=True)  # 创建图表文件夹(如果不存在)。
        await mermaid_to_file(self.config.mermaid.engine, quadrant_chart, pathname)  # 将图表数据保存为文件。

    async def _rename_workspace(self, prd):
        """重命名工作区。

        Args:
            prd (ActionOutput | ActionNode): PRD 文档或节点对象。
        """
        if not self.project_name:  # 如果还没有设置项目名称。
            if isinstance(prd, (ActionOutput, ActionNode)):  # 如果传入的是 ActionOutput 或 ActionNode。
                ws_name = prd.instruct_content.model_dump().get("Project Name")  # 从模型中获取项目名称。
            else:
                ws_name = CodeParser.parse_str(block="Project Name", text=prd)  # 从文本中解析项目名称。
            if ws_name:
                self.project_name = ws_name  # 设置项目名称。
        self.repo.git_repo.rename_root(self.project_name)  # 重命名 Git 工作区的根目录为项目名称。

我们来看一下角色基类Role的_watch方法

代码语言:javascript
代码运行次数:0
复制
def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]):
    """
    监视感兴趣的动作。角色将在 _observe 期间从其个人消息缓冲区中选择由这些动作引发的消息。

    Args:
        actions (Iterable[Type[Action]] | Iterable[Action]): 
            可以是动作类型的可迭代对象(例如类),也可以是动作实例的可迭代对象。
            这些动作代表了角色希望监听的行为。
    """
    # 将传入的动作转换为字符串表示,并存储在角色控制器(rc)的 watch 属性中。
    # 使用集合来确保每个动作的唯一性,并且方便后续快速查找。
    self.rc.watch = {any_to_str(t) for t in actions}

_observe方法

代码语言:javascript
代码运行次数:0
复制
async def _observe(self, ignore_memory=False) -> int:
    """
    准备新消息以供处理,从消息缓冲区和其他来源读取消息。
    
    Args:
        ignore_memory (bool): 是否忽略内存中的旧消息,默认为False
    
    Returns:
        int: 返回新观察到的消息数量
    """
    # 准备要处理的新消息
    news = []
    
    # 如果系统已经恢复并且有最新的已观察消息,则从内存中查找新的消息
    if self.recovered and self.latest_observed_msg:
        news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)
    
    # 如果没有找到新的消息,则从消息缓冲区中弹出所有消息
    if not news:
        news = self.rc.msg_buffer.pop_all()
    
    # 如果不忽略内存,则将读取的消息存储在自己的内存中,以防止重复处理
    old_messages = [] if ignore_memory else self.rc.memory.get()
    self.rc.memory.add_batch(news)  # 将新消息批量添加到内存
    
    # 过滤出感兴趣的特定类型的消息
    self.rc.news = [
        n for n in news 
        if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
    ]
    
    # 记录最新的已观察消息,如果有新消息的话
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  

    # 设计规则:
    # 如果需要进一步对 Message 对象进行分类,可以使用 Message.set_meta 函数。
    # msg_buffer 是接收缓冲区,避免在此处添加消息数据和操作。
    
    # 日志记录:如果存在新的消息,则打印简短的预览信息
    news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
    if news_text:
        logger.debug(f"{self._setting} observed: {news_text}")
    
    # 返回新观察到的消息数量
    return len(self.rc.news)

run方法

代码语言:javascript
代码运行次数:0
复制
@role_raise_decorator
async def run(self, with_message=None) -> Message | None:
    """Observe, and think and act based on the results of the observation"""
    
    # 检查是否提供了消息参数。
    if with_message:
        msg = None
        
        # 如果传入的消息是字符串,则创建一个新的 Message 对象,并设置其内容为该字符串。
        if isinstance(with_message, str):
            msg = Message(content=with_message)
        
        # 如果传入的消息已经是 Message 类型,则直接使用它。
        elif isinstance(with_message, Message):
            msg = with_message
        
        # 如果传入的消息是一个列表,则将列表中的所有元素连接成一个以换行符分隔的字符串,并创建一个新的 Message 对象。
        elif isinstance(with_message, list):
            msg = Message(content="\n".join(with_message))
        
        # 如果消息对象的 cause_by 属性未设置,则默认设置为 UserRequirement,表示此消息是由用户需求引发的。
        if not msg.cause_by:
            msg.cause_by = UserRequirement
        
        # 将构建好的消息对象放入内部的消息队列或容器中,以便后续处理。
        self.put_message(msg)

    # 调用 _observe 方法检查是否有新的信息需要处理。如果没有任何新信息,则执行以下代码块。
    if not await self._observe():
        # 记录一条调试级别的日志信息,表明当前没有新的信息,并且系统将进入等待状态。
        logger.debug(f"{self._setting}: no news. waiting.")
        return  # 立即返回,不执行后续代码。

    # 根据之前收集的信息做出反应,并返回一个响应消息 rsp。
    rsp = await self.react()

    # 重置下一个待执行的动作,通过将 todo 设置为 None 来取消任何预定的操作,为下一轮处理做好准备。
    self.set_todo(None)

    # 将响应消息 rsp 发布给环境对象,由环境对象负责将消息传递给所有订阅者。
    self.publish_message(rsp)

    # 返回响应消息 rsp 给调用者。
    return rsp

react方法

代码语言:javascript
代码运行次数:0
复制
async def react(self) -> Message:
    """
    根据观察到的消息选择三种策略之一进行反应。
    
    该方法根据 Role 对象配置中的 `react_mode` 属性来决定采取哪种策略:
        - 如果模式是 `REACT`思考执行模式 或 `BY_ORDER`顺序执行模式,则直接对消息做出反应。
        - 如果模式是 `PLAN_AND_ACT`,则先计划再行动。
        - 如果是其他未支持的模式,则抛出异常。
    最后,重置状态和待办事项,返回响应消息。
    """

    # 根据 RoleConfig (rc) 中定义的 react_mode 来确定使用哪种反应模式。
    if self.rc.react_mode == RoleReactMode.REACT or self.rc.react_mode == RoleReactMode.BY_ORDER:
        # 如果反应模式是 REACT 或 BY_ORDER,则调用 _react 方法直接处理消息。
        rsp = await self._react()
    elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:
        # 如果反应模式是 PLAN_AND_ACT,则先规划然后再执行,调用 _plan_and_act 方法。
        rsp = await self._plan_and_act()
    else:
        # 如果反应模式既不是上述两种之一,则抛出一个 ValueError 异常,提示不支持的反应模式。
        raise ValueError(f"Unsupported react mode: {self.rc.react_mode}")

    # 当前的反应已经完成,将状态设置为 -1,并且重置 todo(下一个要做的动作)回 None。
    # 这是为了确保每次处理完一个消息后,对象的状态都能回到初始状态,准备下一轮的处理。
    self._set_state(state=-1)

    # 返回生成的响应消息给调用者。
    return rsp

_react方法

代码语言:javascript
代码运行次数:0
复制
async def _react(self) -> Message:
    """
    思考然后行动,直到角色认为是时候停止并且不再需要进一步的任务。
    这是 ReAct 论文中描述的标准思考-行动循环,它在任务解决过程中交替进行思考和行动,即 _think -> _act -> _think -> _act -> ...
    使用 llm(大型语言模型)在 _think 阶段动态选择动作。
    
    Returns:
        Message: 最后一次行动产生的响应消息。
    """

    actions_taken = 0  # 初始化已执行的动作计数器为 0。
    
    # 创建一个初始的 Message 对象,内容表示尚未采取任何行动。这个对象会在第一次行动后被覆盖。
    rsp = Message(content="No actions taken yet", cause_by=Action)

    # 开始一个循环,该循环会交替执行思考和行动,直到达到最大循环次数或没有更多待办事项为止。
    while actions_taken < self.rc.max_react_loop:
        # 思考阶段:决定下一步要做什么。(选择动作)
        todo = await self._think()  # 调用 _think 方法来确定下一个待办事项。

        # 如果没有更多的待办事项,则退出循环。
        if not todo:
            break

        # 行动阶段:执行之前决定的行动。
        logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")  # 记录当前状态和即将执行的行动。
        rsp = await self._act()  # 调用 _act 方法执行动作,并更新响应消息。

        actions_taken += 1  # 增加已执行动作的计数。

    # 返回最后一次行动产生的响应消息。
    return rsp

_think方法

代码语言:javascript
代码运行次数:0
复制
async def _think(self) -> bool:
    """
    考虑接下来要做什么,并决定下一步的行动。如果没有任何可以执行的动作,则返回 False。
    
    Returns:
        bool: 如果有确定的下一个动作则返回 True,否则返回 False。
    """

    # 如果只有一个可用的动作,则只能执行这个动作。
    if len(self.actions) == 1:
        self._set_state(0)  # 设置状态为 0,表示将要执行第一个动作。
        return True

    # 如果系统处于恢复状态并且当前状态是非负数,则设置状态以从恢复状态开始执行动作。
    if self.recovered and self.rc.state >= 0:
        self._set_state(self.rc.state)  # 设置状态为恢复时的状态。
        self.recovered = False  # 确保不会因为 max_react_loop 的限制而无法工作。
        return True

    # 如果反应模式是 BY_ORDER(按顺序),则根据动作列表调整最大循环次数,并递增状态值。
    if self.rc.react_mode == RoleReactMode.BY_ORDER:
        if self.rc.max_react_loop != len(self.actions):
            self.rc.max_react_loop = len(self.actions)  # 更新最大循环次数为动作数量。
        self._set_state(self.rc.state + 1)  # 将状态递增到下一个动作。
        return self.rc.state >= 0 and self.rc.state < len(self.actions)  # 检查新状态是否在有效范围内。

    # 如果不止一个动作可选,构建提示信息,用于指导 LLM(大型语言模型)选择下一个动作。
    prompt = self._get_prefix()  # 获取前缀信息,可能包含上下文或历史记录。
    prompt += STATE_TEMPLATE.format(
        history=self.rc.history,  # 提供历史交互记录。
        states="\n".join(self.states),  # 列出所有可能的状态。
        n_states=len(self.states) - 1,  # 状态总数减一。
        previous_state=self.rc.state,  # 当前状态。
    )

    # 使用 LLM 根据构建的提示来异步询问下一个状态。
    next_state = await self.llm.aask(prompt)
    next_state = extract_state_value_from_output(next_state)  # 从 LLM 的输出中提取状态值。
    logger.debug(f"{prompt=}")  # 记录调试信息,显示使用的提示内容。

    # 验证 LLM 返回的状态值是否有效。
    if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self.states)):
        # 如果状态值无效(不是数字且不等于 -1 或超出范围),则记录警告并设置状态为 -1。
        logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")
        next_state = -1
    else:
        next_state = int(next_state)  # 将状态值转换为整数。
        if next_state == -1:
            logger.info(f"End actions with {next_state=}")  # 如果状态值为 -1,表示结束动作。

    # 设置角色的状态为计算得出的下一个状态。
    self._set_state(next_state)

    return True  # 表示已经决定了下一个动作。

_act方法

代码语言:javascript
代码运行次数:0
复制
async def _act(self) -> Message:
    """
    执行当前待办事项(todo),并根据执行结果生成一个消息对象。
    
    Returns:
        Message: 包含执行结果的消息对象。
    """

    # 记录日志信息,显示当前设置和即将执行的待办事项及其名称。
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")

    # 调用当前待办事项的 run 方法,并传入历史记录作为参数。该方法是异步的,因此使用 await 等待其完成。
    response = await self.rc.todo.run(self.rc.history)

    # 根据待办事项的返回值创建一个 Message 对象。
    if isinstance(response, (ActionOutput, ActionNode)):
        # 如果返回的是 ActionOutput 或 ActionNode 类型,则提取其中的内容和其他属性来构建 Message。
        msg = Message(
            content=response.content,
            instruct_content=response.instruct_content,
            role=self._setting,
            cause_by=self.rc.todo,
            sent_from=self,
        )
    elif isinstance(response, Message):
        # 如果返回的是 Message 类型,则直接使用它。
        msg = response
    else:
        # 如果返回的是其他类型,则尝试将其转换为字符串,并创建一个新的 Message 对象。
        msg = Message(content=response or "", role=self.profile, cause_by=self.rc.todo, sent_from=self)

    # 将生成的消息添加到内存中,以便后续处理或记录。
    self.rc.memory.add(msg)

    # 返回生成的消息对象。
    return msg

通过以上这个示例,我们可以得出一个概念

上图是一个角色,对应我们代码中就是ProductManager(产品经理)这个类。它要做事需要分成几步。

第一步:观察,看事情是否跟自己有关系。对应的代码为

代码语言:javascript
代码运行次数:0
复制
# 监控特定的动作类型,以便在这些动作发生时做出反应
self._watch([UserRequirement, PrepareDocuments])

该角色只关心这两个动作,如果既不是用户输入,也不是准备文档,则该角色不会做任何事情。

第二步:思考,决定调用哪个动作。

第三步:开始动作,并动作执行完返回思考,依次不断反复。

第四步:所有动作完成,发布生成的信息。

除了这种思考执行的方式还有另外两种模式

  • 顺序执行模式
  1. 使用self.set_actions初始化所有Action。
  2. 指定每次Role会选择哪个Action,我们将react_mode设置为"by order",这意味着Role将按照self.set_actions中指定顺序执行其能够执行的Action。在这种情况下当Role执行_act时,self.rc.todo将首先是SimpleWriteCode,然后是SimpleRunCode。
  3. 覆盖_act函数。Role从上一轮的人类输入或动作输出中检索消息,用适当的Message内容提供当前的Action(self.rc.todo),最后返回由当前Action输出组成的Message。

总之就是无需思考,按顺序编排执行。

  • 规划与行动

提前制定计划,然后使用该计划指导一系列行动。

规划相对于思考更加复杂,思考其实只是在选择动作,输入和输出不会变化。规划不但编排下一步的动作,而且对输入输出也进行提取。

代码运行流程

role.run()

封装传来的信息Message

执行动作self.react(),选择模式,即上面的三种模式

代码语言:javascript
代码运行次数:0
复制
    # 根据 RoleConfig (rc) 中定义的 react_mode 来确定使用哪种反应模式。
    if self.rc.react_mode == RoleReactMode.REACT or self.rc.react_mode == RoleReactMode.BY_ORDER:
        # 如果反应模式是 REACT 或 BY_ORDER,则调用 _react 方法直接处理消息。
        rsp = await self._react()
    elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:
        # 如果反应模式是 PLAN_AND_ACT,则先规划然后再执行,调用 _plan_and_act 方法。
        rsp = await self._plan_and_act()
    else:
        # 如果反应模式既不是上述两种之一,则抛出一个 ValueError 异常,提示不支持的反应模式。
        raise ValueError(f"Unsupported react mode: {self.rc.react_mode}")

设置下一步动作

将Message发送给环境供其他使用。

多角色协作开发项目

上图是一个多角色交互的过程,它这里有一个环境(Env)的概念,环境会将所有的角色、上下文、需求纳入其中。每一个角色都会观察哪些动作结束后的消息是它们进一步该处理的。

代码语言:javascript
代码运行次数:0
复制
import asyncio
from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager
from metagpt.team import Team
from metagpt.logs import logger

async def main():
    msg = '开发一个跨境电商SaaS门店管理系统'
    company = Team()
    company.hire([ProductManager(), Architect(), ProjectManager(), Engineer()])
    company.invest(investment=3.0)
    company.run_project(idea=msg)
    msg = await company.run(n_round=5)
    logger.info(str(msg))

if __name__ == '__main__':

    asyncio.run(main())

我们来看一下Team这个类

代码语言:javascript
代码运行次数:0
复制
import warnings
from pathlib import Path
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict, Field

from metagpt.actions import UserRequirement
from metagpt.const import MESSAGE_ROUTE_TO_ALL, SERDESER_PATH
from metagpt.context import Context
from metagpt.environment import Environment
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.utils.common import (
    NoMoneyException,
    read_json_file,
    serialize_decorator,
    write_json_file,
)


class Team(BaseModel):
    """
    Team: 拥有一个或多个角色(代理人),标准操作程序(SOP),以及一个用于即时通讯的环境,
    专门从事任何多代理活动,例如协作编写可执行代码。
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)  # 允许任意类型的属性

    env: Optional[Environment] = None  # 团队的环境对象,默认为None
    investment: float = Field(default=10.0)  # 投资金额,默认值为10.0
    idea: str = Field(default="")  # 项目的想法,默认为空字符串

    def __init__(self, context: Context = None, **data: Any):
        """
        初始化团队实例。

        Args:
            context (Context): 上下文对象,默认为None
            **data (Any): 其他初始化数据
        """
        super().__init__(**data)
        ctx = context or Context()  # 如果没有提供context,则创建一个新的Context对象
        if not self.env:
            self.env = Environment(context=ctx)  # 如果没有env,则创建一个新的Environment对象
        else:
            self.env.context = ctx  # 否则设置env的上下文
        if "roles" in data:
            self.hire(data["roles"])  # 如果提供了roles,则雇佣这些角色
        if "env_desc" in data:
            self.env.desc = data["env_desc"]  # 如果提供了env_desc,则设置环境描述

    def serialize(self, stg_path: Path = None):
        """
        序列化团队信息到文件。

        Args:
            stg_path (Path): 存储路径,默认为SERDESER_PATH.joinpath("team")
        """
        stg_path = SERDESER_PATH.joinpath("team") if stg_path is None else stg_path
        team_info_path = stg_path.joinpath("team.json")
        serialized_data = self.model_dump()
        serialized_data["context"] = self.env.context.serialize()

        write_json_file(team_info_path, serialized_data)

    @classmethod
    def deserialize(cls, stg_path: Path, context: Context = None) -> "Team":
        """
        从文件反序列化团队信息。

        Args:
            stg_path (Path): 存储路径
            context (Context): 上下文对象,默认为None

        Returns:
            Team: 反序列化后的团队对象
        """
        team_info_path = stg_path.joinpath("team.json")
        if not team_info_path.exists():
            raise FileNotFoundError(
                "存储元文件 `team.json` 不存在,请启动新项目."
            )

        team_info: dict = read_json_file(team_info_path)
        ctx = context or Context()
        ctx.deserialize(team_info.pop("context", None))
        team = Team(**team_info, context=ctx)
        return team

    def hire(self, roles: list[Role]):
        """雇佣角色进行合作"""
        self.env.add_roles(roles)

    @property
    def cost_manager(self):
        """获取成本管理器"""
        return self.env.context.cost_manager

    def invest(self, investment: float):
        """投资公司。当超过最大预算时抛出NoMoneyException异常。"""
        self.investment = investment
        self.cost_manager.max_budget = investment
        logger.info(f"Investment: ${investment}.")

    def _check_balance(self):
        """检查余额是否足够"""
        if self.cost_manager.total_cost >= self.cost_manager.max_budget:
            raise NoMoneyException(self.cost_manager.total_cost, f"资金不足: {self.cost_manager.max_budget}")

    def run_project(self, idea, send_to: str = ""):
        """运行项目,从发布用户需求开始。"""
        self.idea = idea

        # 用户需求。
        self.env.publish_message(
            Message(role="Human", content=idea, cause_by=UserRequirement, send_to=send_to or MESSAGE_ROUTE_TO_ALL),
            peekable=False,
        )

    def start_project(self, idea, send_to: str = ""):
        """
        已废弃:此方法将在未来被移除。
        请使用 `run_project` 方法代替。
        """
        warnings.warn(
            "The 'start_project' method is deprecated and will be removed in the future. "
            "Please use the 'run_project' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.run_project(idea=idea, send_to=send_to)

    @serialize_decorator
    async def run(self, n_round=3, idea="", send_to="", auto_archive=True):
        """运行公司直到目标轮次或没钱为止"""
        if idea:
            self.run_project(idea=idea, send_to=send_to)

        while n_round > 0:
            if self.env.is_idle:
                logger.debug("所有角色空闲.")
                break
            n_round -= 1
            self._check_balance()
            await self.env.run()

            logger.debug(f"剩余{n_round=}.")
        self.env.archive(auto_archive)
        return self.env.history

然后是Architect角色类

代码语言:javascript
代码运行次数:0
复制
from metagpt.actions import WritePRD
from metagpt.actions.design_api import WriteDesign
from metagpt.roles.role import Role


class Architect(Role):
    """
    表示软件开发过程中的架构师角色。

    Attributes:
        name (str): 架构师的名字,默认为 "Bob"。
        profile (str): 角色简介,默认为 "Architect"。
        goal (str): 架构师的主要目标或职责,默认为设计简洁、可用且完整的软件系统。
        constraints (str): 架构师需要遵守的约束或指导方针,默认为确保架构简单并使用适当的开源库,以及使用与用户需求相同的语言。
    """

    name: str = "Bob"
    profile: str = "Architect"
    goal: str = "design a concise, usable, complete software system"
    constraints: str = (
        "make sure the architecture is simple enough and use appropriate open source "
        "libraries. Use same language as user requirement"
    )

    def __init__(self, **kwargs) -> None:
        """
        初始化 Architect 实例。

        Args:
            **kwargs: 其他初始化参数,传递给父类 Role 的构造函数。
        """
        super().__init__(**kwargs)  # 调用父类 Role 的构造函数进行初始化

        # 初始化架构师角色特有的动作
        self.set_actions([WriteDesign])  # 设置架构师可以执行的动作,例如编写设计文档

        # 设置架构师应该监视或关注的事件或动作
        self._watch({WritePRD})  # 监视 WritePRD 动作,即产品需求文档的编写

WriteDesign动作类

代码语言:javascript
代码运行次数:0
复制
import json  
from pathlib import Path  
from typing import Optional  

from metagpt.actions import Action, ActionOutput
from metagpt.actions.design_api_an import (  # 导入设计API相关的常量和节点
    DATA_STRUCTURES_AND_INTERFACES,
    DESIGN_API_NODE,
    PROGRAM_CALL_FLOW,
    REFINED_DATA_STRUCTURES_AND_INTERFACES,
    REFINED_DESIGN_NODE,
    REFINED_PROGRAM_CALL_FLOW,
)
from metagpt.const import DATA_API_DESIGN_FILE_REPO, SEQ_FLOW_FILE_REPO  # 导入文件存储位置常量
from metagpt.logs import logger  
from metagpt.schema import Document, Documents, Message  # 导入文档和消息相关的类
from metagpt.utils.mermaid import mermaid_to_file  # 导入Mermaid图表保存函数

NEW_REQ_TEMPLATE = """
### Legacy Content
{old_design}

### New Requirements
{context}
"""  # 定义新需求模板,用于合并旧设计和新需求


class WriteDesign(Action):
    name: str = "WriteDesign"  # 定义动作名称
    i_context: Optional[str] = None  # 初始化上下文信息,默认为None
    desc: str = (
        "Based on the PRD, think about the system design, and design the corresponding APIs, "
        "data structures, library tables, processes, and paths. Please provide your design, feedback "
        "clearly and in detail."
    )  # 描述该动作的目的和要求

    async def run(self, with_messages: Message, schema: str = None):
        """
        执行写设计动作。

        Args:
            with_messages (Message): 包含上下文信息的消息。
            schema (str, optional): 模式,默认为None。

        Returns:
            ActionOutput: 动作执行结果。
        """
        changed_prds = self.repo.docs.prd.changed_files  # 获取被修改的PRD文件字典
        changed_system_designs = self.repo.docs.system_design.changed_files  # 获取被修改的系统设计文件字典

        changed_files = Documents()  # 创建Documents对象,用于存储更改后的文件
        for filename in changed_prds.keys():  # 遍历所有更改的PRD文件名
            doc = await self._update_system_design(filename=filename)  # 调用更新系统设计的方法,并获取更新后的文档
            changed_files.docs[filename] = doc  # 将更新后的文档添加到changed_files中

        for filename in changed_system_designs.keys():  # 遍历所有更改的系统设计文件名
            if filename not in changed_files.docs:  # 如果文件名不在已更新的文件列表中,则进行更新
                doc = await self._update_system_design(filename=filename)
                changed_files.docs[filename] = doc  # 将更新后的文档添加到changed_files中

        if not changed_files.docs:  # 如果没有文件被更改
            logger.info("Nothing has changed.")  # 记录日志信息

        return ActionOutput(content=changed_files.model_dump_json(), instruct_content=changed_files)  # 返回动作执行结果

    async def _new_system_design(self, context):
        """
        创建新的系统设计。

        Args:
            context (str): 上下文信息。

        Returns:
            Node: 设计节点。
        """
        node = await DESIGN_API_NODE.fill(context=context, llm=self.llm)  # 使用LLM填充设计节点
        return node  # 返回创建的设计节点

    async def _merge(self, prd_doc, system_design_doc):
        """
        合并旧的设计与新的需求。

        Args:
            prd_doc (Document): 产品需求文档。
            system_design_doc (Document): 系统设计文档。

        Returns:
            Document: 更新后的系统设计文档。
        """
        context = NEW_REQ_TEMPLATE.format(old_design=system_design_doc.content, context=prd_doc.content)  # 根据模板生成新的上下文
        node = await REFINED_DESIGN_NODE.fill(context=context, llm=self.llm)  # 使用LLM填充精炼设计节点
        system_design_doc.content = node.instruct_content.model_dump_json()  # 更新系统设计文档内容为新的设计节点内容
        return system_design_doc  # 返回更新后的系统设计文档

    async def _update_system_design(self, filename) -> Document:
        """
        更新特定文件名的系统设计。

        Args:
            filename (str): 文件名。

        Returns:
            Document: 更新后的系统设计文档。
        """
        prd = await self.repo.docs.prd.get(filename)  # 获取指定文件名的PRD文档
        old_system_design_doc = await self.repo.docs.system_design.get(filename)  # 获取指定文件名的旧系统设计文档

        if not old_system_design_doc:  # 如果不存在旧的系统设计文档
            system_design = await self._new_system_design(context=prd.content)  # 创建新的系统设计
            doc = await self.repo.docs.system_design.save(
                filename=filename,
                content=system_design.instruct_content.model_dump_json(),
                dependencies={prd.root_relative_path},  # 保存新设计文档,并设置依赖关系
            )
        else:  # 如果存在旧的系统设计文档
            doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc)  # 合并现有设计和新需求
            await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path})  # 保存更新后的设计文档

        await self._save_data_api_design(doc)  # 保存数据API设计图表
        await self._save_seq_flow(doc)  # 保存序列流图
        await self.repo.resources.system_design.save_pdf(doc=doc)  # 保存PDF格式的设计文档
        return doc  # 返回更新后的系统设计文档

    async def _save_data_api_design(self, design_doc):
        """
        保存数据API设计到文件。

        Args:
            design_doc (Document): 系统设计文档。
        """
        m = json.loads(design_doc.content)  # 解析设计文档内容为Python对象
        data_api_design = m.get(DATA_STRUCTURES_AND_INTERFACES.key) or m.get(REFINED_DATA_STRUCTURES_AND_INTERFACES.key)  # 获取数据结构和接口信息
        if not data_api_design:  # 如果没有数据结构和接口信息
            return  # 直接返回,不做任何处理

        pathname = self.repo.workdir / DATA_API_DESIGN_FILE_REPO / Path(design_doc.filename).with_suffix("")  # 构建保存路径
        await self._save_mermaid_file(data_api_design, pathname)  # 保存Mermaid图表文件
        logger.info(f"Save class view to {str(pathname)}")  # 记录日志信息

    async def _save_seq_flow(self, design_doc):
        """
        保存序列流图到文件。

        Args:
            design_doc (Document): 系统设计文档。
        """
        m = json.loads(design_doc.content)  # 解析设计文档内容为Python对象
        seq_flow = m.get(PROGRAM_CALL_FLOW.key) or m.get(REFINED_PROGRAM_CALL_FLOW.key)  # 获取程序调用流程信息
        if not seq_flow:  # 如果没有程序调用流程信息
            return  # 直接返回,不做任何处理

        pathname = self.repo.workdir / SEQ_FLOW_FILE_REPO / Path(design_doc.filename).with_suffix("")  # 构建保存路径
        await self._save_mermaid_file(seq_flow, pathname)  # 保存Mermaid图表文件
        logger.info(f"Saving sequence flow to {str(pathname)}")  # 记录日志信息

    async def _save_mermaid_file(self, data: str, pathname: Path):
        """
        将Mermaid图表保存到文件。

        Args:
            data (str): Mermaid图表数据。
            pathname (Path): 文件路径。
        """
        pathname.parent.mkdir(parents=True, exist_ok=True)  # 确保保存路径存在
        await mermaid_to_file(self.config.mermaid.engine, data, pathname)  # 将Mermaid图表数据保存到文件

ProjectManager角色类

代码语言:javascript
代码运行次数:0
复制
from metagpt.actions import WriteTasks
from metagpt.actions.design_api import WriteDesign
from metagpt.roles.role import Role


class ProjectManager(Role):
    """
    表示负责监督项目执行和团队效率的项目经理角色。

    Attributes:
        name (str): 项目经理的名字,默认为 "Eve"。
        profile (str): 角色简介,默认为 "Project Manager"。
        goal (str): 项目经理的主要目标或职责,默认为根据PRD/技术设计分解任务,生成任务列表,并分析任务依赖关系以从前提模块开始。
        constraints (str): 项目经理需要遵守的约束或指导方针,默认为使用与用户需求相同的语言。
    """

    name: str = "Eve"
    profile: str = "Project Manager"
    goal: str = (
        "break down tasks according to PRD/technical design, generate a task list, and analyze task "
        "dependencies to start with the prerequisite modules"
    )
    constraints: str = "use same language as user requirement"

    def __init__(self, **kwargs) -> None:
        """
        初始化 ProjectManager 实例。

        Args:
            **kwargs: 其他初始化参数,传递给父类 Role 的构造函数。
        """
        super().__init__(**kwargs)  # 调用父类 Role 的构造函数进行初始化

        self.set_actions([WriteTasks])  # 设置项目经理可以执行的动作,例如编写任务清单
        self._watch([WriteDesign])  # 监视 WriteDesign 动作,即系统设计文档的编写

WriteTasks动作类

代码语言:javascript
代码运行次数:0
复制
import json  
from typing import Optional  

from metagpt.actions.action import Action
from metagpt.actions.action_output import ActionOutput
from metagpt.actions.project_management_an import PM_NODE, REFINED_PM_NODE
from metagpt.const import PACKAGE_REQUIREMENTS_FILENAME  # 导入包需求文件名常量
from metagpt.logs import logger  
from metagpt.schema import Document, Documents  

NEW_REQ_TEMPLATE = """
### Legacy Content
{old_task}

### New Requirements
{context}
"""  # 定义新需求模板,用于合并旧任务和新需求


class WriteTasks(Action):
    name: str = "CreateTasks"  # 定义动作名称
    i_context: Optional[str] = None  # 初始化上下文信息,默认为None

    async def run(self, with_messages):
        """
        执行创建任务的动作。

        Args:
            with_messages: 包含上下文信息的消息。

        Returns:
            ActionOutput: 动作执行结果。
        """
        changed_system_designs = self.repo.docs.system_design.changed_files  # 获取被修改的系统设计文件字典
        changed_tasks = self.repo.docs.task.changed_files  # 获取被修改的任务文件字典
        change_files = Documents()  # 创建Documents对象,用于存储更改后的文件

        # 根据git头差异重写`docs/system_designs/`下发生变更的系统设计
        for filename in changed_system_designs:
            task_doc = await self._update_tasks(filename=filename)  # 更新任务,并获取更新后的任务文档
            change_files.docs[filename] = task_doc  # 将更新后的任务文档添加到change_files中

        # 根据git头差异重写`docs/tasks/`下发生变更的任务文件
        for filename in changed_tasks:
            if filename in change_files.docs:  # 如果文件名已在更新列表中,则跳过
                continue
            task_doc = await self._update_tasks(filename=filename)
            change_files.docs[filename] = task_doc  # 将更新后的任务文档添加到change_files中

        if not change_files.docs:  # 如果没有文件被更改
            logger.info("Nothing has changed.")  # 记录日志信息

        return ActionOutput(content=change_files.model_dump_json(), instruct_content=change_files)  # 返回动作执行结果

    async def _update_tasks(self, filename):
        """
        更新特定文件名的任务。

        Args:
            filename (str): 文件名。

        Returns:
            Document: 更新后的任务文档。
        """
        system_design_doc = await self.repo.docs.system_design.get(filename)  # 获取指定文件名的系统设计文档
        task_doc = await self.repo.docs.task.get(filename)  # 获取指定文件名的任务文档

        if task_doc:  # 如果存在任务文档
            task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc)  # 合并现有任务和新需求
            await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path})  # 保存更新后的任务文档
        else:  # 如果不存在任务文档
            rsp = await self._run_new_tasks(context=system_design_doc.content)  # 创建新的任务
            task_doc = await self.repo.docs.task.save(
                filename=filename,
                content=rsp.instruct_content.model_dump_json(),
                dependencies={system_design_doc.root_relative_path},  # 保存新任务文档,并设置依赖关系
            )

        await self._update_requirements(task_doc)  # 更新项目依赖(如所需的Python包)
        return task_doc  # 返回更新后的任务文档

    async def _run_new_tasks(self, context):
        """
        根据给定的上下文创建新的任务。

        Args:
            context (str): 上下文信息。

        Returns:
            Node: 任务节点。
        """
        node = await PM_NODE.fill(context, self.llm, schema=self.prompt_schema)  # 使用LLM填充任务节点
        return node  # 返回创建的任务节点

    async def _merge(self, system_design_doc, task_doc) -> Document:
        """
        合并现有的任务文档与新的需求。

        Args:
            system_design_doc (Document): 系统设计文档。
            task_doc (Document): 任务文档。

        Returns:
            Document: 更新后的任务文档。
        """
        context = NEW_REQ_TEMPLATE.format(context=system_design_doc.content, old_task=task_doc.content)  # 根据模板生成新的上下文
        node = await REFINED_PM_NODE.fill(context, self.llm, schema=self.prompt_schema)  # 使用LLM填充精炼的任务节点
        task_doc.content = node.instruct_content.model_dump_json()  # 更新任务文档内容为新的任务节点内容
        return task_doc  # 返回更新后的任务文档

    async def _update_requirements(self, doc):
        """
        更新项目依赖(如所需的Python包)。

        Args:
            doc (Document): 任务文档。
        """
        m = json.loads(doc.content)  # 解析任务文档内容为Python对象
        packages = set(m.get("Required packages", []))  # 获取所需包列表

        requirement_doc = await self.repo.get(filename=PACKAGE_REQUIREMENTS_FILENAME)  # 获取需求文件
        if not requirement_doc:  # 如果需求文件不存在
            requirement_doc = Document(filename=PACKAGE_REQUIREMENTS_FILENAME, root_path=".", content="")  # 创建新的需求文件

        lines = requirement_doc.content.splitlines()  # 分割需求文件内容为行列表
        for pkg in lines:
            if pkg == "":
                continue
            packages.add(pkg)  # 将已有包添加到所需包集合中

        await self.repo.save(filename=PACKAGE_REQUIREMENTS_FILENAME, content="\n".join(packages))  # 保存更新后的需求文件

Engineer角色类

代码语言:javascript
代码运行次数:0
复制
from pathlib import Path
from typing import Optional, Set

# 导入自定义模块和类
from metagpt.actions import Action, WriteCode, WriteCodeReview, WriteTasks
from metagpt.actions.fix_bug import FixBug
from metagpt.actions.project_management_an import REFINED_TASK_LIST, TASK_LIST
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.actions.write_code_plan_and_change_an import WriteCodePlanAndChange
from metagpt.const import (
    BUGFIX_FILENAME,
    CODE_PLAN_AND_CHANGE_FILE_REPO,
    REQUIREMENT_FILENAME,
    SYSTEM_DESIGN_FILE_REPO,
    TASK_FILE_REPO,
)
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import (
    CodePlanAndChangeContext,
    CodeSummarizeContext,
    CodingContext,
    Document,
    Documents,
    Message,
)
from metagpt.utils.common import any_to_name, any_to_str, any_to_str_set

# 定义一个提示模板,用于判断是否需要执行特定任务
IS_PASS_PROMPT = """
{context}

----
Does the above log indicate anything that needs to be done?
If there are any tasks to be completed, please answer 'NO' along with the to-do list in JSON format;
otherwise, answer 'YES' in JSON format.
"""

# 定义工程师角色类,继承自Role类
class Engineer(Role):
    """
    代表工程师角色,负责编写代码及可能的代码审查。
    
    属性:
        name (str): 工程师的名字。
        profile (str): 角色描述,默认为“Engineer”。
        goal (str): 工程师的目标。
        constraints (str): 对工程师的约束。
        n_borg (int): Borg数量。
        use_code_review (bool): 是否使用代码审查。
    """
    
    name: str = "Alex"  # 工程师的名字
    profile: str = "Engineer"  # 角色描述
    goal: str = "write elegant, readable, extensible, efficient code"  # 目标:编写优雅、可读性强、可扩展且高效的代码
    constraints: str = (
        "the code should conform to standards like google-style and be modular and maintainable. "
        "Use same language as user requirement"
    )  # 约束:代码应符合google风格等标准,并且是模块化和可维护的。使用与用户需求相同的语言。
    n_borg: int = 1  # Borg数量
    use_code_review: bool = False  # 是否使用代码审查
    code_todos: list = []  # 代码待办事项列表
    summarize_todos: list = []  # 摘要待办事项列表
    next_todo_action: str = ""  # 下一步动作名称
    n_summarize: int = 0  # 摘要计数器

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)  # 调用父类的构造函数
        
        # 设置动作并监控特定事件
        self.set_actions([WriteCode])
        self._watch([WriteTasks, SummarizeCode, WriteCode, WriteCodeReview, FixBug, WriteCodePlanAndChange])
        self.code_todos = []  # 初始化代码待办事项列表
        self.summarize_todos = []  # 初始化摘要待办事项列表
        self.next_todo_action = any_to_name(WriteCode)  # 设置下一步动作名称

    # 解析任务列表的方法
    @staticmethod
    def _parse_tasks(task_msg: Document) -> list[str]:
        m = json.loads(task_msg.content)  # 将任务消息的内容解析为JSON对象
        return m.get(TASK_LIST.key) or m.get(REFINED_TASK_LIST.key)  # 返回任务列表或经过优化的任务列表

    # 执行特定动作的方法
    async def _act_sp_with_cr(self, review=False) -> Set[str]:
        changed_files = set()  # 初始化已更改文件集合
        for todo in self.code_todos:  # 遍历待办事项列表
            coding_context = await todo.run()  # 运行待办事项并获取上下文
            # 如果启用代码审查,则运行代码审查动作
            if review:
                action = WriteCodeReview(i_context=coding_context, context=self.context, llm=self.llm)
                self._init_action(action)
                coding_context = await action.run()

            dependencies = {coding_context.design_doc.root_relative_path, coding_context.task_doc.root_relative_path}  # 设置依赖项
            if self.config.inc:
                dependencies.add(coding_context.code_plan_and_change_doc.root_relative_path)
            # 保存代码到项目仓库
            await self.project_repo.srcs.save(
                filename=coding_context.filename,
                dependencies=list(dependencies),
                content=coding_context.code_doc.content,
            )
            # 记录消息
            msg = Message(
                content=coding_context.model_dump_json(),
                instruct_content=coding_context,
                role=self.profile,
                cause_by=WriteCode,
            )
            self.rc.memory.add(msg)

            changed_files.add(coding_context.code_doc.filename)  # 添加已更改文件名到集合
        if not changed_files:
            logger.info("Nothing has changed.")  # 如果没有更改,则记录日志信息
        return changed_files  # 返回已更改文件集合

    # 根据是否使用代码审查来确定下一步动作的方法
    async def _act(self) -> Message | None:
        if self.rc.todo is None:
            return None  # 如果没有待办事项,则返回None
        # 根据待办事项类型执行不同动作
        if isinstance(self.rc.todo, WriteCodePlanAndChange):
            self.next_todo_action = any_to_name(WriteCode)
            return await self._act_code_plan_and_change()
        if isinstance(self.rc.todo, WriteCode):
            self.next_todo_action = any_to_name(SummarizeCode)
            return await self._act_write_code()
        if isinstance(self.rc.todo, SummarizeCode):
            self.next_todo_action = any_to_name(WriteCode)
            return await self._act_summarize()
        return None  # 如果未匹配任何类型,则返回None

    # 执行写代码动作的方法
    async def _act_write_code(self):
        changed_files = await self._act_sp_with_cr(review=self.use_code_review)  # 调用_act_sp_with_cr方法
        return Message(
            content="\n".join(changed_files),  # 返回已更改文件列表
            role=self.profile,
            cause_by=WriteCodeReview if self.use_code_review else WriteCode,
            send_to=self,
            sent_from=self,
        )

    # 执行摘要动作的方法
    async def _act_summarize(self):
        tasks = []
        for todo in self.summarize_todos:  # 遍历摘要待办事项列表
            summary = await todo.run()  # 运行待办事项并获取摘要
            summary_filename = Path(todo.i_context.design_filename).with_suffix(".md").name  # 设置摘要文件名
            dependencies = {todo.i_context.design_filename, todo.i_context.task_filename}  # 设置依赖项
            for filename in todo.i_context.codes_filenames:
                rpath = self.project_repo.src_relative_path / filename
                dependencies.add(str(rpath))  # 添加依赖路径
            # 保存摘要到项目仓库
            await self.project_repo.resources.code_summary.save(
                filename=summary_filename, content=summary, dependencies=dependencies
            )
            is_pass, reason = await self._is_pass(summary)  # 判断是否通过摘要检查
            if not is_pass:
                todo.i_context.reason = reason
                tasks.append(todo.i_context.model_dump())  # 添加失败摘要到任务列表

                await self.project_repo.docs.code_summary.save(
                    filename=Path(todo.i_context.design_filename).name,
                    content=todo.i_context.model_dump_json(),
                    dependencies=dependencies,
                )
            else:
                await self.project_repo.docs.code_summary.delete(filename=Path(todo.i_context.design_filename).name)

        logger.info(f"--max-auto-summarize-code={self.config.max_auto_summarize_code}")  # 记录自动摘要代码的最大数量
        if not tasks or self.config.max_auto_summarize_code == 0:
            return Message(
                content="",  # 如果没有任务或最大自动摘要代码数量为0,则返回空内容
                role=self.profile,
                cause_by=SummarizeCode,
                sent_from=self,
                send_to="Edward",  # 发送到QaEngineer
            )
        # 自动摘要代码的最大数量限制,-1表示不限制
        self.n_summarize += 1 if self.config.max_auto_summarize_code > self.n_summarize else 0
        return Message(
            content=json.dumps(tasks),  # 返回任务列表
            role=self.profile,
            cause_by=SummarizeCode,
            send_to=self,
            sent_from=self,
        )

    # 执行代码计划和变更动作的方法
    async def _act_code_plan_and_change(self):
        node = await self.rc.todo.run()  # 运行待办事项并获取节点
        code_plan_and_change = node.instruct_content.model_dump_json()  # 获取代码计划和变更内容
        dependencies = {
            REQUIREMENT_FILENAME,
            str(self.project_repo.docs.prd.root_path / self.rc.todo.i_context.prd_filename),
            str(self.project_repo.docs.system_design.root_path / self.rc.todo.i_context.design_filename),
            str(self.project_repo.docs.task.root_path / self.rc.todo.i_context.task_filename),
        }  # 设置依赖项
        code_plan_and_change_filepath = Path(self.rc.todo.i_context.design_filename)  # 设置代码计划和变更文件路径
        # 保存代码计划和变更到项目仓库
        await self.project_repo.docs.code_plan_and_change.save(
            filename=code_plan_and_change_filepath.name, content=code_plan_and_change, dependencies=dependencies
        )
        await self.project_repo.resources.code_plan_and_change.save(
            filename=code_plan_and_change_filepath.with_suffix(".md").name,
            content=node.content,
            dependencies=dependencies,
        )

        return Message(
            content=code_plan_and_change,  # 返回代码计划和变更内容
            role=self.profile,
            cause_by=WriteCodePlanAndChange,
            send_to=self,
            sent_from=self,
        )

    # 判断摘要是否通过的方法
    async def _is_pass(self, summary) -> (str, str):
        rsp = await self.llm.aask(msg=IS_PASS_PROMPT.format(context=summary), stream=False)  # 提问并获取回复
        logger.info(rsp)  # 记录回复日志
        if "YES" in rsp:
            return True, rsp  # 如果回复中包含"YES",则返回True和回复内容
        return False, rsp  # 否则返回False和回复内容

    # 思考下一步动作的方法
    async def _think(self) -> Action | None:
        if not self.src_workspace:
            self.src_workspace = self.git_repo.workdir / self.git_repo.workdir.name  # 设置源工作区
        write_plan_and_change_filters = any_to_str_set([WriteTasks, FixBug])  # 设置写代码计划和变更过滤器
        write_code_filters = any_to_str_set([WriteTasks, WriteCodePlanAndChange, SummarizeCode])  # 设置写代码过滤器
        summarize_code_filters = any_to_str_set([WriteCode, WriteCodeReview])  # 设置摘要过滤器
        if not self.rc.news:
            return None  # 如果没有新消息,则返回None
        msg = self.rc.news[0]  # 获取最新消息
        # 根据过滤器条件设置下一步动作
        if self.config.inc and msg.cause_by in write_plan_and_change_filters:
            logger.debug(f"TODO WriteCodePlanAndChange:{msg.model_dump_json()}")
            await self._new_code_plan_and_change_action(cause_by=msg.cause_by)
            return self.rc.todo
        if msg.cause_by in write_code_filters:  # 如果消息的触发原因是写代码过滤器中的任意一项
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-02-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MetaGPT
  • Test cases
  • Test case 1
  • Test case 2
  • Test cases
  • Test case 1
  • Test case 2
  • The function below can be used to test the generated sum_of_list function
  • Running the test
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档