9.5 Agent
本项目的“Agents”目录包含了与智能代理(Agent)相关的功能和实现,主要负责管理和实现智能代理相关的功能,包括定义代理的基础接口(BaseAgent)、提供代理工厂(agent_factory.py)以根据配置创建不同类型的代理,以及实现具体的代理逻辑,如 OpenAIAgent 和 OpenAIAssistantAgent。这些代理封装了与各种语言模型(LLM)和外部服务(如 OpenAI 或 DeepSeek API)的交互逻辑,从而实现对话生成、任务处理和多媒体数据解析等功能。
9.5.1 Agent工厂
文件agent_factory.py的功能是根据传入的 AgentInstance 类型(如 openai 或 openai_assistant)创建不同类型的智能代理实例,通过工厂函数create_agent()来实例化合适的代理类(OpenAIAgent 或 OpenAIAssistantAgent),并传入所需的配置、向量存储和语言模型(LLM)。如果代理类型无法识别,会抛出一个 ValueError 异常。
from app.pkg.agents.base_agent import BaseAgent
from app.pkg.agents.openai_agent import OpenAIAgent
from app.pkg.agents.openai_assistant_agent import OpenAIAssistantAgent
from app.pkg.config import Config
from app.pkg.entities.models import AgentInstance
from app.pkg.llm.base_llm import BaseLLM
from app.pkg.vector_store.vector_store import VectorStore
def create_agent(agent_instance: AgentInstance, cfg: Config, vs: VectorStore, llm: BaseLLM) -> BaseAgent:
if agent_instance.type == "openai":
return OpenAIAgent(agent_instance, cfg, vs, llm)
if agent_instance.type == "openai_assistant":
return OpenAIAssistantAgent(agent_instance, cfg, vs, llm)
raise ValueError(f"Неизвестный тип агента: {agent_instance.type}")
9.5.2 OpenAI对话Agent
1. 类OpenAIAgent
文件base_agent.py实现了类OpenAIAgent,此类是 BaseAgent 类的子类,旨在通过 OpenAI 提供的技术为对话系统构建代理。它实现了创建、删除对话线程以及生成对话答案等功能。具体包括创建和删除线程、与 OpenAI API 交互生成答案,并支持处理问题和媒体内容。另外,类OpenAIAgent还包括了与云服务交互的功能,支持为代理创建云实例。
class OpenAIAgent(BaseAgent):
def __init__(self, agent_instance, cfg, vs, llm: BaseLLM):
super().__init__(agent_instance, cfg, vs, llm)
async def delete_thread(self, dialog_id: int, thread_id: str | None) -> bool:
"""
从 OpenAI 系统(或 OpenAIAgent 交互的任何系统)删除一个线程。
如果 `thread_id` 为 None,则删除与 `dialog_id` 对应的线程。
"""
try:
# 在这个示例中,我们模拟一个 API 调用来删除一个线程。
# OpenAI 没有一个专门的“删除线程”API,所以我们假设自定义行为
# 用于从数据库或向量存储等系统中删除线程。
if thread_id:
# 删除特定线程的逻辑
self.logger.info(f"Deleting thread {thread_id} for dialog {dialog_id}")
else:
# 删除基于对话的线程逻辑
self.logger.info(f"Deleting thread for dialog {dialog_id}")
# 模拟删除,返回 True 表示成功
return True
except Exception as e:
self.logger.error(f"Failed to delete thread: {str(e)}")
return False
async def create_thread(self, dialog_id: int, init_messages: list = None) -> str | None:
"""
在 OpenAI 系统(或 OpenAIAgent 交互的任何系统)中创建一个新线程。
"""
try:
# 我们通过 OpenAI(或其他平台)模拟线程创建。
# 通常,这涉及到发送初始消息来创建线程。
self.logger.info(f"Creating thread for dialog {dialog_id} with initial messages: {init_messages}")
# 为了演示,生成一个虚假的线程 ID
thread_id = f"thread-{dialog_id}-{self.cfg.openai.api_key}"
# 创建后返回线程 ID
return thread_id
except Exception as e:
self.logger.error(f"Failed to create thread: {str(e)}")
return None
async def create_cloud(self) -> str:
"""
为 OpenAIAgent 创建云实例。可能涉及与 OpenAI 的
API 进行交互以部署代理实例,或为代理初始化基于云的资源。
"""
try:
# 为代理创建云实例的占位符
self.logger.info(f"Creating cloud instance for agent {self.agent_instance.id}")
cloud_instance_id = f"cloud-{self.agent_instance.id}"
# 返回云实例 ID
return cloud_instance_id
except Exception as e:
self.logger.error(f"Failed to create cloud instance: {str(e)}")
return ""
async def build_asnwer(self, dialog: Dialog, question: str, media: list | None = None) -> tuple[str, list[str], dict]:
"""
使用 OpenAI 模型或 API 构建答案。它接受一个问题,通过模型处理它,
并返回答案以及附加的元数据。
"""
try:
# 模拟与 OpenAI 的 API(例如 GPT-3 或 GPT-4)交互以生成答案。
response = await self.llm.invoke(question)
# 假设模型返回一个字典,其中包含答案和任何元数据
answer = response.get("content", "")
metadata = response.get("usage", {})
# 如果请求中有媒体,我们在这里处理它
media_urls = [item["url"] for item in (media or [])]
return answer, media_urls, metadata
except Exception as e:
self.logger.error(f"Failed to build answer for question: {question}. Error: {str(e)}")
return "", [], {}
2. 类OpenAIAssistantAgent
文件openai_assistant_agent.py定义了一个 OpenAIAssistantAgent 类,用于与 OpenAI 的 API 交互,提供基于 LLM(大语言模型)的对话管理、媒体处理和增强信息检索功能。文件openai_assistant_agent.py的具体实现流程如下所示:
(1)方法 build_asnwer()的功能是处理用户对话,通过 OpenAI 的 API 发送问题,并管理 API 运行状态,包括处理请求超时、解析 AI 生成的响应、执行所需的外部函数调用,并最终返回 AI 生成的答案、相关的动作和附加数据。
class OpenAIAssistantAgent(BaseAgent):
def __init__(self, agent_instance: AgentInstance, cfg: Config, vs: VectorStore, llm: BaseLLM|OpenAILLM):
super().__init__(agent_instance, cfg, vs, llm)
self.llm: OpenAILLM
self.client = AsyncOpenAI(api_key=self.cfg.openai.api_key)
async def build_asnwer(self, dialog: Dialog, question: str, media: list[dict] | None = None) -> tuple[str, list[str], dict]:
messages = dialog.runtime_messages
actions = []
answer = ""
is_success = False
custom_data = {}
run = None
# 处理媒体信息,并转换为结构化的消息
media_messages = await self._media_to_struct_message(media)
if media_messages:
for media_message in media_messages:
if isinstance(media_message["content"], str):
messages.append(media_message["content"])
# 获取对话的指令信息
instructions = dialog.meta.get("intruction", "")
# 通过 RAG(检索增强生成)从向量存储中获取补充信息
rag_instructions, augmentations = await self.vs.retrieval(self.agent_instance, messages, group=dialog.group)
if augmentations:
instructions = instructions + "\n\n" + rag_instructions
custom_data["augmentations"] = augmentations
try:
# 发送用户问题到 OpenAI API
await self.client.beta.threads.messages.create(thread_id=dialog.thread_id, role="user", content=question, timeout=3)
self.logger.debug(f"已发送问题 {dialog.id}: {question}")
# 创建 OpenAI 运行线程,并提供附加指令
run = await self.client.beta.threads.runs.create_and_poll(
thread_id=dialog.thread_id,
assistant_id=self.agent_instance.cloud_id,
additional_instructions=instructions,
timeout=6,
response_format=self.agent_instance.response_format,
additional_messages=media_messages,
)
self.logger.debug(f"已创建 run {dialog.id}: {run.id} - {run.status}: {question}")
while not is_success:
if run.status == "completed":
# 运行完成,获取最新的响应消息
messages = await self.client.beta.threads.messages.list(thread_id=dialog.thread_id, limit=1, timeout=3)
result = json.loads(messages.data[0].content[0].text.value)
answer = result["answer"] if "answer" in result else ""
actions = [result["action"]] if "action" in result else []
result.pop("action", None)
result.pop("answer", None)
custom_data["response_data"] = result
self.logger.debug(f"收到回答 run {dialog.id}: {run.id}: {answer}, {json.dumps(actions, ensure_ascii=False)}, {json.dumps(custom_data, ensure_ascii=False)}")
break
elif run.status == "requires_action":
# 运行状态要求执行外部操作
outputs = []
for call in run.required_action.submit_tool_outputs.tool_calls:
call_id = call.id
function = call.function
try:
data = json.loads(function.arguments)
except:
data = {}
if "events" not in custom_data:
custom_data["events"] = {}
self.logger.debug(f"run.status {dialog.id}: {run.status}. {question}. {function.name}")
if function.name == TOOLS_GET_AUGMENTED_INFO:
func_result, _ = await self.vs.retrieval(self.agent_instance, data.get("data", []), group=dialog.group, use_consolidation=False)
else:
func_result = await self.er.route_event(function.name, dialog.external_id, dialog.user_id, dialog.user_type, **data)
if not custom_data["events"].get(function.name):
custom_data["events"][function.name] = {}
custom_data["events"][function.name][call_id] = {
"data": data,
"func_result": func_result,
}
outputs.append({"tool_call_id": call_id, "output": json.dumps(func_result)})
try:
# 发送外部工具执行结果
run = await self.client.beta.threads.runs.submit_tool_outputs_and_poll(
thread_id=dialog.thread_id,
run_id=run.id,
tool_outputs=outputs,
timeout=6,
)
except Exception as e:
self.logger.error(f"发送工具执行结果时出错 {dialog.id}: {e}")
run = await self.client.beta.threads.runs.poll(
thread_id=dialog.thread_id,
run_id=run.id,
timeout=6,
)
elif run.status in ["expired", "cancelling", "cancelled", "failed"]:
# 运行状态异常,记录错误信息
try:
custom_data["code"] = run.last_error.code
except:
custom_data["code"] = run.status
custom_data["error"] = run.last_error.message
break
elif run.status in ["incomplete", "queued", "in_progress"]:
# 运行仍在进行中,轮询获取状态
run = await self.client.beta.threads.runs.poll(
thread_id=dialog.thread_id,
run_id=run.id,
timeout=6,
)
else:
raise Exception("OpenAI API 返回了未知状态")
except Exception as e:
self.logger.error(f"调用 GPT 生成回答时出错 {dialog.id}: {e}")
if run:
try:
custom_data["code"] = run.last_error.code
custom_data["error"] = run.last_error.message
except:
custom_data["code"] = 0
custom_data["error"] = str(e)
else:
custom_data["code"] = 0
custom_data["error"] = str(e)
return answer, actions, custom_data
(2)方法create_cloud()用于在 OpenAI 平台上创建或更新 AI 助手(Assistant),具体功能如下:
- 工具处理:确保 main_tool 在工具列表中,如果尚未添加,则将其加入。
- 更新助手:如果 AI 助手已存在(即 cloud_id 存在),则尝试更新其配置,包括名称、指令、模型、工具和超参数(如 temperature 和 top_p)。
- 创建助手:如果 cloud_id 为空,则创建一个新的 AI 助手并返回其 ID。
- 异常处理:如果 API 请求失败,记录错误信息。
- 返回 ID:返回创建或更新后的 AI 助手 ID,如果失败,则返回 ""。
async def create_cloud(self) -> str:
# 获取当前助手的工具列表
tools = self.agent_instance.tools
main_tool = self.vs.main_tool()
# 确保主要工具(main_tool)在工具列表中
if main_tool:
name = main_tool.get("function").get("name")
tools_names = [tool.get("function").get("name") for tool in tools]
if name not in tools_names:
tools.append(main_tool)
# 如果助手已经存在(即 cloud_id 存在),尝试更新助手
if self.agent_instance.cloud_id:
try:
response = await self.client.beta.assistants.update(
name=self.agent_instance.name, # 助手名称
assistant_id=self.agent_instance.cloud_id, # 助手 ID
instructions=self.agent_instance.instruction, # 助手的指令
model=self.cfg.openai.model, # 使用的 OpenAI 语言模型
tools=tools, # 该助手可用的工具
response_format=self.agent_instance.response_format, # 响应格式
temperature=self.agent_instance.config.get("temperature", 0.5), # 生成文本的随机性
top_p=self.agent_instance.config.get("top_p", 1.0), # 影响生成多样性的参数
)
self.logger.debug(f"已更新助手 {self.agent_instance.id}: {response.id}")
return response.id # 返回更新后的助手 ID
except Exception as e:
self.logger.error(f"更新助手失败 {self.agent_instance.id}: {e}")
# 如果助手不存在(即 cloud_id 为空),尝试创建新助手
else:
try:
response = await self.client.beta.assistants.create(
name=self.agent_instance.name, # 助手名称
instructions=self.agent_instance.instruction, # 助手的指令
model=self.cfg.openai.model, # 使用的 OpenAI 语言模型
tools=tools, # 该助手可用的工具
response_format=self.agent_instance.response_format, # 响应格式
temperature=self.agent_instance.config.get("temperature", 0.5), # 生成文本的随机性
top_p=self.agent_instance.config.get("top_p", 1.0), # 影响生成多样性的参数
)
self.logger.debug(f"已创建助手 {self.agent_instance.id}: {response.id}")
return response.id # 返回新创建的助手 ID
except Exception as e:
self.logger.error(f"创建助手失败 {self.agent_instance.id}: {e}")
# 如果创建或更新失败,则返回原始 cloud_id(如果存在),否则返回空字符串
return self.agent_instance.cloud_id if self.agent_instance.cloud_id else ""
(3)方法create_thread()通过 OpenAI API 创建一个新的对话线程,并返回其 thread_id。支持初始化消息 init_messages,用于在创建对话时添加初始内容。当发生异常时,记录错误日志并返回 None。
async def create_thread(self, dialog_id: int, init_messages: list = None) -> str | None:
"""
创建新的对话线程。
:param dialog_id: 当前对话的 ID(用于日志记录)。
:param init_messages: 初始化消息列表,可选参数,默认 None。
:return: 创建成功则返回 thread_id,失败则返回 None。
"""
try:
# 通过 OpenAI API 创建新对话线程,并设置超时时间为 5 秒
thread = await self.client.beta.threads.create(timeout=5, messages=init_messages)
return thread.id # 返回创建的线程 ID
except Exception as e:
# 记录创建对话线程的错误日志
self.logger.error(f"创建新对话线程失败 {dialog_id}: {e}")
return None # 返回 None 表示创建失败
(4)方法delete_thread()通过 OpenAI API 删除指定的对话线程 thread_id,若 thread_id 为空则直接返回 True(无需删除)。当发生异常时,记录错误日志并返回 False,表示删除失败。
async def delete_thread(self, dialog_id: int, thread_id: str | None) -> bool:
"""
删除指定的对话线程。
:param dialog_id: 当前对话的 ID(用于日志记录)。
:param thread_id: 需要删除的对话线程 ID,如果为空,则直接返回 True。
:return: 删除成功返回 True,失败返回 False。
"""
# 如果 thread_id 为空,说明无需删除,直接返回 True
if not thread_id:
return True
try:
# 通过 OpenAI API 删除指定的对话线程,设置超时时间为 5 秒
await self.client.beta.threads.delete(thread_id=thread_id, timeout=5)
return True # 删除成功返回 True
except Exception as e:
# 记录删除对话线程的错误日志
self.logger.error(f"删除对话线程失败 {dialog_id}: {e}")
return False # 返回 False 表示删除失败
(5)下面代码的功能是处理各种类型的媒体文件(图片、音频、视频、文件),并将其转换为可发送给 OpenAI Assistant 的结构化消息。
- 图片处理:下载图片并尝试上传至 OpenAI,以便在对话中使用。如果上传失败,则使用直接的图片 URL。
- 音频处理:调用 speech_to_text 方法,将音频文件转换为文本,并作为消息发送。如果转录失败,返回错误信息。
- 视频处理:下载视频文件,并提取音频进行语音转录。从视频的不同时间点(0%、25%、50%、75%、100%)提取关键帧,并上传至 OpenAI,以便分析视频内容。处理完成后,删除临时文件。
- 文件处理(TODO 未来扩展):目前仅返回文件的 URL,后续可能添加对文件内容的读取和处理功能。
async def _media_to_struct_message(self, media: list[dict] | None):
if not media:
return []
messages = []
for item in media:
media_type = item.get('type')
url = item.get('url')
if not url:
continue
if media_type == 'img':
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"Failed to download image from {url}")
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
temp_img_filename = temp_file.name
temp_file.write(await response.read())
with open(temp_img_filename, "rb") as img_file:
file_id = await self.llm.save_file(img_file, purpose="vision")
if file_id:
messages.append({
"role": "user",
"content": [
{
"type": "image_file",
"image_file": {
"file_id": file_id
}
}
]
})
else:
messages.append({
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": url
}
}
]
})
if os.path.exists(temp_img_filename):
os.remove(temp_img_filename)
except Exception as e:
messages.append({
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": url
}
}
]
})
elif media_type == 'audio':
try:
transcribed_text = await self.llm.speech_to_text(url)
messages.append({
"role": "user",
"content": f"Audio transcription: {transcribed_text}"
})
except Exception as e:
messages.append({
"role": "user",
"content": f"Failed to transcribe audio from {url}. Error: {str(e)}"
})
elif media_type == 'video':
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"Failed to download video file from {url}")
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
temp_video_filename = temp_file.name
temp_file.write(await response.read())
video_clip = await asyncio.to_thread(VideoFileClip, temp_video_filename)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio_file:
temp_audio_filename = temp_audio_file.name
await asyncio.to_thread(video_clip.audio.write_audiofile, temp_audio_filename)
transcribed_text = await self.llm.speech_to_text(temp_audio_filename, is_local_file=True)
messages.append({
"role": "user",
"content": f"Transcription of audio from video: {transcribed_text}"
})
cap = await asyncio.to_thread(cv2.VideoCapture, temp_video_filename)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_positions = [0, 0.25, 0.5, 0.75, 1.0]
frame_descriptions = ["start", "quarter", "middle", "three-quarters", "end"]
for i, pos in enumerate(frame_positions):
frame_idx = max(0, min(int(total_frames * pos), total_frames - 1))
await asyncio.to_thread(cap.set, cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = await asyncio.to_thread(cap.read)
if ret:
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as frame_file:
frame_filename = frame_file.name
await asyncio.to_thread(cv2.imwrite, frame_filename, frame)
with open(frame_filename, "rb") as img_file:
file_id = await self.llm.save_file(img_file, purpose="vision")
if file_id:
position_name = frame_descriptions[i]
messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": f"Frame from {position_name} of the video:"
},
{
"type": "image_file",
"image_file": {
"file_id": file_id
}
}
]
})
os.remove(frame_filename)
await asyncio.to_thread(cap.release)
if os.path.exists(temp_audio_filename):
os.remove(temp_audio_filename)
if os.path.exists(temp_video_filename):
os.remove(temp_video_filename)
except Exception as e:
messages.append({
"role": "user",
"content": f"Failed to process video from {url}. Error: {str(e)}"
})
elif media_type == 'file':
messages.append({
"role": "user",
"content": f"File link: {url}"
})
return messages
上述代码确保了各种媒体类型能够被正确转换,并适用于 OpenAI Assistant 交互,同时提供错误处理和日志记录功能,以提高系统的健壮性。