FastAPI系列07:“请求-响应”过程高阶技巧


在过去的 FastAPI系列03:FastAPI路由(router)FastAPI系列05:FastAPI请求(Request)FastAPI系列06:FastAPI响应(Response)三节中,我们深入讲解了FastAPI在处理HTTP“请求-响应”过程中各种技术,这些技术基本足以应对平时的需要。然而FastAPI还提供了更多自定义处理HTTP“请求-响应”的功能,本节将继续讨论这些功能,以备不时之需。

1、自定义 Request

自定义 Request的用途

默认 Request 是 FastAPI / Starlette 提供的,很强大,但在某些复杂业务场景下,我们想要:

  • 给 Request 增加额外属性
    比如:在中间件里解析用户Token,直接挂在 request.user
  • 自定义解析逻辑
    比如:自动解密请求体,加密传输的API
  • 对接特殊协议
    比如:WebSocket-like协议,但想走Request风格
  • 简化业务代码
    比如:每个接口都要取某些公共参数,直接集成进Request
  • 统一日志/链路追踪
    给Request增加 trace_id、request_id 等,用于链路追踪系统(比如 Jaeger, Zipkin)

这时我们就需要为FastAPI自定义Request。

如何自定义 Request

自定义Request时一般有三个步骤:
1、自定义一个类,继承 starlette.requests.Request
2、为这个类添加属性重写方法,比如重写 .json()、.body() 方法
3、然后通过中间件或者依赖注入把你的 Request 替换掉
比如,在自定义的Request 里自动解析 token,挂上用户信息:

from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request as StarletteRequest

# 自定义Request类
class CustomRequest(StarletteRequest):
    @property
    def user(self):
        token = self.headers.get("Authorization")
        if token == "Bearer secrettoken":
            return {"user_id": 1, "username": "admin"}
        return None

# 中间件,把请求对象替换成我们的CustomRequest
class CustomRequestMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        custom_request = CustomRequest(request.scope, request.receive)
        response = await call_next(custom_request)
        return response

app = FastAPI()
app.add_middleware(CustomRequestMiddleware)

@app.get("/me")
async def me(request: CustomRequest):  # 这里直接拿到CustomRequest
    if request.user:
        return {"user": request.user}
    return {"error": "Not authenticated"}

2、自定义APIRoute

APIRoute的用途

通过前面几节的内容,我们知道,在FastAPI中每一个接口(@app.get(“/xxx”))背后,本质上是一个 APIRoute 实例,- 它负责:

  • 接收请求
  • 调用你的 view function
  • 做请求参数校验(Pydantic)
  • 调用 response_model 转换
  • 调用 response_class
  • 把最终的 Response 返回给客户端

总结来说,APIRoute 是 每个接口的真正执行逻辑 的底层封装!

自定义 APIRoute的用途

因此,我们可以通过自定义 APIRoute来做到很多官方默认做不到的事情,包括:

  • 给某些接口自动加上日志记录,不用手动在每个接口里写log
  • 自动异常捕获并格式化输出,把所有异常自动变成统一格式,比如统一 {“code”: xxx, “msg”: “”}
  • 接口耗时统计(性能监控),在每个接口执行前后打点,收集响应时间
  • 动态修改请求/响应,比如:自动加密响应,或者解密请求
  • 多租户支持(动态切库),每个请求根据 Header / Token 动态切换数据库连接
  • 统一异常报警,某些接口异常可以自动报警(发钉钉/短信)

如何自定义 APIRoute

以下是一个自定义的APIRoute,它包含了:

  • 自动捕获异常,统一格式返回
  • 自动计算接口耗时,打日志
  • 自动校验 JWT 登录(校验失败直接拦截)
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from fastapi.routing import APIRoute
import time
import jwt
from typing import Callable

# ---------------------------
# 简单的JWT密钥
SECRET_KEY = "your_secret_key"

# ---------------------------
# 自定义未授权异常
class UnauthorizedException(Exception):
    def __init__(self, message="Unauthorized"):
        self.message = message

# ---------------------------
# 简单的JWT解码函数
def decode_jwt_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.ExpiredSignatureError:
        raise UnauthorizedException("Token expired")
    except jwt.InvalidTokenError:
        raise UnauthorizedException("Invalid token")

# ---------------------------
# 自定义的 APIRoute
class SecureLoggingRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            start_time = time.time()
            try:
                # ===== 校验JWT Token =====
                auth_header = request.headers.get("Authorization")
                if not auth_header or not auth_header.startswith("Bearer "):
                    raise UnauthorizedException("Missing Bearer Token")
                
                token = auth_header.split(" ")[1]
                user_payload = decode_jwt_token(token)
                
                # 挂到 request.state 方便业务逻辑访问
                request.state.user = user_payload

                # ===== 正常执行视图逻辑 =====
                response: Response = await original_route_handler(request)

                duration = time.time() - start_time
                print(f"[INFO] {request.method} {request.url.path} completed in {duration:.2f}s")

                return response

            except UnauthorizedException as e:
                return JSONResponse(
                    status_code=401,
                    content={"code": 401, "message": e.message, "data": None}
                )
            except Exception as e:
                # 捕获所有异常,统一返回
                return JSONResponse(
                    status_code=500,
                    content={"code": 500, "message": str(e), "data": None}
                )

        return custom_route_handler

# ---------------------------
# 应用到FastAPI
app = FastAPI()
app.router.route_class = SecureLoggingRoute

@app.get("/protected")
async def protected_endpoint(request: Request):
    # 直接拿 user 信息
    return {"code": 0, "message": "ok", "data": {"user": request.state.user}}
    

3、使用BackgroundTasks(后台任务)

BackgroundTasks的用途

在FastAPI开发中,有时会遇到这样的需求:当服务端接收到前端的请求之后,需要处理比较耗时的任务,为了更快地使前端响应数据,不需要等待该任务处理完成再进行响应,如请求第三方接口发送短信、发送邮件、异步日志处理、大数据量汇总统计等操作。FastAPI提供了一种称为BackgroundTasks 的后台任务机制来解决此类需求。简单地说就是 “接口处理完后,后台悄悄继续干点活”。

如何使用BackgroundTasks

FastAPI 提供了 BackgroundTasks 对象,你只要往里面 .add_task(func, *args),它就会帮你在响应之后自动执行

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", "a") as f:
        f.write(message + "\n")

@app.post("/send")
async def send_notification(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "User sent a notification.")
    return {"message": "Notification will be processed"}

或者也可以在 Depends 中注册后台任务:

from fastapi import Depends, BackgroundTasks

def notify_admin(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "Admin notified.")

@app.post("/register")
async def register_user(background_tasks: BackgroundTasks = Depends(notify_admin)):
    return {"message": "User registered"}

BackgroundTasks的局限性

BackgroundTasks 与 Celery 那种真正分布式任务队列相比,它只在当前进程生效,如果服务器挂了,任务可能丢失。因此,它适合单纯的快速小任务,比如10ms-1s以内的小型后台操作,对于需要复杂重试/排队/监控的,用 Celery/RQ。
常见应用场景举例

场景说明
发邮件/短信通知用户注册后,后台发送欢迎邮件
写日志到数据库/文件不影响接口响应
图片/视频处理上传成功后异步压缩/转码
第三方 API 通知下单后异步通知支付平台
数据备份用户操作后,异步备份一份到冷存储
执行定时操作简单场景可以通过延迟启动BackgroundTasks模拟定时
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值