“请求-响应”过程高阶技巧
在过去的 FastAPI系列03:FastAPI路由(router)、 FastAPI系列05:FastAPI请求(Request)、 FastAPI系列06:FastAPI响应(Response)三节中,我们深入讲解了FastAPI在处理HTTP“请求-响应”过程中各种技术,这些技术基本足以应对平时的需要。然而FastAPI还提供了更多自定义处理HTTP“请求-响应”的功能,本节将继续讨论这些功能,以备不时之需。
1、自定义 Request
自定义 Request的用途
默认 Request 是 FastAPI / Starlette 提供的,很强大,但在某些复杂业务场景下,我们想要:
- 给 Request 增加额外属性
比如:在中间件里解析用户Token,直接挂在 request.user - 自定义解析逻辑
比如:自动解密请求体,加密传输的API - 对接特殊协议
比如:WebSocket-like协议,但想走Request风格 - 简化业务代码
比如:每个接口都要取某些公共参数,直接集成进Request - 统一日志/链路追踪
给Request增加 trace_id、request_id 等,用于链路追踪系统(比如 Jaeger, Zipkin)
这时我们就需要为FastAPI自定义Request。
如何自定义 Request
自定义Request时一般有三个步骤:
1、自定义一个类,继承 starlette.requests.Request
2、为这个类添加属性、重写方法,比如重写 .json()、.body() 方法
3、然后通过中间件或者依赖注入把你的 Request 替换掉
比如,在自定义的Request 里自动解析 token,挂上用户信息:
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request as StarletteRequest
# 自定义Request类
class CustomRequest(StarletteRequest):
@property
def user(self):
token = self.headers.get("Authorization")
if token == "Bearer secrettoken":
return {"user_id": 1, "username": "admin"}
return None
# 中间件,把请求对象替换成我们的CustomRequest
class CustomRequestMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
custom_request = CustomRequest(request.scope, request.receive)
response = await call_next(custom_request)
return response
app = FastAPI()
app.add_middleware(CustomRequestMiddleware)
@app.get("/me")
async def me(request: CustomRequest): # 这里直接拿到CustomRequest
if request.user:
return {"user": request.user}
return {"error": "Not authenticated"}
2、自定义APIRoute
APIRoute的用途
通过前面几节的内容,我们知道,在FastAPI中每一个接口(@app.get(“/xxx”))背后,本质上是一个 APIRoute 实例,- 它负责:
- 接收请求
- 调用你的 view function
- 做请求参数校验(Pydantic)
- 调用 response_model 转换
- 调用 response_class
- 把最终的 Response 返回给客户端
总结来说,APIRoute 是 每个接口的真正执行逻辑 的底层封装!
自定义 APIRoute的用途
因此,我们可以通过自定义 APIRoute来做到很多官方默认做不到的事情,包括:
- 给某些接口自动加上日志记录,不用手动在每个接口里写log
- 自动异常捕获并格式化输出,把所有异常自动变成统一格式,比如统一 {“code”: xxx, “msg”: “”}
- 接口耗时统计(性能监控),在每个接口执行前后打点,收集响应时间
- 动态修改请求/响应,比如:自动加密响应,或者解密请求
- 多租户支持(动态切库),每个请求根据 Header / Token 动态切换数据库连接
- 统一异常报警,某些接口异常可以自动报警(发钉钉/短信)
如何自定义 APIRoute
以下是一个自定义的APIRoute,它包含了:
- 自动捕获异常,统一格式返回
- 自动计算接口耗时,打日志
- 自动校验 JWT 登录(校验失败直接拦截)
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from fastapi.routing import APIRoute
import time
import jwt
from typing import Callable
# ---------------------------
# 简单的JWT密钥
SECRET_KEY = "your_secret_key"
# ---------------------------
# 自定义未授权异常
class UnauthorizedException(Exception):
def __init__(self, message="Unauthorized"):
self.message = message
# ---------------------------
# 简单的JWT解码函数
def decode_jwt_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
raise UnauthorizedException("Token expired")
except jwt.InvalidTokenError:
raise UnauthorizedException("Invalid token")
# ---------------------------
# 自定义的 APIRoute
class SecureLoggingRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
start_time = time.time()
try:
# ===== 校验JWT Token =====
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise UnauthorizedException("Missing Bearer Token")
token = auth_header.split(" ")[1]
user_payload = decode_jwt_token(token)
# 挂到 request.state 方便业务逻辑访问
request.state.user = user_payload
# ===== 正常执行视图逻辑 =====
response: Response = await original_route_handler(request)
duration = time.time() - start_time
print(f"[INFO] {request.method} {request.url.path} completed in {duration:.2f}s")
return response
except UnauthorizedException as e:
return JSONResponse(
status_code=401,
content={"code": 401, "message": e.message, "data": None}
)
except Exception as e:
# 捕获所有异常,统一返回
return JSONResponse(
status_code=500,
content={"code": 500, "message": str(e), "data": None}
)
return custom_route_handler
# ---------------------------
# 应用到FastAPI
app = FastAPI()
app.router.route_class = SecureLoggingRoute
@app.get("/protected")
async def protected_endpoint(request: Request):
# 直接拿 user 信息
return {"code": 0, "message": "ok", "data": {"user": request.state.user}}
3、使用BackgroundTasks(后台任务)
BackgroundTasks的用途
在FastAPI开发中,有时会遇到这样的需求:当服务端接收到前端的请求之后,需要处理比较耗时的任务,为了更快地使前端响应数据,不需要等待该任务处理完成再进行响应,如请求第三方接口发送短信、发送邮件、异步日志处理、大数据量汇总统计等操作。FastAPI提供了一种称为BackgroundTasks 的后台任务机制来解决此类需求。简单地说就是 “接口处理完后,后台悄悄继续干点活”。
如何使用BackgroundTasks
FastAPI 提供了 BackgroundTasks 对象,你只要往里面 .add_task(func, *args)
,它就会帮你在响应之后自动执行!
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("log.txt", "a") as f:
f.write(message + "\n")
@app.post("/send")
async def send_notification(background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, "User sent a notification.")
return {"message": "Notification will be processed"}
或者也可以在 Depends 中注册后台任务:
from fastapi import Depends, BackgroundTasks
def notify_admin(background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, "Admin notified.")
@app.post("/register")
async def register_user(background_tasks: BackgroundTasks = Depends(notify_admin)):
return {"message": "User registered"}
BackgroundTasks的局限性
BackgroundTasks 与 Celery 那种真正分布式任务队列相比,它只在当前进程生效,如果服务器挂了,任务可能丢失。因此,它适合单纯的快速小任务,比如10ms-1s以内的小型后台操作,对于需要复杂重试/排队/监控的,用 Celery/RQ。
常见应用场景举例
场景 | 说明 |
---|---|
发邮件/短信通知 | 用户注册后,后台发送欢迎邮件 |
写日志到数据库/文件 | 不影响接口响应 |
图片/视频处理 | 上传成功后异步压缩/转码 |
第三方 API 通知 | 下单后异步通知支付平台 |
数据备份 | 用户操作后,异步备份一份到冷存储 |
执行定时操作 | 简单场景可以通过延迟启动BackgroundTasks模拟定时 |