(12-5-03)仿Manus通用AI Agent系统:Agent模块(3)链式推理Agent

11.5.3  链式推理Agent

文件cot.py实现了链式推理(Chain of Thought, CoT)代理类CoTAgent,专注于展示大型语言模型的推理过程,而不执行任何工具。类CoTAgent通过处理一条推理链,模拟思维的步骤来得出结论。该代理仅需要一个步骤来完成推理,并利用提供的系统提示和用户消息生成回答。

class CoTAgent(BaseAgent):
    """链式推理代理 - 专注于展示大型语言模型的思维过程,而不执行工具"""

    name: str = "cot"
    description: str = "一个使用链式推理的代理"

    system_prompt: str = SYSTEM_PROMPT
    next_step_prompt: Optional[str] = NEXT_STEP_PROMPT

    llm: LLM = Field(default_factory=LLM)

    max_steps: int = 1  # CoT 通常只需要一步就能完成推理

    async def step(self) -> str:
        """执行一步链式推理"""
        logger.info(f"🧠 {self.name} 正在思考...")

        # 如果 next_step_prompt 存在且这不是第一次消息,将其添加到用户消息中
        if self.next_step_prompt and len(self.messages) > 1:
            self.memory.add_message(Message.user_message(self.next_step_prompt))

        # 使用系统提示和用户消息
        response = await self.llm.ask(
            messages=self.messages,
            system_msgs=[Message.system_message(self.system_prompt)]
            if self.system_prompt
            else None,
        )

        # 记录助手的回应
        self.memory.add_message(Message.assistant_message(response))

        # 完成后将状态设置为已完成
        self.state = AgentState.FINISHED

        return response

11.5.4  任务Agent

文件planning.py实现了类PlanningAgent,这是一个用于创建和管理解决任务的计划的代理。类PlanningAgent使用一个规划工具来构建和管理结构化的计划,并通过单个步骤跟踪任务的进展,直到任务完成。代理能够处理多个工具调用,并跟踪每个步骤的执行状态,确保任务按计划进行。

