(9-4-1)智能客服Agent开发:Agent(01)Agent工厂+OpenAI对话Agent

9.5  Agent

本项目的“Agents”目录包含了与智能代理(Agent)相关的功能和实现,主要负责管理和实现智能代理相关的功能,包括定义代理的基础接口(BaseAgent)、提供代理工厂(agent_factory.py)以根据配置创建不同类型的代理,以及实现具体的代理逻辑,如 OpenAIAgent 和 OpenAIAssistantAgent。这些代理封装了与各种语言模型(LLM)和外部服务(如 OpenAI 或 DeepSeek API)的交互逻辑,从而实现对话生成、任务处理和多媒体数据解析等功能。

9.5.1  Agent工厂

文件agent_factory.py的功能是根据传入的 AgentInstance 类型(如 openai 或 openai_assistant)创建不同类型的智能代理实例,通过工厂函数create_agent()来实例化合适的代理类(OpenAIAgent 或 OpenAIAssistantAgent),并传入所需的配置、向量存储和语言模型(LLM)。如果代理类型无法识别,会抛出一个 ValueError 异常。

from app.pkg.agents.base_agent import BaseAgent
from app.pkg.agents.openai_agent import OpenAIAgent
from app.pkg.agents.openai_assistant_agent import OpenAIAssistantAgent
from app.pkg.config import Config
from app.pkg.entities.models import AgentInstance
from app.pkg.llm.base_llm import BaseLLM
from app.pkg.vector_store.vector_store import VectorStore


def create_agent(agent_instance: AgentInstance, cfg: Config, vs: VectorStore, llm: BaseLLM) -> BaseAgent:
    if agent_instance.type == "openai":
        return OpenAIAgent(agent_instance, cfg, vs, llm)

    if agent_instance.type == "openai_assistant":
        return OpenAIAssistantAgent(agent_instance, cfg, vs, llm)

    raise ValueError(f"Неизвестный тип агента: {agent_instance.type}")

9.5.2  OpenAI对话Agent

1. 类OpenAIAgent

文件base_agent.py实现了类OpenAIAgent,此类是 BaseAgent 类的子类,旨在通过 OpenAI 提供的技术为对话系统构建代理。它实现了创建、删除对话线程以及生成对话答案等功能。具体包括创建和删除线程、与 OpenAI API 交互生成答案,并支持处理问题和媒体内容。另外,类OpenAIAgent还包括了与云服务交互的功能,支持为代理创建云实例。

class OpenAIAgent(BaseAgent):
    def __init__(self, agent_instance, cfg, vs, llm: BaseLLM):
        super().__init__(agent_instance, cfg, vs, llm)

    async def delete_thread(self, dialog_id: int, thread_id: str | None) -> bool:
        """
        从 OpenAI 系统(或 OpenAIAgent 交互的任何系统)删除一个线程。
        如果 `thread_id` 为 None,则删除与 `dialog_id` 对应的线程。
        """
        try:
            # 在这个示例中,我们模拟一个 API 调用来删除一个线程。
            # OpenAI 没有一个专门的“删除线程”API,所以我们假设自定义行为
            # 用于从数据库或向量存储等系统中删除线程。
            if thread_id:
                # 删除特定线程的逻辑
                self.logger.info(f"Deleting thread {thread_id} for dialog {dialog_id}")
            else:
                # 删除基于对话的线程逻辑
                self.logger.info(f"Deleting thread for dialog {dialog_id}")
            
            # 模拟删除,返回 True 表示成功
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete thread: {str(e)}")
            return False

    async def create_thread(self, dialog_id: int, init_messages: list = None) -> str | None:
        """
        在 OpenAI 系统(或 OpenAIAgent 交互的任何系统)中创建一个新线程。
        """
        try:
            # 我们通过 OpenAI(或其他平台)模拟线程创建。
            # 通常,这涉及到发送初始消息来创建线程。

            self.logger.info(f"Creating thread for dialog {dialog_id} with initial messages: {init_messages}")
            
            # 为了演示,生成一个虚假的线程 ID
            thread_id = f"thread-{dialog_id}-{self.cfg.openai.api_key}"

            # 创建后返回线程 ID
            return thread_id
        except Exception as e:
            self.logger.error(f"Failed to create thread: {str(e)}")
            return None

    async def create_cloud(self) -> str:
        """
        为 OpenAIAgent 创建云实例。可能涉及与 OpenAI 的
        API 进行交互以部署代理实例,或为代理初始化基于云的资源。
        """
        try:
            # 为代理创建云实例的占位符
            self.logger.info(f"Creating cloud instance for agent {self.agent_instance.id}")
            cloud_instance_id = f"cloud-{self.agent_instance.id}"

            # 返回云实例 ID
            return cloud_instance_id
        except Exception as e:
            self.logger.error(f"Failed to create cloud instance: {str(e)}")
            return ""

    async def build_asnwer(self, dialog: Dialog, question: str, media: list | None = None) -> tuple[str, list[str], dict]:
        """
        使用 OpenAI 模型或 API 构建答案。它接受一个问题,通过模型处理它,
        并返回答案以及附加的元数据。
        """
        try:
            # 模拟与 OpenAI 的 API(例如 GPT-3 或 GPT-4)交互以生成答案。
            response = await self.llm.invoke(question)

            # 假设模型返回一个字典,其中包含答案和任何元数据
            answer = response.get("content", "")
            metadata = response.get("usage", {})

            # 如果请求中有媒体,我们在这里处理它
            media_urls = [item["url"] for item in (media or [])]

            return answer, media_urls, metadata
        except Exception as e:
            self.logger.error(f"Failed to build answer for question: {question}. Error: {str(e)}")
            return "", [], {}

