(9-4-2)智能客服Agent开发:Agent(02)DeepSeek对话Agent

9.5.3  DeepSeek对话Agent

文件deepseek_assistant_agent.py是一个适用于 DeepSeek AI 的智能助手代理(Agent),其主要功能是与 DeepSeek API 交互,实现自然语言处理(NLP)对话、音视频转录以及多媒体信息解析等能力。

class DeepSeekAssistantAgent(BaseAgent):
    def __init__(self, agent_instance: AgentInstance, cfg: Config, vs: VectorStore, llm: BaseLLM):
        super().__init__(agent_instance, cfg, vs, llm)
        self.llm: BaseLLM
        self.api_url = self.cfg.deepseek.api_url  # DeepSeek API 的 URL
        self.api_key = self.cfg.deepseek.api_key  # DeepSeek API 密钥

    async def _send_request(self, endpoint: str, method: str = 'GET', data: dict = None):
        """发送 HTTP 请求到 DeepSeek API"""
        url = f"{self.api_url}/{endpoint}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        async with aiohttp.ClientSession() as session:
            if method == 'GET':
                async with session.get(url, headers=headers) as response:
                    return await response.json()
            elif method == 'POST':
                async with session.post(url, headers=headers, json=data) as response:
                    return await response.json()

    async def create_thread(self, dialog_id: int, init_messages: list = None) -> str | None:
        """在 DeepSeek 中创建新对话线程"""
        try:
            data = {"messages": init_messages}
            response = await self._send_request("threads", method="POST", data=data)
            return response.get("id")
        except Exception as e:
            self.logger.error(f"创建新对话 {dialog_id} 失败: {e}")
            return None

    async def delete_thread(self, dialog_id: int, thread_id: str | None) -> bool:
        """在 DeepSeek 中删除对话线程"""
        if not thread_id:
            return True

        try:
            await self._send_request(f"threads/{thread_id}", method="DELETE")
            return True
        except Exception as e:
            self.logger.error(f"删除对话 {dialog_id} 失败: {e}")
            return False

    async def build_answer(self, dialog: Dialog, question: str, media: list[dict] | None = None) -> tuple[str, list[str], dict]:
        """使用 DeepSeek AI 助手生成回答"""
        messages = dialog.runtime_messages
        actions = []
        answer = ""
        custom_data = {}

        # 处理多媒体数据并转换为结构化消息
        media_messages = await self._media_to_struct_message(media)
        if media_messages:
            messages.extend(media_messages)

        # 获取对话的额外指令
        instructions = dialog.meta.get("instruction", "")
        rag_instructions, augmentations = await self.vs.retrieval(self.agent_instance, messages, group=dialog.group)

        if augmentations:
            instructions += "\n\n" + rag_instructions
            custom_data["augmentations"] = augmentations

        try:
            # 生成 AI 助手的回复
            data = {
                "thread_id": dialog.thread_id,
                "assistant_id": self.agent_instance.cloud_id,
                "instructions": instructions,
                "messages": messages,
                "additional_messages": media_messages
            }
            response = await self._send_request("generate_response", method="POST", data=data)
            answer = response.get("answer", "")
            actions = response.get("actions", [])
            custom_data["response_data"] = {k: v for k, v in response.items() if k not in ["answer", "actions"]}

        except Exception as e:
            self.logger.error(f"生成对话 {dialog.id} 的回复失败: {e}")
            custom_data["error"] = str(e)

        return answer, actions, custom_data

    async def _media_to_struct_message(self, media: list[dict] | None):
        """处理不同类型的媒体(图片、音频、视频、文件)以供 DeepSeek 助手使用"""
        if not media:
            return []

        messages = []

        for item in media:
            media_type = item.get('type')
            url = item.get('url')

            if not url:
                continue

            if media_type == 'img':
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(url) as response:
                            if response.status != 200:
                                raise Exception(f"无法从 {url} 下载图片")

                            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
                                temp_img_filename = temp_file.name
                                temp_file.write(await response.read())

                    with open(temp_img_filename, "rb") as img_file:
                        file_id = await self.llm.save_file(img_file, purpose="vision")

                    if file_id:
                        messages.append({"role": "user", "content": [{"type": "image_file", "image_file": {"file_id": file_id}}]})
                    else:
                        messages.append({"role": "user", "content": [{"type": "image_url", "image_url": {"url": url}}]})

                    os.remove(temp_img_filename)

                except Exception as e:
                    messages.append({"role": "user", "content": [{"type": "image_url", "image_url": {"url": url}}]})

            elif media_type == 'audio':
                try:
                    transcribed_text = await self.llm.speech_to_text(url)
                    messages.append({"role": "user", "content": f"音频转录: {transcribed_text}"})
                except Exception as e:
                    messages.append({"role": "user", "content": f"无法转录音频 {url},错误: {str(e)}"})

            elif media_type == 'video':
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(url) as response:
                            if response.status != 200:
                                raise Exception(f"无法从 {url} 下载视频")

                            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
                                temp_video_filename = temp_file.name
                                temp_file.write(await response.read())

                    # 提取音频并进行转录
                    video_clip = await asyncio.to_thread(VideoFileClip, temp_video_filename)

                    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio_file:
                        temp_audio_filename = temp_audio_file.name

                    await asyncio.to_thread(video_clip.audio.write_audiofile, temp_audio_filename)

                    transcribed_text = await self.llm.speech_to_text(temp_audio_filename, is_local_file=True)
                    messages.append({"role": "user", "content": f"视频音频转录: {transcribed_text}"})

                    os.remove(temp_audio_filename)
                    os.remove(temp_video_filename)

                except Exception as e:
                    messages.append({"role": "user", "content": f"无法处理视频 {url},错误: {str(e)}"})

            elif media_type == 'file':
                messages.append({"role": "user", "content": f"文件链接: {url}"})

        return messages

