第12讲:FastAPI 与异步 Web 框架¶
上一讲:Prompt 工程与 Profile 系统 下一讲:应用入口与环境前置校验
本讲定位¶
P0 主链路(第 3-11 讲)你已经学完了 LangChain 和完整的 RAG 管线。接下来两讲(本讲 + 第 13 讲)进入 Web 服务基础设施——理解整个项目运行在什么"骨架"上。 本讲先讲 FastAPI 框架本身(async/await、路由、WebSocket),第 13 讲再讲基于 FastAPI 的应用入口和启动校验。学完这两讲,你对第 9-10 讲(QAService、Pipeline)中用到的
async def、yield、WebSocket 事件推送将有透彻理解。
本讲目标¶
- 理解同步 vs 异步编程在 Web 服务中的区别
- 掌握 FastAPI 的核心概念:路由、中间件、依赖注入
- 理解 WebSocket 协议及其在流式输出中的应用
- 读懂本项目的 API 层代码
本讲地图¶
本图对应本讲功能闭环,展示从输入到本讲交付物的主干路径。节点与主项目代码文件和函数保持一致,后续章节消费的能力只作为交付边界出现。
图 1:第 12 讲功能闭环地图¶
flowchart TD
C12_APP["应用入口<br/>create_app()"]
C12_ROUTER["API 路由<br/>router"]
C12_SCHEMA["请求模型<br/>RetrievalDebugRequest"]
C12_CONTEXT["请求解析<br/>QueryServiceContext.from_ws_payload() / from_debug_request()"]
C12_COLLECT["WebSocket 流式问答<br/>websocket_endpoint() / _send_stream_events()"]
C12_DEBUG["诊断接口<br/>debug_retrieval()"]
C12_FEEDBACK["用户反馈<br/>FeedbackStore.add_feedback()"]
C12_WS["WebSocket 循环<br/>websocket_endpoint() / _send_stream_events()"]
C12_ERROR{{"错误处理<br/>register_api_exception_handlers()"}}
C12_APP --> C12_ROUTER
C12_ROUTER --> C12_CONTEXT
C12_SCHEMA --> C12_CONTEXT
C12_CONTEXT --> C12_COLLECT
C12_COLLECT --> C12_WS
C12_CONTEXT --> C12_DEBUG
C12_ROUTER --> C12_FEEDBACK
C12_ROUTER --> C12_ERROR
style C12_APP fill:#F8FAFC,stroke:#64748B,stroke-width:2px
style C12_ROUTER fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
style C12_SCHEMA fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
style C12_CONTEXT fill:#FEF3C7,stroke:#D97706,stroke-width:2px
style C12_COLLECT fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
style C12_DEBUG fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
style C12_FEEDBACK fill:#FEF3C7,stroke:#D97706,stroke-width:2px
style C12_WS fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
style C12_ERROR fill:#DCFCE7,stroke:#16A34A,stroke-width:2px
节点与代码对齐¶
| 节点 | 对齐文件 | 函数/对象 | 本章职责 |
|---|---|---|---|
| 应用入口 | app.py |
create_app() |
创建 FastAPI 应用并注册路由。 |
| API 路由 | qa_core/api/chat.py |
router |
定义 /api/retrieval/debug 和 /api/stream。 |
| 请求模型 | qa_core/schemas.py |
RetrievalDebugRequest |
用 Pydantic 校验检索诊断请求。 |
| 请求解析 | qa_core/api/service_context.py |
QueryServiceContext.from_ws_payload() / from_debug_request() |
把 WebSocket JSON 或 HTTP debug request 转成服务调用参数。 |
| WebSocket 流式问答 | qa_core/api/chat.py |
websocket_endpoint() / _send_stream_events() |
调用 QAService.stream_query() 并逐条发送事件。 |
| 诊断接口 | qa_core/api/chat.py |
debug_retrieval() |
调用 QAService.debug_retrieval()。 |
| 用户反馈 | qa_core/memory/feedback.py |
FeedbackStore.add_feedback() |
记录 useful/not_useful 和来源快照。 |
| WebSocket 循环 | qa_core/api/chat.py |
websocket_endpoint() / _send_stream_events() |
接收 JSON、发送 start/status/token/end/error。 |
| 错误处理 | qa_core/api/error_handlers.py |
register_api_exception_handlers() |
统一 Bad Request 和 API 异常响应。 |
第一部分:前置知识 — 同步 vs 异步¶
1.1 传统同步 Web 服务的瓶颈¶
假设一个 Web 服务收到一个请求,需要做三件事:
在传统的同步模型(如 Flask 默认模式)中:
每个请求必须等前一个请求完全处理完才开始。当有 100 个并发请求时,第 100 个用户要等 45 秒。
这是因为同步模型用线程处理并发,每个请求占用一个线程。当线程在等数据库返回时,它什么也不做,只是"阻塞"在那里。
1.2 异步(Async)模型¶
异步模型的核心思想:当一个操作在等待时(如等待数据库返回),切换到处理另一个请求。
gantt
title 同步 vs 异步请求处理对比
dateFormat X
axisFormat %s
section 同步模式 3线程
Req1-等待DB :r1a, 0, 100
Req1-等待API:r1b, 100, 200
Req1-等待Milvus:r1c, 200, 300
Req2-等待DB :r2a, 300, 400
Req2-等待API:r2b, 400, 500
Req2-等待Milvus:r2c, 500, 600
Req3-等待DB :r3a, 600, 700
Req3-等待API:r3b, 700, 800
Req3-等待Milvus:r3c, 800, 900
section 异步模式 事件循环
Req1-DB :e1a, 0, 40
Req2-DB :e2a, 40, 80
Req3-DB :e3a, 80, 120
Req1-API:e1b, 120, 160
Req2-API:e2b, 160, 200
Req3-API:e3b, 200, 240
Req1-Milvus:e1c, 240, 280
Req2-Milvus:e2c, 280, 320
Req3-Milvus:e3c, 320, 360
sequenceDiagram
participant U1 as 用户1
participant U2 as 用户2
participant S as 异步服务器
U1->>S: 请求1 需要等DB
Note over S: await db.query 让出CPU
U2->>S: 请求2 需要等API
Note over S: await api.call 让出CPU
S-->>U1: 请求1 DB返回 继续处理
Note over S: CPU从未空闲
S-->>U2: 请求2 API返回 继续处理
S-->>U1: 请求1 完成
S-->>U2: 请求2 完成
await 关键字的意思是:"这个操作需要等待,我先让出 CPU 去处理其他请求,等结果回来了再继续执行"。
所有请求的总完成时间大幅缩短。因为服务器在等待 I/O 的时候不会闲着。
1.3 async/await 核心语法¶
关键区别:
| 同步 | 异步 |
|---|---|
def func() |
async def func() |
requests.get(url) |
await httpx.AsyncClient().get(url) |
time.sleep(1) |
await asyncio.sleep(1) |
| 多线程处理并发 | 单线程事件循环处理并发 |
1.4 为什么 RAG 系统需要异步¶
RAG 系统是典型的 I/O 密集型 应用:
- 查 Milvus(网络 I/O)
- 调 LLM API(网络 I/O)
- 读 MySQL 历史(网络 I/O)
- 读 Embedding 模型文件(磁盘 I/O)
- 计算 Embedding(CPU 密集型,用
asyncio.to_thread放到线程池)
这些操作中,大部分时间都在等待外部系统响应。异步模型可以让服务器在等待期间处理其他用户的请求。
第二部分:FastAPI 基础¶
2.1 FastAPI 是什么¶
FastAPI 是一个现代 Python Web 框架,专为构建 API 设计。它的核心特点:
- 原生异步支持:直接使用
async/await,不依赖第三方层 - 自动生成 OpenAPI 文档:访问
/docs即可看到 Swagger UI - 基于 Pydantic 的数据校验:请求和响应自动校验类型
- WebSocket 支持:内置双向通信协议
2.2 最小 FastAPI 应用¶
2.3 路由(Router)¶
当 API 变多时,把所有端点写在 app.py 会导致文件很长。FastAPI 提供了 APIRouter 来做模块化拆分:
2.4 Pydantic 数据校验¶
前置知识:如果你不熟悉 Pydantic,请先阅读 附录A:Pydantic 数据校验
FastAPI 使用 Pydantic 模型做请求/响应的自动校验。当前项目里,在线问答只走 WebSocket payload;HTTP 请求模型只保留给检索诊断接口:
设计口径:不要再为在线问答保留单独的 HTTP 请求模型。浏览器提问直接连 WebSocket /api/stream,检索诊断才使用 RetrievalDebugRequest。
2.5 CORS 中间件¶
CORS(跨域资源共享) 是浏览器的安全机制。默认情况下,http://localhost:3000 上的前端页面不能请求 http://localhost:8000 的 API。CORS 中间件告诉浏览器哪些来源被允许跨域访问。
2.6 启动事件与依赖注入¶
asyncio.to_thread 的作用:把 CPU 密集或阻塞操作放到线程池中执行,避免阻塞事件循环。BGE-M3 模型加载和 Milvus 连接预热都是阻塞操作,但它们只在启动时执行一次,所以放到线程池中是最合适的做法。
第三部分:WebSocket 协议¶
3.1 为什么需要 WebSocket¶
HTTP 协议是"请求-响应"模式:客户端发一个请求,服务端返回一个响应,通信就结束了。
RAG 系统的答案生成有个特点:LLM 是一个 token 一个 token 生成的。如果等完整答案生成完再返回,用户可能等 5-10 秒才能看到任何内容。
WebSocket 是全双工通信协议:建立连接后,服务端可以持续向客户端推送消息,不需要客户端反复请求。
3.2 HTTP vs WebSocket 对比¶
3.3 本项目中的 WebSocket 实现¶
3.4 流式事件协议¶
本项目定义了一套事件协议,主流程通过 Generator 产出不同事件:
sequenceDiagram
participant U as 浏览器
participant WS as WebSocket流
participant S as QAService
participant M as Milvus
participant L as LLM
U->>WS: 发送 query + session_id
WS->>S: stream_query
S-->>WS: type: start
WS-->>U: 创建答案区域
S-->>WS: type: status 识别意图
WS-->>U: 更新进度提示
S-->>WS: type: status 检索FAQ
S->>M: FAQ Hybrid Search
S-->>WS: type: status 生成回答
S->>L: stream prompt
loop LLM 逐个 token 生成
L-->>S: token chunk
S-->>WS: type: token 内容
WS-->>U: 追加字符
end
S-->>WS: type: end 含sources等
WS-->>U: 显示来源引用
{"type": "start", "session_id": "..."}
# ↓ 告知前端:请求已接收,准备展示答案区域
{"type": "status", "message": "正在识别问题意图..."} # ↓ 告知前端:当前进行到哪一步了
{"type": "token", "content": "入职"} {"type": "token", "content": "流程"} {"type": "token", "content": "包括"} # ↓ 逐字推送,前端实时渲染
{"type": "end", "sources": [...], "intent": {...}, "retrieval": {...}} # ↓ 告知前端:回答完毕,附带来源引用和诊断信息
原因:
1. LangChain 的 Milvus 检索和 ChatOpenAI 流式调用的底层是同步的
2. asyncio.to_thread 将整个同步流程放到线程池,不阻塞事件循环
3. 保持业务代码简洁,不需要在每一层都写 async/await
第四部分:本项目 API 层详解¶
4.1 路由拆分架构¶
4.2 在线问答唯一入口:WebSocket /api/stream¶
前端逻辑:
设计意图: - 在线问答只走一套连接、限流、事件协议和历史写入逻辑 - 问候、越界、转人工等直答在 Pipeline 意图识别阶段通过 WebSocket 事件返回 - FAQ 命中、文档检索、LLM 生成也沿用同一条事件链路,避免 HTTP 与 WebSocket 两套实现产生不一致
4.3 管理接口认证¶
前端状态页面提供令牌输入框,后端从 HTTP Header X-Admin-Token 中读取。命令行脚本默认从运行时配置读取令牌:本机调试来自 .env,Docker Compose 模式来自容器环境变量,避免把真实令牌写入终端历史。
4.4 限流保护¶
下面的滑动窗口示意图直观展示 check_rate_limit 的工作过程(limit=3,窗口=60 秒):
gantt
title 滑动窗口限流示例(limit=3,窗口=60秒)
dateFormat HH:mm:ss
axisFormat %M:%S
section 窗口位置
窗口① (t=0~60) :crit, 00:00:00, 00:01:00
窗口② (t=10~70) :crit, 00:00:10, 00:01:00
section 请求到达
A @5s 接受 :milestone, 00:00:05, 0
B @12s 接受 :milestone, 00:00:12, 0
C @35s 接受 :milestone, 00:00:35, 0
D @50s 拒绝 :milestone, 00:00:50, 0
E @70s 接受 :milestone, 00:01:10, 0
F @80s 接受 :milestone, 00:01:20, 0
横向为时间轴,窗口①和窗口②展示滑动前后的两个位置——窗口②比①向右滑动 10 秒。图中 A~F 依次到达,D 到达时 deque 中已有 3 个时间戳(已达上限),因此被拒绝;t=70 时旧请求 A 过期弹出,释放空间后 E 得以加入。
| 时间 | 请求 | 操作 | deque 状态(左→右) | 窗口计数 | 结果 |
|---|---|---|---|---|---|
| t=5 | A | 追加 | [5] | 1 | 接受 |
| t=12 | B | 追加 | [5, 12] | 2 | 接受 |
| t=35 | C | 追加 | [5, 12, 35] | 3 | 接受 |
| t=50 | D | 不追加(已达上限 3) | [5, 12, 35] | 3 | 拒绝 |
| t=70 | E | 弹出 5 → 追加 70 | [12, 35, 70] | 3 | 接受 |
| t=80 | F | 弹出 12 → 追加 80 | [35, 70, 80] | 3 | 接受 |
关键:while bucket and now - bucket[0] >= 60 循环从 deque 左端弹出超过 60 秒的旧时间戳,新请求追加到右端。达到上限时请求被拒绝,其时间戳 不会 加入 deque,避免恶意请求撑爆窗口。
本讲实践闭环¶
| 项目 | 内容 |
|---|---|
| 本讲类型 | 系统集成 |
| 实践产物 | FastAPI 路由、WebSocket、静态页面挂载和异步桥接 |
| 是否进入最终项目 | 是 |
| 验收方式 | /health、聊天接口、WebSocket 和前端页面均可访问 |
| 后续落点 | 第 13 讲加入启动前置校验,第 19 讲用于生产部署 |
通过标准:API 层只负责协议转换和连接管理,核心 RAG 逻辑仍在 service/pipeline 层。
本讲从 0 到 1 实现闭环¶
这一讲把项目从“Python 模块能调用”变成“浏览器和接口能访问”。实现时按四层推进:
flowchart TD
Browser["浏览器页面"] --> WS["WS /api/stream<br/>在线问答唯一入口"]
Browser --> History["HTTP 历史/反馈/场景接口"]
DebugPage["诊断脚本/状态页"] --> Debug["POST /api/retrieval/debug<br/>检索诊断"]
WS --> Service["QAService.stream_query()"]
Debug --> DebugService["QAService.debug_retrieval()"]
Service --> Thread["asyncio.to_thread<br/>推进阻塞型 Generator"]
Thread --> Events["逐个发送 Pipeline 事件"]
DebugService --> Json["返回意图/计划/命中 JSON"]
- 在
app.py创建 FastAPI 应用,只做路由注册、中间件、静态资源和生命周期。 - 在
qa_core/api/chat.py定义 WebSocket stream 接口,承载所有在线问答。 - 在同一个路由模块里保留历史、反馈和检索诊断等真实 HTTP 接口。
- 对阻塞型 RAG generator 使用线程桥接,避免卡住异步事件循环。
实现完成后,相关代码结构应该是下面这张图:
flowchart LR
App["app.py<br/>创建 FastAPI<br/>中间件/静态资源/路由"] --> Chat["qa_core/api/chat.py<br/>WebSocket stream<br/>history / feedback / debug"]
App --> Admin["qa_core/api/admin.py<br/>状态/报告/管理接口"]
App --> Pages["qa_core/api/pages.py<br/>页面路由"]
Chat --> Service["qa_core/application/service.py<br/>QAService"]
Static["static/<br/>浏览器页面"] --> Chat
Test["tests/test_api_protection.py<br/>令牌/限流/异常"] -. 验证 .-> Chat
来源:真实代码调用点,见 app.py。
WebSocket 接口的职责很薄:保持连接、接收请求、逐个发送 service 事件。它不拼 Prompt、不检索、不重排。
来源:真实代码逻辑压缩版,对应 qa_core/api/chat.py::websocket_endpoint() 和 _send_stream_events()。
涉及限流、管理令牌、CORS 这类保护逻辑时,要写成独立函数或依赖,避免散落在每个接口里。
来源:真实代码调用点,见 qa_core/api/chat.py 和 tests/test_api_protection.py。
闭环验证重点:
| 验证项 | 验证方式 | 期望结果 |
|---|---|---|
| 健康检查 | 请求 /health |
返回服务健康状态 |
| WebSocket | 连接 /api/stream |
能收到流式事件 |
| 检索诊断 | 请求 /api/retrieval/debug |
返回意图、计划、FAQ/Doc 命中 |
| 静态页面 | 浏览器访问首页 | 页面可加载并能发起请求 |
| API 保护 | 跑保护测试 | 管理令牌和限流生效 |
验收重点:异步框架负责并发连接,阻塞型 RAG 工作通过线程桥接;API 层不承担业务算法。
重点掌握¶
| 优先级 | 内容 | 原因 |
|---|---|---|
| ★★★ 必会 | 同步 vs 异步模型:异步在 I/O 等待时让出 CPU 处理其他请求 | RAG 系统(I/O 密集型)选择 FastAPI 的根本原因 |
| ★★★ 必会 | async/await 核心语法和与同步代码的区别 | 阅读和理解本项目所有 API 层代码的前置条件 |
| ★★★ 必会 | WebSocket 协议:全双工通信,服务端主动推送,逐 token 流式输出 | RAG 流式问答体验的底层技术 |
| ★★★ 必会 | 本项目 WebSocket 事件协议:start / status / token / end / error 五种事件 | 前后端协作的核心契约,理解 QAService Generator 的前提 |
| ★★ 理解 | FastAPI 路由拆分:APIRouter 实现模块化(pages/chat/admin/kb_versions) | 理解本项目 API 层的组织方式 |
| ★★ 理解 | 在线问答唯一入口:浏览器直接连接 WebSocket /api/stream |
避免多套在线入口造成状态、限流和回答口径不一致 |
| ★★ 理解 | 滑动窗口限流(check_rate_limit 的 deque 实现) | 生产环境必备的保护机制 |
| ★ 了解 | CORS 中间件配置 | 开发调试需要 |
| ★ 了解 | Pydantic 请求校验、依赖注入 | FastAPI 基础功能,回顾即可 |
本讲小结¶
- 异步(async/await) 让服务器在等待 I/O 时处理其他请求,适合 RAG 这种 I/O 密集型场景
- FastAPI 提供原生异步支持、自动 Pydantic 校验、WebSocket 和模块化路由
- WebSocket 支持服务端主动推送,让 LLM 的流式输出能逐 token 展示
- 本项目 API 层按职责拆分为 pages、chat、admin、kb_versions 四个路由模块
- 在线问答统一由 WebSocket(
/api/stream)承载;HTTP 只保留健康检查、历史、反馈、检索诊断和管理接口 asyncio.to_thread将同步业务逻辑放到线程池,保持事件循环不受阻塞
下一讲:应用入口与环境前置校验 — app.py 详解、启动校验链、检索栈预热