2. 类OpenAIAssistantAgent

文件openai_assistant_agent.py定义了一个 OpenAIAssistantAgent 类,用于与 OpenAI 的 API 交互,提供基于 LLM(大语言模型)的对话管理、媒体处理和增强信息检索功能。文件openai_assistant_agent.py的具体实现流程如下所示:

(1)方法 build_asnwer()的功能是处理用户对话,通过 OpenAI 的 API 发送问题,并管理 API 运行状态,包括处理请求超时、解析 AI 生成的响应、执行所需的外部函数调用,并最终返回 AI 生成的答案、相关的动作和附加数据。

class OpenAIAssistantAgent(BaseAgent):
    def __init__(self, agent_instance: AgentInstance, cfg: Config, vs: VectorStore, llm: BaseLLM|OpenAILLM):
        super().__init__(agent_instance, cfg, vs, llm)

        self.llm: OpenAILLM
        self.client = AsyncOpenAI(api_key=self.cfg.openai.api_key)

    async def build_asnwer(self, dialog: Dialog, question: str, media: list[dict] | None = None) -> tuple[str, list[str], dict]:
        messages = dialog.runtime_messages

        actions = []
        answer = ""
        is_success = False
        custom_data = {}
        run = None

        # 处理媒体信息,并转换为结构化的消息
        media_messages = await self._media_to_struct_message(media)
        if media_messages:
            for media_message in media_messages:
                if isinstance(media_message["content"], str):
                    messages.append(media_message["content"])

        # 获取对话的指令信息
        instructions = dialog.meta.get("intruction", "")

        # 通过 RAG(检索增强生成)从向量存储中获取补充信息
        rag_instructions, augmentations = await self.vs.retrieval(self.agent_instance, messages, group=dialog.group)

        if augmentations:
            instructions = instructions + "\n\n" + rag_instructions
            custom_data["augmentations"] = augmentations

        try:
            # 发送用户问题到 OpenAI API
            await self.client.beta.threads.messages.create(thread_id=dialog.thread_id, role="user", content=question, timeout=3)
            self.logger.debug(f"已发送问题 {dialog.id}: {question}")

            # 创建 OpenAI 运行线程,并提供附加指令
            run = await self.client.beta.threads.runs.create_and_poll(
                thread_id=dialog.thread_id,
                assistant_id=self.agent_instance.cloud_id,
                additional_instructions=instructions,
                timeout=6,
                response_format=self.agent_instance.response_format,
                additional_messages=media_messages,
            )

            self.logger.debug(f"已创建 run {dialog.id}: {run.id} - {run.status}: {question}")

            while not is_success:
                if run.status == "completed":
                    # 运行完成,获取最新的响应消息
                    messages = await self.client.beta.threads.messages.list(thread_id=dialog.thread_id, limit=1, timeout=3)
                    result = json.loads(messages.data[0].content[0].text.value)

                    answer = result["answer"] if "answer" in result else ""
                    actions = [result["action"]] if "action" in result else []
                    result.pop("action", None)
                    result.pop("answer", None)
                    custom_data["response_data"] = result

                    self.logger.debug(f"收到回答 run {dialog.id}: {run.id}: {answer}, {json.dumps(actions, ensure_ascii=False)}, {json.dumps(custom_data, ensure_ascii=False)}")
                    break
                elif run.status == "requires_action":
                    # 运行状态要求执行外部操作
                    outputs = []

                    for call in run.required_action.submit_tool_outputs.tool_calls:
                        call_id = call.id
                        function = call.function
                        try:
                            data = json.loads(function.arguments)
                        except:
                            data = {}

                        if "events" not in custom_data:
                            custom_data["events"] = {}

                        self.logger.debug(f"run.status {dialog.id}: {run.status}. {question}. {function.name}")

                        if function.name == TOOLS_GET_AUGMENTED_INFO:
                            func_result, _ = await self.vs.retrieval(self.agent_instance, data.get("data", []), group=dialog.group, use_consolidation=False)
                        else:
                            func_result = await self.er.route_event(function.name, dialog.external_id, dialog.user_id, dialog.user_type, **data)

                        if not custom_data["events"].get(function.name):
                            custom_data["events"][function.name] = {}

                        custom_data["events"][function.name][call_id] = {
                            "data": data,
                            "func_result": func_result,
                        }

                        outputs.append({"tool_call_id": call_id, "output": json.dumps(func_result)})

                    try:
                        # 发送外部工具执行结果
                        run = await self.client.beta.threads.runs.submit_tool_outputs_and_poll(
                            thread_id=dialog.thread_id,
                            run_id=run.id,
                            tool_outputs=outputs,
                            timeout=6,
                        )
                    except Exception as e:
                        self.logger.error(f"发送工具执行结果时出错 {dialog.id}: {e}")
                        run = await self.client.beta.threads.runs.poll(
                            thread_id=dialog.thread_id,
                            run_id=run.id,
                            timeout=6,
                        )
                elif run.status in ["expired", "cancelling", "cancelled", "failed"]:
                    # 运行状态异常,记录错误信息
                    try:
                        custom_data["code"] = run.last_error.code
                    except:
                        custom_data["code"] = run.status
                    custom_data["error"] = run.last_error.message
                    break
                elif run.status in ["incomplete", "queued", "in_progress"]:
                    # 运行仍在进行中,轮询获取状态
                    run = await self.client.beta.threads.runs.poll(
                        thread_id=dialog.thread_id,
                        run_id=run.id,
                        timeout=6,
                    )
                else:
                    raise Exception("OpenAI API 返回了未知状态")

        except Exception as e:
            self.logger.error(f"调用 GPT 生成回答时出错 {dialog.id}: {e}")
            if run:
                try:
                    custom_data["code"] = run.last_error.code
                    custom_data["error"] = run.last_error.message
                except:
                    custom_data["code"] = 0
                    custom_data["error"] = str(e)
            else:
                custom_data["code"] = 0
                custom_data["error"] = str(e)

        return answer, actions, custom_data

