一.通过日志页面查看所有会话记录
点击"日志与标注",在日志页面,日志记录了应用的运行情况,包括用户的输入和 AI 的回复。如下所示:
1.chat-conversations 接口
实际调用接口,如下所示:
http://localhost:5001/console/api/apps/81dfcbc5-20be-4b8b-b5e9-c1289de39bdf/chat-conversations
2.接口源码位置
源码位置:dify\api\controllers\console\app\conversation.py
二.chat-conversations 接口源码分析
1.get(self, app_model)源码注释
该接口实现用于从数据库中查询 Conversation
表的数据。它支持多种过滤条件,如关键词搜索、时间范围、标注状态、消息数量、排序等。查询结果经过分页处理,并根据请求参数返回符合条件的对话记录。
def get(self, app_model):
# 检查当前用户是否为editor,若不是则抛出 Forbidden 异常
if not current_user.is_editor:
raise Forbidden()
# 创建请求参数解析器
parser = reqparse.RequestParser()
# 添加请求参数(包括类型、位置等)
parser.add_argument("keyword", type=str, location="args") # 关键词参数,用于搜索
parser.add_argument("start", type=DatetimeString("%Y-%m-%d %H:%M"), location="args") # 起始时间
parser.add_argument("end", type=DatetimeString("%Y-%m-%d %H:%M"), location="args") # 结束时间
parser.add_argument(
"annotation_status",
type=str,
choices=["annotated", "not_annotated", "all"],
default="all",
location="args"
) # 标注状态参数,默认为all
parser.add_argument("message_count_gte", type=int_range(1, 99999), required=False, location="args") # 消息数量大于等于此值
parser.add_argument("page", type=int_range(1, 99999), required=False, default=1, location="args") # 页码,默认为1
parser.add_argument("limit", type=int_range(1, 100), required=False, default=20, location="args") # 每页条数,默认为20
parser.add_argument(
"sort_by",
type=str,
choices=["created_at", "-created_at", "updated_at", "-updated_at"],
required=False,
default="-updated_at",
location="args",
) # 排序方式,默认按更新日期降序
# 解析请求参数
args = parser.parse_args()
# 创建一个子查询,连接 Conversation 表和 EndUser 表,获取 Conversation 的 id 和 EndUser 的 session_id
subquery = (
db.session.query(
Conversation.id.label("conversation_id"), EndUser.session_id.label("from_end_user_session_id")
)
.outerjoin(EndUser, Conversation.from_end_user_id == EndUser.id) # 外连接 EndUser 表
.subquery() # 将查询结果作为子查询
)
# 创建查询对象,查询 Conversation 表,条件是 app_id 匹配
query = db.select(Conversation).where(Conversation.app_id == app_model.id)
# 如果有关键词,进行关键词搜索
if args["keyword"]:
keyword_filter = "%{}%".format(args["keyword"]) # 使用 LIKE 进行模糊搜索
query = (
query.join(
Message,
Message.conversation_id == Conversation.id, # 连接 Message 表
)
.join(subquery, subquery.c.conversation_id == Conversation.id) # 连接子查询
.filter(
or_(
Message.query.ilike(keyword_filter),
Message.answer.ilike(keyword_filter),
Conversation.name.ilike(keyword_filter),
Conversation.introduction.ilike(keyword_filter),
subquery.c.from_end_user_session_id.ilike(keyword_filter),
),
)
.group_by(Conversation.id) # 按 Conversation.id 分组
)
# 获取当前用户信息
account = current_user
timezone = pytz.timezone(account.timezone) # 获取当前用户的时区
utc_timezone = pytz.utc # UTC 时区
# 如果有开始时间参数,转换并过滤相应的日期范围
if args["start"]:
start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M") # 将字符串转换为日期时间对象
start_datetime = start_datetime.replace(second=0) # 去除秒数部分
start_datetime_timezone = timezone.localize(start_datetime) # 转换为用户本地时区时间
start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone) # 转换为 UTC 时区
match args["sort_by"]:
case "updated_at" | "-updated_at":
query = query.where(Conversation.updated_at >= start_datetime_utc) # 过滤更新时间大于等于开始时间的记录
case "created_at" | "-created_at" | _:
query = query.where(Conversation.created_at >= start_datetime_utc) # 过滤创建时间大于等于开始时间的记录
# 如果有结束时间参数,转换并过滤相应的日期范围
if args["end"]:
end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M") # 将字符串转换为日期时间对象
end_datetime = end_datetime.replace(second=59) # 设置为最后一秒
end_datetime_timezone = timezone.localize(end_datetime) # 转换为用户本地时区时间
end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone) # 转换为 UTC 时区
match args["sort_by"]:
case "updated_at" | "-updated_at":
query = query.where(Conversation.updated_at <= end_datetime_utc) # 过滤更新时间小于等于结束时间的记录
case "created_at" | "-created_at" | _:
query = query.where(Conversation.created_at <= end_datetime_utc) # 过滤创建时间小于等于结束时间的记录
# 根据标注状态进行筛选
if args["annotation_status"] == "annotated":
query = query.options(joinedload(Conversation.message_annotations)).join(
MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id # 加载 MessageAnnotation 表
)
elif args["annotation_status"] == "not_annotated":
query = (
query.outerjoin(MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id) # 外连接 MessageAnnotation 表
.group_by(Conversation.id)
.having(func.count(MessageAnnotation.id) == 0) # 仅包含没有标注的对话
)
# 根据消息数量筛选
if args["message_count_gte"] and args["message_count_gte"] >= 1:
query = (
query.options(joinedload(Conversation.messages)) # 加载 Conversation 中的消息
.join(Message, Message.conversation_id == Conversation.id) # 连接 Message 表
.group_by(Conversation.id)
.having(func.count(Message.id) >= args["message_count_gte"]) # 仅包含消息数量大于等于 message_count_gte 的对话
)
# 如果应用模式是高级聊天,则过滤掉调试模式的对话
if app_model.mode == AppMode.ADVANCED_CHAT.value:
query = query.where(Conversation.invoke_from != InvokeFrom.DEBUGGER.value)
# 根据排序字段对查询结果进行排序
match args["sort_by"]:
case "created_at":
query = query.order_by(Conversation.created_at.asc()) # 按创建时间升序排序
case "-created_at":
query = query.order_by(Conversation.created_at.desc()) # 按创建时间降序排序
case "updated_at":
query = query.order_by(Conversation.updated_at.asc()) # 按更新时间升序排序
case "-updated_at":
query = query.order_by(Conversation.updated_at.desc()) # 按更新时间降序排序
case _:
query = query.order_by(Conversation.created_at.desc()) # 默认按创建时间降序排序
# 使用分页查询返回结果
conversations = db.paginate(query, page=args["page"], per_page=args["limit"], error_out=False)
# 返回分页查询的结果
return conversations
2.子查询
这个查询从 Conversation
和 EndUser
表中选取相关数据,并通过左外连接来确保即使某些 Conversation
记录没有对应的 EndUser
记录,仍然能返回数据。最终,查询返回的字段包括 Conversation
的 id
(重命名为 conversation_id
)和 EndUser
的 session_id
(重命名为 from_end_user_session_id
)。
subquery = (
db.session.query(
Conversation.id.label("conversation_id"), EndUser.session_id.label("from_end_user_session_id")
)
.outerjoin(EndUser, Conversation.from_end_user_id == EndUser.id)
.subquery()
)
(1)创建子查询 (subquery
):首先定义了一个子查询。这个子查询从数据库中选择了 Conversation
表的 id
字段,并将其重命名为 conversation_id
,以及从 EndUser
表选择了 session_id
字段,并将其重命名为 from_end_user_session_id
。
(2)连接操作 (outerjoin
):查询通过 outerjoin
将 Conversation
表与 EndUser
表连接在一起,连接条件是 Conversation.from_end_user_id
等于 EndUser.id
。这意味着,如果 Conversation
中的 from_end_user_id
和 EndUser.id
匹配,则将这两表的数据组合起来。如果没有匹配的记录,EndUser
表中的字段会以 NULL
填充。
(3)生成子查询:通过 .subquery()
方法,将查询转换为一个子查询,这样查询结果不会直接返回,而是可以在其它查询中使用。
3.关键词筛选
这个查询通过 keyword
进行模糊搜索,查找与关键字匹配的消息、会话以及与会话相关的用户信息。它将 Message
表与 Conversation
表连接,并使用子查询中的 EndUser
会话 ID 字段进行联合查询。通过 or_()
函数实现多个字段的模糊匹配,返回所有符合条件的记录。最后,查询通过 group_by
确保按会话(Conversation.id
)分组结果。
if args["keyword"]:
keyword_filter = "%{}%".format(args["keyword"])
query = (
query.join(
Message,
Message.conversation_id == Conversation.id,
)
.join(subquery, subquery.c.conversation_id == Conversation.id)
.filter(
or_(
Message.query.ilike(keyword_filter),
Message.answer.ilike(keyword_filter),
Conversation.name.ilike(keyword_filter),
Conversation.introduction.ilike(keyword_filter),
subquery.c.from_end_user_session_id.ilike(keyword_filter),
),
)
.group_by(Conversation.id)
)
(1)检查 args["keyword"]
- 首先代码检查
args
字典中的keyword
是否存在。如果存在,则创建一个名为keyword_filter
的变量,它将keyword
进行格式化处理,形成一个带有通配符的字符串(例如%keyword%
)。这样可以在 SQL 查询中进行模糊匹配。
(2)查询构造 (query.join
)
然后查询通过 .join()
方法与 Message
表和之前定义的子查询 subquery
进行连接。
- 连接
Message
表:查询将Message
表和Conversation
表连接,连接条件是Message.conversation_id == Conversation.id
,也就是通过Conversation
表的id
与Message
表中的conversation_id
字段建立关系。 - 连接子查询 (
subquery
):查询还将子查询subquery
与Conversation
表连接,连接条件是subquery.c.conversation_id == Conversation.id
,也就是说通过Conversation.id
与子查询中的conversation_id
进行关联。
(3)过滤条件 (filter
)
-
连接操作完成后,查询的下一步是通过
.filter()
来添加过滤条件。这里使用了 SQLAlchemy 的or_()
函数来设置多个条件,只要其中一个条件满足,记录就会被选中。 -
具体的过滤条件规则
Message.query.ilike(keyword_filter)
:如果Message
表中的query
字段(假设是消息内容)与keyword_filter
匹配,则返回该记录。ilike
用于不区分大小写的模糊匹配。Message.answer.ilike(keyword_filter)
:如果Message
表中的answer
字段(假设是回答内容)与keyword_filter
匹配,则返回该记录。Conversation.name.ilike(keyword_filter)
:如果Conversation
表中的name
字段(假设是会话的名字)与keyword_filter
匹配,则返回该记录。Conversation.introduction.ilike(keyword_filter)
:如果Conversation
表中的introduction
字段(开场白)与keyword_filter
匹配,则返回该记录。subquery.c.from_end_user_session_id.ilike(keyword_filter)
:如果子查询中的from_end_user_session_id
字段(即EndUser.session_id
)与keyword_filter
匹配,则返回该记录。
这样,查询会返回那些
Message
或Conversation
表中的相关字段,或者子查询中与用户会话 ID 相关的字段与keyword
模糊匹配的记录。
(4)分组 (group_by
)
- 在所有的连接和过滤条件都应用后,查询通过
.group_by(Conversation.id)
进行分组,按Conversation.id
分组,以便对每个会话进行聚合操作。这样可以确保每个Conversation
只返回一次。
说明:因为 Conversation.id
在 conversations 数据表中作为主键,通常情况是唯一的,除非异常情况下会出现重复。
4.时间范围筛选
主要包括今天,过去 7 天,过去 4 周,过去 3 月,过去 12 月,本月至今,本季度至今,本年至今和所有时间这 9 种情况。如下所示:
if args["start"]:
start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M")
start_datetime = start_datetime.replace(second=0)
start_datetime_timezone = timezone.localize(start_datetime)
start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone)
match args["sort_by"]:
case "updated_at" | "-updated_at":
query = query.where(Conversation.updated_at >= start_datetime_utc)
case "created_at" | "-created_at" | _:
query = query.where(Conversation.created_at >= start_datetime_utc)
if args["end"]:
end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M")
end_datetime = end_datetime.replace(second=59)
end_datetime_timezone = timezone.localize(end_datetime)
end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone)
match args["sort_by"]:
case "updated_at" | "-updated_at":
query = query.where(Conversation.updated_at <= end_datetime_utc)
case "created_at" | "-created_at" | _:
query = query.where(Conversation.created_at <= end_datetime_utc)
5.标注状态筛选
主要包括全部,已标注改进和未标注这 3 种情况。如下所示:
if args["annotation_status"] == "annotated":
query = query.options(joinedload(Conversation.message_annotations)).join(
MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id
)
elif args["annotation_status"] == "not_annotated":
query = (
query.outerjoin(MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id)
.group_by(Conversation.id)
.having(func.count(MessageAnnotation.id) == 0)
)
如果 args["annotation_status"] == "annotated"
,查询将执行以下操作:
(1)加载关联数据 (joinedload
)
使用 query.options(joinedload(Conversation.message_annotations))
来指定 SQLAlchemy 在查询 Conversation
表时,自动加载与 Conversation
表相关联的 message_annotations
表的数据。joinedload
是一种优化方法,它会在执行查询时一次性加载所有相关的数据,避免后续的额外查询(即避免 N+1 查询问题)。
(2)连接 (join
)MessageAnnotation
表
使用 .join(MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id)
连接 MessageAnnotation
表和 Conversation
表,连接条件是 MessageAnnotation.conversation_id == Conversation.id
。这样每个 Conversation
记录将与其对应的 MessageAnnotation
记录关联。
(3)这个部分的查询返回所有已标注 (annotated
) 的会话记录(即有对应 MessageAnnotation
记录的会话)。
说明:join
默认执行 内连接,它只返回两个表中都有匹配记录的行。
如果 args["annotation_status"] == "not_annotated"
,查询将执行以下操作:
(1)外连接 (outerjoin
) MessageAnnotation
表
- 使用
.outerjoin(MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id)
来外连接MessageAnnotation
表与Conversation
表。外连接会返回所有Conversation
记录,即使没有与之关联的MessageAnnotation
记录。在没有标注的情况下,MessageAnnotation
中的字段会是NULL
。
(2)分组 (group_by
) Conversation.id
- 使用
.group_by(Conversation.id)
对查询结果按Conversation.id
进行分组。这样,每个Conversation
记录只会出现在查询结果中一次。
(3)条件过滤 (having
)
- 使用
.having(func.count(MessageAnnotation.id) == 0)
来过滤那些没有任何MessageAnnotation
记录的Conversation
。func.count(MessageAnnotation.id)
计算每个Conversation
与之相关联的MessageAnnotation
记录数。having
子句确保只有MessageAnnotation
记录数为零的Conversation
被返回,即那些没有标注的会话。
说明:outerjoin
在 SQLAlchemy 中执行的是 外连接,通常是 左外连接(LEFT OUTER JOIN)。
6.消息数量筛选
message_count_gte
参数用于过滤对话,确保返回的对话中消息的数量大于或等于指定的值。它通过在查询中添加一个条件来实现这一点,该条件检查每个对话中的消息数量是否满足要求。
if args["message_count_gte"] and args["message_count_gte"] >= 1:
query = (
query.options(joinedload(Conversation.messages))
.join(Message, Message.conversation_id == Conversation.id)
.group_by(Conversation.id)
.having(func.count(Message.id) >= args["message_count_gte"])
)
7.排序字段筛选
match args["sort_by"]:
case "created_at":
query = query.order_by(Conversation.created_at.asc())
case "-created_at":
query = query.order_by(Conversation.created_at.desc())
case "updated_at":
query = query.order_by(Conversation.updated_at.asc())
case "-updated_at":
query = query.order_by(Conversation.updated_at.desc())
case _:
query = query.order_by(Conversation.created_at.desc())
8.设置分页查询
db.paginate(query, page=args["page"], per_page=args["limit"], error_out=False)
是 SQLAlchemy 中用于进行 分页查询 的一个方法。该方法会对数据库查询结果进行分页,返回的是当前页的数据,而不会返回所有数据。
conversations = db.paginate(query, page=args["page"], per_page=args["limit"], error_out=False)
(1)query
query
变量是一个已经构建好的 SQLAlchemy 查询对象,包含了所有的查询条件、连接、过滤器等。它代表了最终要执行的 SQL 查询。
(2)page=args["page"]
args["page"]
指定了当前请求的页码(page number),即用户希望返回哪一页的数据。在分页查询中,页码是从1
开始的。- 如果
args["page"]
为1
,表示返回第一页的数据;如果为2
,则表示返回第二页的数据,依此类推。
(3)per_page=args["limit"]
args["limit"]
指定了每页返回的记录数,即每页显示多少条数据。例如,如果args["limit"]
为10
,每页将显示 10 条记录。- 通过设置
per_page
,可以控制每页返回的最大记录数,这对于控制分页查询的结果集大小非常重要。
(4)error_out=False
error_out=False
是一个可选参数,用于控制当请求的page
超过总页数时的行为。如果error_out=True
(默认值),当请求的页码超出了总页数时,SQLAlchemy 会抛出一个404
错误。- 如果设置为
False
,当请求的页码超出范围时,查询不会抛出错误,而是返回空的结果集。这对于处理分页请求超出范围的情况非常有用,通常会返回一个空的页面而不是抛出错误。
(5)分页过程
- 当调用
db.paginate()
方法时,SQLAlchemy 会执行以下几个步骤:- 计算分页的偏移量(offset):偏移量是从结果集中跳过多少条记录。例如,假设每页 10 条记录,
page=3
,那么偏移量为(3-1) * 10 = 20
。即跳过前 20 条记录,获取从第 21 条到第 30 条的数据。 - 限制查询的记录数(limit):查询结果将只返回当前页的数据。通过
per_page
参数限制返回的最大记录数。例如,如果per_page=10
,那么查询只会返回最多 10 条记录。 - 执行查询:基于计算出的偏移量和限制记录数,SQLAlchemy 会生成相应的 SQL 查询并执行。这通常会转换为类似的 SQL 语句
SELECT * FROM conversation LIMIT 10 OFFSET 20;
这条查询将返回Conversation
表中的第 21 到第 30 条记录。 - 返回分页结果:
db.paginate()
会返回一个分页对象,这个对象包含了当前页的结果集、总记录数、总页数、当前页码等信息。
- 计算分页的偏移量(offset):偏移量是从结果集中跳过多少条记录。例如,假设每页 10 条记录,
9.SQL 查询实现(有待验证)
将上述 Python 代码逻辑转换为 SQL 查询语句,可依据代码中的查询条件、连接、过滤、排序以及分页逻辑来进行转化。如下所示:
SELECT * FROM conversation
JOIN message ON message.conversation_id = conversation.id
JOIN (SELECT conversation_id, from_end_user_session_id FROM end_user) AS subquery
ON subquery.conversation_id = conversation.id
LEFT JOIN message_annotation ON message_annotation.conversation_id = conversation.id
WHERE conversation.app_id = <app_model.id>
AND (message.query LIKE '%<keyword>%' OR
message.answer LIKE '%<keyword>%' OR
conversation.name LIKE '%<keyword>%' OR
conversation.introduction LIKE '%<keyword>%' OR
subquery.from_end_user_session_id LIKE '%<keyword>%')
AND conversation.updated_at >= '<start_datetime_utc>'
AND conversation.created_at <= '<end_datetime_utc>'
AND conversation.invoke_from != <InvokeFrom.DEBUGGER.value>
GROUP BY conversation.id
HAVING COUNT(message_annotation.id) = 0
AND COUNT(message.id) >= <message_count_gte>
ORDER BY conversation.created_at DESC
LIMIT 20 OFFSET 0;
参考文献
[1] SQLAlchemy 官方网站:https://www.sqlalchemy.org/
[2] SQLAlchemy GitHub:https://github.com/sqlalchemy/sqlalchemy
[3] Flask-SQLAlchemy 官方网站:https://flask-sqlalchemy.readthedocs.io/en/stable/
[4] Flask-SQLAlchemy GitHub:https://github.com/pallets-eco/flask-sqlalchemy