上面的代码实现了一个 DeepSeekAssistantAgent 类,该类通过与 DeepSeek API 交互来实现自动化对话生成和多媒体处理。其主要功能包括创建和删除对话线程、生成回答以及处理不同类型的多媒体数据(图片、音频、视频等)。以下是代码实现的具体流程:

  1. 初始化 (__init__ 方法):通过 super().__init__(...) 调用父类 BaseAgent 的初始化方法,设置了 agent_instance, cfg, vs, llm 等基本参数。
  2. 初始化 DeepSeek 客户端:通过从配置文件中获取 api_url 和 api_key,初始化与 DeepSeek API 的连接信息。这些信息用于后续与 DeepSeek API 的交互。
  3. 发送 HTTP 请求 (_send_request 方法):这个方法是与 DeepSeek API 交互的核心,使用 aiohttp 发送 HTTP 请求。该方法支持两种请求类型:GET 和 POST,分别用于获取数据和提交数据。POST 请求会将数据以 JSON 格式发送到 DeepSeek API 端点,GET 请求则从 DeepSeek API 获取响应数据。
  4. 创建对话线程 (create_thread 方法):在此方法中,通过调用 _send_request 方法向 DeepSeek API 的 threads 端点发送 POST 请求来创建一个新的对话线程。该请求会传递初始消息(init_messages),DeepSeek API 返回的响应包含线程 ID,将其返回给调用者。
  5. 删除对话线程 (delete_thread 方法):如果存在有效的 thread_id,该方法会调用 _send_request 向 DeepSeek API 的 threads/{thread_id} 端点发送 DELETE 请求来删除指定的对话线程。如果没有有效的 thread_id,直接返回 True。
  6. 生成回答 (build_answer 方法):从对话对象(dialog)中提取运行时消息(runtime_messages),并将这些消息与可能存在的媒体消息合并。通过 vs.retrieval 方法检索与当前对话相关的增强信息(rag_instructions 和 augmentations),并将其加入到指令中。使用 DeepSeek API 的 generate_response 端点生成回答。
  7. 多媒体消息处理 (_media_to_struct_message 方法):该方法负责处理不同类型的媒体(图片、音频、视频等),将其转化为 DeepSeek 可以理解的结构化消息格式。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

码农三叔

感谢鼓励

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值