(2)方法create_cloud()用于在 OpenAI 平台上创建或更新 AI 助手(Assistant),具体功能如下:

  1. 工具处理:确保 main_tool 在工具列表中,如果尚未添加,则将其加入。
  2. 更新助手:如果 AI 助手已存在(即 cloud_id 存在),则尝试更新其配置,包括名称、指令、模型、工具和超参数(如 temperature 和 top_p)。
  3. 创建助手:如果 cloud_id 为空,则创建一个新的 AI 助手并返回其 ID。
  4. 异常处理:如果 API 请求失败,记录错误信息。
  5. 返回 ID:返回创建或更新后的 AI 助手 ID,如果失败,则返回 ""。
    async def create_cloud(self) -> str:
        # 获取当前助手的工具列表
        tools = self.agent_instance.tools
        main_tool = self.vs.main_tool()
        
        # 确保主要工具(main_tool)在工具列表中
        if main_tool:
            name = main_tool.get("function").get("name")
            tools_names = [tool.get("function").get("name") for tool in tools]

            if name not in tools_names:
                tools.append(main_tool)

        # 如果助手已经存在(即 cloud_id 存在),尝试更新助手
        if self.agent_instance.cloud_id:
            try:
                response = await self.client.beta.assistants.update(
                    name=self.agent_instance.name,  # 助手名称
                    assistant_id=self.agent_instance.cloud_id,  # 助手 ID
                    instructions=self.agent_instance.instruction,  # 助手的指令
                    model=self.cfg.openai.model,  # 使用的 OpenAI 语言模型
                    tools=tools,  # 该助手可用的工具
                    response_format=self.agent_instance.response_format,  # 响应格式
                    temperature=self.agent_instance.config.get("temperature", 0.5),  # 生成文本的随机性
                    top_p=self.agent_instance.config.get("top_p", 1.0),  # 影响生成多样性的参数
                )
                self.logger.debug(f"已更新助手 {self.agent_instance.id}: {response.id}")
                return response.id  # 返回更新后的助手 ID
            except Exception as e:
                self.logger.error(f"更新助手失败 {self.agent_instance.id}: {e}")

        # 如果助手不存在(即 cloud_id 为空),尝试创建新助手
        else:
            try:
                response = await self.client.beta.assistants.create(
                    name=self.agent_instance.name,  # 助手名称
                    instructions=self.agent_instance.instruction,  # 助手的指令
                    model=self.cfg.openai.model,  # 使用的 OpenAI 语言模型
                    tools=tools,  # 该助手可用的工具
                    response_format=self.agent_instance.response_format,  # 响应格式
                    temperature=self.agent_instance.config.get("temperature", 0.5),  # 生成文本的随机性
                    top_p=self.agent_instance.config.get("top_p", 1.0),  # 影响生成多样性的参数
                )
                self.logger.debug(f"已创建助手 {self.agent_instance.id}: {response.id}")
                return response.id  # 返回新创建的助手 ID
            except Exception as e:
                self.logger.error(f"创建助手失败 {self.agent_instance.id}: {e}")

        # 如果创建或更新失败,则返回原始 cloud_id(如果存在),否则返回空字符串
        return self.agent_instance.cloud_id if self.agent_instance.cloud_id else ""

