9.5.3 DeepSeek对话Agent
文件 deepseek_assistant_agent.py 实现了一个面向 DeepSeek AI 的智能助手代理(Agent),其主要功能是与 DeepSeek API 交互,实现自然语言处理(NLP)对话、音视频转录以及多媒体信息解析等能力。
class DeepSeekAssistantAgent(BaseAgent):
    """Assistant agent that talks to the DeepSeek HTTP API.

    The agent creates and deletes remote conversation threads, asks the API to
    generate answers for a dialog, and converts attached media (images, audio,
    video, files) into chat messages before they are sent.
    """

    def __init__(self, agent_instance: AgentInstance, cfg: Config, vs: VectorStore, llm: BaseLLM):
        """Store the shared agent dependencies and read the DeepSeek endpoint settings."""
        super().__init__(agent_instance, cfg, vs, llm)
        self.llm: BaseLLM
        self.api_url = self.cfg.deepseek.api_url  # Base URL of the DeepSeek API
        self.api_key = self.cfg.deepseek.api_key  # DeepSeek API key (sent as a Bearer token)

    async def _send_request(self, endpoint: str, method: str = 'GET', data: dict | None = None):
        """Send one HTTP request to the DeepSeek API and return the decoded JSON body.

        Args:
            endpoint: Path appended to ``self.api_url``.
            method: ``GET``, ``POST`` or ``DELETE`` (case-insensitive).
            data: JSON payload; only sent with ``POST``.

        Returns:
            The decoded JSON response. For ``DELETE`` the result is ``None``
            when the server answers with an empty or non-JSON body.

        Raises:
            ValueError: If ``method`` is not one of the supported verbs.
            aiohttp.ClientResponseError: If a ``DELETE`` request is answered
                with an HTTP error status.
        """
        verb = method.upper()
        if verb not in ("GET", "POST", "DELETE"):
            # Previously an unknown verb silently sent nothing and returned None.
            raise ValueError(f"Unsupported HTTP method: {method!r}")

        url = f"{self.api_url}/{endpoint}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        async with aiohttp.ClientSession() as session:
            if verb == "GET":
                async with session.get(url, headers=headers) as response:
                    return await response.json()
            if verb == "POST":
                async with session.post(url, headers=headers, json=data) as response:
                    return await response.json()
            # DELETE: the caller only needs to know whether the deletion succeeded,
            # so surface HTTP errors and tolerate an empty / non-JSON body.
            async with session.delete(url, headers=headers) as response:
                response.raise_for_status()
                try:
                    return await response.json(content_type=None)
                except ValueError:
                    return None

    async def create_thread(self, dialog_id: int, init_messages: list = None) -> str | None:
        """Create a new conversation thread in DeepSeek.

        Args:
            dialog_id: Local dialog identifier; used only in the error log.
            init_messages: Initial messages sent as the ``messages`` field.

        Returns:
            The ``id`` field of the API response, or ``None`` if the request failed.
        """
        try:
            data = {"messages": init_messages}
            response = await self._send_request("threads", method="POST", data=data)
            return response.get("id")
        except Exception as e:
            self.logger.error(f"创建新对话 {dialog_id} 失败: {e}")
            return None

    async def delete_thread(self, dialog_id: int, thread_id: str | None) -> bool:
        """Delete a conversation thread in DeepSeek.

        Args:
            dialog_id: Local dialog identifier; used only in the error log.
            thread_id: Remote thread identifier. A falsy value means there is
                nothing to delete.

        Returns:
            ``True`` when there was nothing to delete or the request succeeded,
            ``False`` when the request raised.
        """
        if not thread_id:
            return True
        try:
            await self._send_request(f"threads/{thread_id}", method="DELETE")
            return True
        except Exception as e:
            self.logger.error(f"删除对话 {dialog_id} 失败: {e}")
            return False

    async def build_answer(self, dialog: Dialog, question: str, media: list[dict] | None = None) -> tuple[str, list[str], dict]:
        """Generate an answer for ``dialog`` through the DeepSeek assistant.

        Args:
            dialog: Dialog whose ``runtime_messages``, ``meta``, ``group``,
                ``thread_id`` and ``id`` are read.
            question: Not read by this method.
                NOTE(review): presumably already part of ``dialog.runtime_messages`` - confirm.
            media: Optional media descriptors, converted to messages first.

        Returns:
            ``(answer, actions, custom_data)``. On a request failure ``answer``
            is ``""``, ``actions`` is ``[]`` and ``custom_data["error"]`` holds
            the error text.
        """
        messages = dialog.runtime_messages
        actions = []
        answer = ""
        custom_data = {}

        # Convert attached media into structured chat messages.
        media_messages = await self._media_to_struct_message(media)
        if media_messages:
            # NOTE(review): this extends dialog.runtime_messages in place, and the
            # same messages are also sent below as "additional_messages" - confirm
            # that both are intended.
            messages.extend(media_messages)

        # Extra instructions for this dialog, optionally augmented by retrieval.
        instructions = dialog.meta.get("instruction", "")
        rag_instructions, augmentations = await self.vs.retrieval(self.agent_instance, messages, group=dialog.group)
        if augmentations:
            # Guard against a stored None instruction, which would make "+" raise.
            instructions = (instructions or "") + "\n\n" + rag_instructions
            custom_data["augmentations"] = augmentations

        try:
            # Ask the assistant for a reply.
            data = {
                "thread_id": dialog.thread_id,
                "assistant_id": self.agent_instance.cloud_id,
                "instructions": instructions,
                "messages": messages,
                "additional_messages": media_messages
            }
            response = await self._send_request("generate_response", method="POST", data=data)
            answer = response.get("answer", "")
            actions = response.get("actions", [])
            # Everything except the answer and actions is passed through as-is.
            custom_data["response_data"] = {k: v for k, v in response.items() if k not in ["answer", "actions"]}
        except Exception as e:
            self.logger.error(f"生成对话 {dialog.id} 的回复失败: {e}")
            custom_data["error"] = str(e)
        return answer, actions, custom_data

    async def _media_to_struct_message(self, media: list[dict] | None) -> list[dict]:
        """Convert media descriptors into user messages for the assistant.

        Each item is a dict with ``type`` (``img``, ``audio``, ``video`` or
        ``file``) and ``url``. Items without a URL or with an unknown type are
        skipped. Processing failures never raise: images fall back to a plain
        URL reference, audio and video yield a message describing the error.
        """
        if not media:
            return []
        messages = []
        for item in media:
            media_type = item.get('type')
            url = item.get('url')
            if not url:
                continue
            if media_type == 'img':
                messages.append(await self._image_message(url))
            elif media_type == 'audio':
                try:
                    transcribed_text = await self.llm.speech_to_text(url)
                    messages.append({"role": "user", "content": f"音频转录: {transcribed_text}"})
                except Exception as e:
                    messages.append({"role": "user", "content": f"无法转录音频 {url},错误: {str(e)}"})
            elif media_type == 'video':
                messages.append(await self._video_message(url))
            elif media_type == 'file':
                messages.append({"role": "user", "content": f"文件链接: {url}"})
        return messages

    async def _download_to_temp_file(self, url: str, suffix: str, error_message: str) -> str:
        """Download ``url`` into a new temp file and return its path; the caller must delete it."""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    raise Exception(error_message)
                payload = await response.read()
        # delete=False so the file can be reopened by name after this handle closes.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
            temp_path = temp_file.name
            try:
                temp_file.write(payload)
            except Exception:
                self._remove_quietly(temp_path)
                raise
        return temp_path

    @staticmethod
    def _remove_quietly(path: str | None) -> None:
        """Delete ``path`` if it is set, ignoring filesystem errors."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    async def _image_message(self, url: str) -> dict:
        """Upload the image at ``url`` and reference it by file id, or fall back to the URL."""
        url_message = {"role": "user", "content": [{"type": "image_url", "image_url": {"url": url}}]}
        temp_img_filename = None
        try:
            temp_img_filename = await self._download_to_temp_file(url, ".jpg", f"无法从 {url} 下载图片")
            with open(temp_img_filename, "rb") as img_file:
                file_id = await self.llm.save_file(img_file, purpose="vision")
        except Exception:
            # Best effort: any download/upload failure degrades to a URL reference.
            return url_message
        finally:
            # Always clean up, including when the upload raised.
            self._remove_quietly(temp_img_filename)
        if file_id:
            return {"role": "user", "content": [{"type": "image_file", "image_file": {"file_id": file_id}}]}
        return url_message

    async def _video_message(self, url: str) -> dict:
        """Download the video at ``url``, extract its audio track and transcribe it."""
        temp_video_filename = None
        temp_audio_filename = None
        video_clip = None
        try:
            temp_video_filename = await self._download_to_temp_file(url, ".mp4", f"无法从 {url} 下载视频")
            # Extract the audio track and transcribe it; the blocking moviepy
            # calls run in a worker thread.
            video_clip = await asyncio.to_thread(VideoFileClip, temp_video_filename)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio_file:
                temp_audio_filename = temp_audio_file.name
            await asyncio.to_thread(video_clip.audio.write_audiofile, temp_audio_filename)
            transcribed_text = await self.llm.speech_to_text(temp_audio_filename, is_local_file=True)
            return {"role": "user", "content": f"视频音频转录: {transcribed_text}"}
        except Exception as e:
            return {"role": "user", "content": f"无法处理视频 {url},错误: {str(e)}"}
        finally:
            # Close the clip before deleting the files it reads from.
            if video_clip is not None:
                try:
                    video_clip.close()
                except Exception:
                    pass
            self._remove_quietly(temp_audio_filename)
            self._remove_quietly(temp_video_filename)
上面的代码实现了一个 DeepSeekAssistantAgent 类,该类通过与 DeepSeek API 交互来实现自动化对话生成和多媒体处理。其主要功能包括创建和删除对话线程、生成回答以及处理不同类型的多媒体数据(图片、音频、视频等)。以下是代码实现的具体流程:
- 初始化 (__init__ 方法):通过 super().__init__(...) 调用父类 BaseAgent 的初始化方法,设置了 agent_instance, cfg, vs, llm 等基本参数。
- 初始化 DeepSeek 客户端:通过从配置文件中获取 api_url 和 api_key,初始化与 DeepSeek API 的连接信息。这些信息用于后续与 DeepSeek API 的交互。
- 发送 HTTP 请求 (_send_request 方法):这个方法是与 DeepSeek API 交互的核心,使用 aiohttp 发送 HTTP 请求。该方法显式处理 GET 和 POST 两种请求类型,分别用于获取数据和提交数据:POST 请求会将数据以 JSON 格式发送到 DeepSeek API 端点,GET 请求则从 DeepSeek API 获取响应数据。需要注意的是,delete_thread 会以 method="DELETE" 调用该方法,因此实现中还必须包含对 DELETE 请求的处理分支,否则该调用不会发出任何请求。
- 创建对话线程 (create_thread 方法):在此方法中,通过调用 _send_request 方法向 DeepSeek API 的 threads 端点发送 POST 请求来创建一个新的对话线程。该请求会传递初始消息(init_messages),DeepSeek API 返回的响应包含线程 ID,将其返回给调用者。
- 删除对话线程 (delete_thread 方法):如果存在有效的 thread_id,该方法会调用 _send_request 向 DeepSeek API 的 threads/{thread_id} 端点发送 DELETE 请求来删除指定的对话线程。如果没有有效的 thread_id,直接返回 True。
- 生成回答 (build_answer 方法):从对话对象(dialog)中提取运行时消息(runtime_messages),并将这些消息与可能存在的媒体消息合并。通过 vs.retrieval 方法检索与当前对话相关的增强信息(rag_instructions 和 augmentations),并将其加入到指令中。使用 DeepSeek API 的 generate_response 端点生成回答。
- 多媒体消息处理 (_media_to_struct_message 方法):该方法负责处理不同类型的媒体(图片、音频、视频等),将其转化为 DeepSeek 可以理解的结构化消息格式。