class PlanningAgent(ToolCallAgent):

    name: str = "planning"
    description: str = "一个创建和管理计划以解决任务的代理"

    system_prompt: str = PLANNING_SYSTEM_PROMPT
    next_step_prompt: str = NEXT_STEP_PROMPT

    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(PlanningTool(), Terminate())
    )
    tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO  # type: ignore
    special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])

    tool_calls: List[ToolCall] = Field(default_factory=list)
    active_plan_id: Optional[str] = Field(default=None)

    # 添加字典来跟踪每个工具调用的步骤状态
    step_execution_tracker: Dict[str, Dict] = Field(default_factory=dict)
    current_step_index: Optional[int] = None

    max_steps: int = 20

    @model_validator(mode="after")
    def initialize_plan_and_verify_tools(self) -> "PlanningAgent":
        """初始化代理,设置默认计划 ID 并验证所需工具。"""
        self.active_plan_id = f"plan_{int(time.time())}"

        if "planning" not in self.available_tools.tool_map:
            self.available_tools.add_tool(PlanningTool())

        return self

    async def think(self) -> bool:
        """基于计划状态决定下一个行动。"""
        prompt = (
            f"当前计划状态:\n{await self.get_plan()}\n\n{self.next_step_prompt}"
            if self.active_plan_id
            else self.next_step_prompt
        )
        self.messages.append(Message.user_message(prompt))

        # 在思考之前获取当前步骤索引
        self.current_step_index = await self._get_current_step_index()

        result = await super().think()

        # 思考之后,如果决定执行工具且该工具不是规划工具或特殊工具,
        # 则将其与当前步骤关联以便追踪
        if result and self.tool_calls:
            latest_tool_call = self.tool_calls[0]  # 获取最新的工具调用
            if (
                latest_tool_call.function.name != "planning"
                and latest_tool_call.function.name not in self.special_tool_names
                and self.current_step_index is not None
            ):
                self.step_execution_tracker[latest_tool_call.id] = {
                    "step_index": self.current_step_index,
                    "tool_name": latest_tool_call.function.name,
                    "status": "pending",  # 执行后将更新
                }

        return result

    async def act(self) -> str:
        """执行一个步骤并跟踪其完成状态。"""
        result = await super().act()

        # 执行工具后,更新计划状态
        if self.tool_calls:
            latest_tool_call = self.tool_calls[0]

            # 更新执行状态为已完成
            if latest_tool_call.id in self.step_execution_tracker:
                self.step_execution_tracker[latest_tool_call.id]["status"] = "completed"
                self.step_execution_tracker[latest_tool_call.id]["result"] = result

                # 如果这是一个非规划工具或非特殊工具,则更新计划状态
                if (
                    latest_tool_call.function.name != "planning"
                    and latest_tool_call.function.name not in self.special_tool_names
                ):
                    await self.update_plan_status(latest_tool_call.id)

        return result

    async def get_plan(self) -> str:
        """检索当前计划的状态。"""
        if not self.active_plan_id:
            return "没有活动计划。请先创建计划。"

        result = await self.available_tools.execute(
            name="planning",
            tool_input={"command": "get", "plan_id": self.active_plan_id},
        )
        return result.output if hasattr(result, "output") else str(result)

    async def run(self, request: Optional[str] = None) -> str:
        """运行代理并可选择性提供初始请求。"""
        if request:
            await self.create_initial_plan(request)
        return await super().run()

    async def update_plan_status(self, tool_call_id: str) -> None:
        """
        基于完成的工具执行更新当前计划进度。
        仅在相关工具成功执行后,标记步骤为已完成。
        """
        if not self.active_plan_id:
            return

        if tool_call_id not in self.step_execution_tracker:
            logger.warning(f"未找到工具调用 {tool_call_id} 的步骤追踪")
            return

        tracker = self.step_execution_tracker[tool_call_id]
        if tracker["status"] != "completed":
            logger.warning(f"工具调用 {tool_call_id} 尚未成功完成")
            return

        step_index = tracker["step_index"]

        try:
            # 标记步骤为已完成
            await self.available_tools.execute(
                name="planning",
                tool_input={
                    "command": "mark_step",
                    "plan_id": self.active_plan_id,
                    "step_index": step_index,
                    "step_status": "completed",
                },
            )
            logger.info(
                f"在计划 {self.active_plan_id} 中将步骤 {step_index} 标记为已完成"
            )
        except Exception as e:
            logger.warning(f"更新计划状态失败: {e}")

    async def _get_current_step_index(self) -> Optional[int]:
        """
        解析当前计划,找出第一个未完成步骤的索引。
        如果未找到活动步骤,则返回 None。
        """
        if not self.active_plan_id:
            return None

        plan = await self.get_plan()

        try:
            plan_lines = plan.splitlines()
            steps_index = -1

            # 找到 "Steps:" 行的索引
            for i, line in enumerate(plan_lines):
                if line.strip() == "Steps:":
                    steps_index = i
                    break

            if steps_index == -1:
                return None

            # 找到第一个未完成的步骤
            for i, line in enumerate(plan_lines[steps_index + 1 :], start=0):
                if "[ ]" in line or "[→]" in line:  # 未开始或进行中
                    # 将当前步骤标记为进行中
                    await self.available_tools.execute(
                        name="planning",
                        tool_input={
                            "command": "mark_step",
                            "plan_id": self.active_plan_id,
                            "step_index": i,
                            "step_status": "in_progress",
                        },
                    )
                    return i

            return None  # 未找到活动步骤
        except Exception as e:
            logger.warning(f"查找当前步骤索引时出错: {e}")
            return None

    async def create_initial_plan(self, request: str) -> None:
        """根据请求创建初始计划。"""
        logger.info(f"使用 ID {self.active_plan_id} 创建初始计划")

        messages = [
            Message.user_message(
                f"分析请求并创建一个 ID 为 {self.active_plan_id} 的计划: {request}"
            )
        ]
        self.memory.add_messages(messages)
        response = await self.llm.ask_tool(
            messages=messages,
            system_msgs=[Message.system_message(self.system_prompt)],
            tools=self.available_tools.to_params(),
            tool_choice=ToolChoice.AUTO,
        )
        assistant_msg = Message.from_tool_calls(
            content=response.content, tool_calls=response.tool_calls
        )

        self.memory.add_message(assistant_msg)

        plan_created = False
        for tool_call in response.tool_calls:
            if tool_call.function.name == "planning":
                result = await self.execute_tool(tool_call)
                logger.info(
                    f"执行工具 {tool_call.function.name},结果: {result}"
                )

                # 将工具响应添加到记忆中
                tool_msg = Message.tool_message(
                    content=result,
                    tool_call_id=tool_call.id,
                    name=tool_call.function.name,
                )
                self.memory.add_message(tool_msg)
                plan_created = True
                break

        if not plan_created:
            logger.warning("从初始请求中未创建计划")
            tool_msg = Message.assistant_message(
                "错误: 参数 `plan_id` 是创建命令所必需的"
            )
            self.memory.add_message(tool_msg)


async def main():
    # 配置并运行代理
    agent = PlanningAgent(available_tools=ToolCollection(PlanningTool(), Terminate()))
    result = await agent.run("帮我计划一次去月球的旅行")
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

评论 3
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

码农三叔

感谢鼓励

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值