(3)方法create_thread()通过 OpenAI API 创建一个新的对话线程,并返回其 thread_id。支持初始化消息 init_messages,用于在创建对话时添加初始内容。当发生异常时,记录错误日志并返回 None。

    async def create_thread(self, dialog_id: int, init_messages: list = None) -> str | None:
        """
        创建新的对话线程。

        :param dialog_id: 当前对话的 ID(用于日志记录)。
        :param init_messages: 初始化消息列表,可选参数,默认 None。
        :return: 创建成功则返回 thread_id,失败则返回 None。
        """
        try:
            # 通过 OpenAI API 创建新对话线程,并设置超时时间为 5 秒
            thread = await self.client.beta.threads.create(timeout=5, messages=init_messages)
            return thread.id  # 返回创建的线程 ID
        except Exception as e:
            # 记录创建对话线程的错误日志
            self.logger.error(f"创建新对话线程失败 {dialog_id}: {e}")
            return None  # 返回 None 表示创建失败

(4)方法delete_thread()通过 OpenAI API 删除指定的对话线程 thread_id,若 thread_id 为空则直接返回 True(无需删除)。当发生异常时,记录错误日志并返回 False,表示删除失败。

    async def delete_thread(self, dialog_id: int, thread_id: str | None) -> bool:
        """
        删除指定的对话线程。

        :param dialog_id: 当前对话的 ID(用于日志记录)。
        :param thread_id: 需要删除的对话线程 ID,如果为空,则直接返回 True。
        :return: 删除成功返回 True,失败返回 False。
        """
        # 如果 thread_id 为空,说明无需删除,直接返回 True
        if not thread_id:
            return True

        try:
            # 通过 OpenAI API 删除指定的对话线程,设置超时时间为 5 秒
            await self.client.beta.threads.delete(thread_id=thread_id, timeout=5)
            return True  # 删除成功返回 True
        except Exception as e:
            # 记录删除对话线程的错误日志
            self.logger.error(f"删除对话线程失败 {dialog_id}: {e}")
            return False  # 返回 False 表示删除失败

(5)下面代码的功能是处理各种类型的媒体文件(图片、音频、视频、文件),并将其转换为可发送给 OpenAI Assistant 的结构化消息。

  1. 图片处理:下载图片并尝试上传至 OpenAI,以便在对话中使用。如果上传失败,则使用直接的图片 URL。
  2. 音频处理:调用 speech_to_text 方法,将音频文件转换为文本,并作为消息发送。如果转录失败,返回错误信息。
  3. 视频处理:下载视频文件,并提取音频进行语音转录。从视频的不同时间点(0%、25%、50%、75%、100%)提取关键帧,并上传至 OpenAI,以便分析视频内容。处理完成后,删除临时文件。
  4. 文件处理(TODO 未来扩展):目前仅返回文件的 URL,后续可能添加对文件内容的读取和处理功能。

 

    async def _media_to_struct_message(self, media: list[dict] | None):
        if not media:
            return []

        messages = []

        for item in media:
            media_type = item.get('type')
            url = item.get('url')

            if not url:
                continue

            if media_type == 'img':
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(url) as response:
                            if response.status != 200:
                                raise Exception(f"Failed to download image from {url}")

                            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
                                temp_img_filename = temp_file.name
                                temp_file.write(await response.read())

                    with open(temp_img_filename, "rb") as img_file:
                        file_id = await self.llm.save_file(img_file, purpose="vision")

                    if file_id:
                        messages.append({
                            "role": "user",
                            "content": [
                                {
                                    "type": "image_file",
                                    "image_file": {
                                        "file_id": file_id
                                    }
                                }
                            ]
                        })
                    else:
                        messages.append({
                            "role": "user",
                            "content": [
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": url
                                    }
                                }
                            ]
                        })

                    if os.path.exists(temp_img_filename):
                        os.remove(temp_img_filename)

                except Exception as e:
                    messages.append({
                        "role": "user",
                        "content": [
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": url
                                }
                            }
                        ]
                    })

            elif media_type == 'audio':
                try:
                    transcribed_text = await self.llm.speech_to_text(url)
                    messages.append({
                        "role": "user",
                        "content": f"Audio transcription: {transcribed_text}"
                    })
                except Exception as e:
                    messages.append({
                        "role": "user",
                        "content": f"Failed to transcribe audio from {url}. Error: {str(e)}"
                    })

            elif media_type == 'video':
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(url) as response:
                            if response.status != 200:
                                raise Exception(f"Failed to download video file from {url}")

                            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
                                temp_video_filename = temp_file.name
                                temp_file.write(await response.read())

                    video_clip = await asyncio.to_thread(VideoFileClip, temp_video_filename)

                    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio_file:
                        temp_audio_filename = temp_audio_file.name

                    await asyncio.to_thread(video_clip.audio.write_audiofile, temp_audio_filename)

                    transcribed_text = await self.llm.speech_to_text(temp_audio_filename, is_local_file=True)

                    messages.append({
                        "role": "user",
                        "content": f"Transcription of audio from video: {transcribed_text}"
                    })

                    cap = await asyncio.to_thread(cv2.VideoCapture, temp_video_filename)
                    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

                    frame_positions = [0, 0.25, 0.5, 0.75, 1.0]
                    frame_descriptions = ["start", "quarter", "middle", "three-quarters", "end"]

                    for i, pos in enumerate(frame_positions):
                        frame_idx = max(0, min(int(total_frames * pos), total_frames - 1))
                        await asyncio.to_thread(cap.set, cv2.CAP_PROP_POS_FRAMES, frame_idx)
                        ret, frame = await asyncio.to_thread(cap.read)

                        if ret:
                            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as frame_file:
                                frame_filename = frame_file.name

                            await asyncio.to_thread(cv2.imwrite, frame_filename, frame)

  
                            with open(frame_filename, "rb") as img_file:
                                file_id = await self.llm.save_file(img_file, purpose="vision")

                            if file_id:
                                position_name = frame_descriptions[i]
                                messages.append({
                                    "role": "user",
                                    "content": [
                                        {
                                            "type": "text",
                                            "text": f"Frame from {position_name} of the video:"
                                        },
                                        {
                                            "type": "image_file",
                                            "image_file": {
                                                "file_id": file_id
                                            }
                                        }
                                    ]
                                })

                            os.remove(frame_filename)

                    await asyncio.to_thread(cap.release)

                    if os.path.exists(temp_audio_filename):
                        os.remove(temp_audio_filename)

                    if os.path.exists(temp_video_filename):
                        os.remove(temp_video_filename)

                except Exception as e:
                    messages.append({
                        "role": "user",
                        "content": f"Failed to process video from {url}. Error: {str(e)}"
                    })

            elif media_type == 'file':
                messages.append({
                    "role": "user",
                    "content": f"File link: {url}"
                })

        return messages

上述代码确保了各种媒体类型能够被正确转换,并适用于 OpenAI Assistant 交互,同时提供错误处理和日志记录功能,以提高系统的健壮性。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

码农三叔

感谢鼓励

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值