第10讲:RAG Pipeline 主流程深度解析
上一讲:QAService 核心编排
下一讲:Prompt 工程与 Profile 系统
本讲目标
- 理解 RAG Pipeline 的 8 个 Stage(0-7)事件生成模型
- 掌握 FAQ 快速路径和 FAQ 标准直出的区别
- 理解上下文构建中的筛选、去重、截断策略
- 理解答案引用增强的实现
本讲边界
本讲关注一次在线问答的完整主流程:问题如何进入 Pipeline、如何检索、如何构建上下文、如何流式返回。Prompt 选择和模板细节在本讲只作为“生成阶段的一部分”理解,完整展开放到第 11 讲。
本讲地图
本图对应本讲功能闭环,展示从输入到本讲交付物的主干路径。节点与主项目代码文件和函数保持一致,后续章节消费的能力只作为交付边界出现。
图 1:第 10 讲功能闭环地图
flowchart TD
C10_STREAM["主流程<br/>stream_query()"]
C10_CTX["请求上下文<br/>create_query_context()"]
C10_EVENT_START["开始事件<br/>start_event()"]
C10_ROUTE["查询路由<br/>decide_route()"]
C10_PREP["检索准备<br/>prepare_retrieval()"]
C10_FAQ["FAQ 检索<br/>search_faq() / get_faq_direct_answer()"]
C10_DOC["文档检索<br/>search_doc()"]
C10_CONTEXT["上下文构建<br/>build_context()"]
C10_ANSWER{{"生成与结束<br/>_finish_with_single_answer()"}}
C10_STREAM --> C10_CTX
C10_CTX --> C10_EVENT_START
C10_EVENT_START --> C10_ROUTE
C10_ROUTE -->|"retrieval"| C10_PREP
C10_PREP --> C10_FAQ
C10_FAQ -->|"高置信直出"| C10_ANSWER
C10_FAQ -->|"继续检索"| C10_DOC
C10_DOC --> C10_CONTEXT
C10_CONTEXT --> C10_ANSWER
style C10_STREAM fill:#F8FAFC,stroke:#64748B,stroke-width:2px
style C10_CTX fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
style C10_EVENT_START fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
style C10_ROUTE fill:#FEF3C7,stroke:#D97706,stroke-width:2px
style C10_PREP fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
style C10_FAQ fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
style C10_DOC fill:#FEF3C7,stroke:#D97706,stroke-width:2px
style C10_CONTEXT fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
style C10_ANSWER fill:#DCFCE7,stroke:#16A34A,stroke-width:2px
节点与代码对齐
| 节点 |
对齐文件 |
函数/对象 |
本章职责 |
| 主流程 |
qa_core/pipeline/rag.py |
stream_query() |
串联 route、prepare、FAQ、doc、answer、end。 |
| 请求上下文 |
qa_core/pipeline/runtime.py |
create_query_context() |
创建 scenario、DataScope、session_id、trace_id 和 kb_version。 |
| 开始事件 |
qa_core/pipeline/runtime.py |
start_event() |
向前端发送 start 事件。 |
| 查询路由 |
qa_core/pipeline/steps.py |
decide_route() |
处理 direct_answer、faq_exact 或进入 retrieval。 |
| 检索准备 |
qa_core/pipeline/steps.py |
prepare_retrieval() |
生成 RetrievalPreparation。 |
| FAQ 检索 |
qa_core/pipeline/retrieval_steps.py |
search_faq() / get_faq_direct_answer() |
FAQ 高置信可提前结束。 |
| 文档检索 |
qa_core/pipeline/retrieval_steps.py |
search_doc() |
FAQ 未直出时继续检索文档。 |
| 上下文构建 |
qa_core/pipeline/context.py |
build_context() |
筛选证据并填入 PromptProfile.user_template。 |
| 生成与结束 |
qa_core/pipeline/rag.py |
_finish_with_single_answer() |
发送 token、保存历史并 finish_success。 |
第一部分:前置知识 — Pipeline 设计模式
1.1 Pipeline vs Chain
Chain(链):固定的步骤序列,A → B → C → D,没有分支。
Pipeline(管道):有分支、有快慢路径的流程。每一步可以提前结束(如 FAQ 命中时跳过文档检索),也可以根据上一步的结果调整下一步的参数。
本项目的 RAG 流程是 Pipeline 而非 Chain:
| 用户问题
→ 查询路由(direct_answer / faq_exact / retrieval)
→ 检索准备(历史 / 意图 / source / 按需改写 / 计划 / 变体)
→ 按 RetrievalPlan 执行 FAQ 检索(高置信标准直出可结束)
→ 按 RetrievalPlan 执行文档检索 → 上下文构建 → LLM 生成
|
1.2 Pipeline 的模块化拆分
项目将 Pipeline 拆分为多个职责单一的文件:
| qa_core/pipeline/
├── rag.py # 主流程编排(stream_query, debug_retrieval)
├── runtime.py # 请求上下文(RAGQueryContext)和事件工具函数
├── steps.py # 查询路由、检索准备、Prompt 准备
├── retrieval_steps.py # FAQ / 文档检索执行
├── context.py # 上下文构建(筛选、去重、格式化)
├── rewrite.py # 查询改写
├── query_variants.py # 查询变体生成
├── events.py # 事件构造(start/status/token/end/error)
└── citations.py # 答案引用增强
|
第二部分:8 个 Stage 主流程
Stage 0-7 可视化总览
flowchart TD
Start(["🚀 stream_query() 开始"]) --> Stage0
Stage0["🏗️ Stage 0:创建上下文<br/>场景/数据域/会话/trace/KB版本"] --> Stage1
Stage1["🧭 Stage 1:查询路由<br/>decide_route()"] --> RouteCheck{"RouteDecision.route?"}
RouteCheck -->|"direct_answer"| End1["📝 返回直接答案<br/>问候/越界/转人工/边界"]
RouteCheck -->|"faq_exact"| End2Fast["🎯 返回 FAQ 标准答案<br/>intent=FAQ_QUERY"]
RouteCheck -->|"retrieval"| Stage2
Stage2["🎯 Stage 2:检索准备<br/>历史/意图/source/按需改写/计划/变体"] --> Stage2Check{"兜底 direct_answer?"}
Stage2Check -->|"✅"| End2Direct["📝 返回直接答案"]
Stage2Check -->|"❌"| Stage3
Stage3["🔍 Stage 3:FAQ 检索<br/>plan.run_faq=True 时执行"] --> Stage3Check{"FAQ 高置信直出?<br/>精确匹配 或 分数>阈值"}
Stage3Check -->|"✅"| End3["📋 返回标准答案<br/>hit_type: faq_direct"]
Stage3Check -->|"❌"| Stage4
Stage4["📚 Stage 4:文档检索<br/>plan.run_doc=True 时执行<br/>Dense + Sparse Hybrid / Rerank"] --> Stage5
Stage5["📊 Stage 5:上下文构建<br/>筛选去重截断 / 组织引用来源"] --> Stage5Check{"召回结果是否不足?"}
Stage5Check -->|"信息不足"| End5["⚠️ 信息不足提示<br/>引导联系人工"]
Stage5Check -->|"✅ 有资料"| Stage6
Stage6["🤖 Stage 6:LLM 流式生成 + 引用增强<br/>逐 token 推送/补充来源标注"] --> Stage7
Stage7["💾 Stage 7:保存历史<br/>写入 Trace"] --> Final(["✅ end 事件<br/>返回 sources/intent/retrieval"])
style Stage1 fill:#FEF3C7,stroke:#D97706,stroke-width:2px
style Stage2 fill:#EFF6FF,stroke:#2563EB,stroke-width:2px
style Stage3 fill:#ECFDF5,stroke:#059669,stroke-width:2px
style Stage4 fill:#FFFBEB,stroke:#D97706,stroke-width:2px
style Stage5 fill:#FFFBEB,stroke:#D97706,stroke-width:2px
style Stage6 fill:#FEF2F2,stroke:#DC2626,stroke-width:2px
style Final fill:#ECFDF5,stroke:#059669,stroke-width:3px
2.1 完整流程代码(简化)
| # qa_core/pipeline/rag.py
def stream_query(history, validate_source, query, source_filter,
session_id, ...):
# 创建请求上下文
context = create_query_context(...)
yield build_query_start_event(context) # start 事件
try:
# === Stage 1: 查询路由 ===
yield build_status_event("正在进行查询路由...", context.session_id)
route = decide_route(context)
if route.answer:
yield from _finish_with_single_answer(context, history, query, route.answer)
return
# === Stage 2: 检索准备 ===
yield build_status_event("正在识别问题意图...", context.session_id)
prepared = prepare_retrieval(context)
# === 兜底:非 RAG 类回答(通常已在 Stage 1 返回)===
if prepared.intent.direct_answer:
yield from _finish_with_single_answer(context, history, query, prepared.intent.direct_answer)
return
# === Stage 3-6: FAQ 检索 → 文档检索 → 上下文构建 → LLM 生成 ===
helper_result = yield from _search_and_generate(context, prepared, query, history)
if helper_result is None:
return # 已在内部收尾(FAQ 直出或信息不足)
# === 引用补强 ===
answer = enforce_answer_citations(context.answer, helper_result.context_docs)
# === Stage 7: 保存历史 + 写入 Trace + 结束事件 ===
history.add_turn(context.session_id, query, answer)
yield finish_success(context, answer=answer)
except Exception as exc:
yield finish_error(context, exc)
def _search_and_generate(context, prepared, query, history):
"""检索-生成核心链路:FAQ 检索 → 文档检索 → 上下文构建 → LLM 流式生成。
提取为独立函数使 stream_query 主干更清晰,便于单步调试和异常定位。
"""
# Stage 3: FAQ 检索 + 直出判断
yield build_status_event("正在检索业务 FAQ 知识库...", context.session_id)
faq_result = search_faq(context, prepared)
direct_answer = get_faq_direct_answer(context, prepared, faq_result)
if direct_answer:
yield from _finish_with_single_answer(context, history, query, direct_answer)
return None
# Stage 4: 文档检索
yield build_status_event("正在匹配相关业务资料...", context.session_id)
doc_result = search_doc(context, prepared)
# Stage 5: 上下文构建
answer_prepared = prepare_answer(context, prepared, faq_result, doc_result)
context.sources = answer_prepared.sources
context.hit_type = answer_prepared.hit_type
if context.hit_type == "insufficient_context":
answer = build_insufficient_context_answer(context)
yield from _finish_with_single_answer(context, history, query, answer)
return None
# Stage 6: LLM 流式生成
yield build_status_event("正在生成回答...", context.session_id)
for chunk in stream_llm_answer(answer_prepared.system_prompt, answer_prepared.user_prompt):
token = str(getattr(chunk, "content", "") or "")
if not token:
continue
yield build_token_event(token, context.session_id)
return answer_prepared
|
2.2 Stage 1:查询路由
这是在线问答进入检索准备之前的低成本路由层。它统一处理三类结果:
| route |
intent |
含义 |
direct_answer |
GREETING / HUMAN_SERVICE / OUT_OF_SCOPE |
问候、转人工、越界、source 边界,直接返回 |
faq_exact |
FAQ_QUERY |
FAQ 标准问题精确命中,直接返回标准答案 |
retrieval |
暂不确定 |
路由不了,进入检索准备 |
这里的关键点是:intent 描述用户想做什么,route 描述系统下一步怎么处理。FAQ 精确命中不是新的用户意图,而是 route=faq_exact,同时携带 intent=FAQ_QUERY。
| @dataclass
class RouteDecision:
route: Literal["direct_answer", "faq_exact", "retrieval"]
answer: str | None = None
intent: IntentResult | None = None
reason: str = ""
def decide_route(context):
# 1. 先校验 source_filter
context.run_stage("validate_source", ...)
# 2. 协议/安全类直答:问候、越界、短句转人工
direct_intent = classify_direct_intent(context.query, context.scenario)
if direct_intent and direct_intent.direct_answer:
return RouteDecision("direct_answer", direct_intent.direct_answer, direct_intent)
# 3. source 边界
boundary_answer = detect_and_apply_boundary_answer(context)
if boundary_answer:
return RouteDecision("direct_answer", boundary_answer, out_of_scope_intent)
# 4. FAQ 精确命中:route 是 faq_exact,intent 仍是 FAQ_QUERY
if should_try_faq_fast_path(context.query, context.scenario):
answer, intent = try_fast_faq_direct_answer(context)
if answer:
return RouteDecision("faq_exact", answer, intent)
# 5. 路由不了,再进入检索准备
return RouteDecision("retrieval")
|
这也是你截图里最应该调整的地方:你好、转人工、彩票怎么买 这类问题不应该先进入 FAQ 快速路径,而应该在这一阶段直接收口。
2.3 FAQ 精确命中为什么放在路由层
FAQ 精确命中依赖知识库内容、版本、tenant、source_filter 和标准问题文本,它不是“用户意图类型”。所以更准确的表达是:查询路由层可以产出 route=faq_exact,并把 intent 标记为 FAQ_QUERY。
| # qa_core/pipeline/steps.py
from qa_core.config.rules import get_rule_config
def should_try_faq_fast_path(query, scenario):
"""判断是否值得先尝试 FAQ 精确直出。
快速路径只处理"短、完整、像标准问答"的问题。
不是语义缓存,也不是跳过检索:仍然访问当前场景的 FAQ Milvus 集合,
并带上 kb_version、tenant、dataset、visibility 和 role 过滤。
"""
rules = get_rule_config().faq_fast_path
compact_query = (query or "").strip()
if (
not compact_query
or len(compact_query) > rules.max_chars
or "\n" in compact_query
):
return False # 长问题、多行问题不适合快速路径
return bool(
rules.hint_matches(compact_query)
or infer_source(compact_query, scenario)
)
def try_fast_faq_direct_answer(context):
"""路由层的 FAQ 精确试探:只允许精确匹配,不允许相似直出。"""
faq_store = get_faq_store(context.scenario.faq_collection)
result = faq_store.search_many(
[context.query],
k=min(plan.faq_top_k, 12),
source_filter=effective_source_filter,
kb_version=context.active_kb_version,
valid_sources=context.scenario.valid_sources,
data_scope=context.data_scope,
source_type="faq",
rerank=False,
)
# 只允许精确匹配,分数阈值设为无穷大
answer, _ = _exact_faq_answer(context.query, result)
return answer, intent # 不是精确匹配就返回 (None, FAQ_QUERY intent),继续主流程
|
FAQ 快路径的触发词和最大长度来自 config/rules.toml 中的 faq_fast_path 配置,不写死在代码里。max_chars = 48 是本项目的初始保护阈值,不是官方标准。它的作用是把 FAQ 快路径限制在“一句话标准问法”上:短问题先试精确命中;长问题、多行问题、带多个条件的问题交给后面的完整检索链路处理。
这里还有一个边界要注意:FAQ 快路径只做“是否精确命中标准 FAQ”的判断,不选择 Prompt Profile。因为精确命中会直接返回标准答案,不调用 LLM,也不需要构造回答 Prompt。Prompt Profile 的选择发生在后面的 prepare_retrieval() / prepare_answer(),也就是完整检索和生成路径里。
_exact_faq_answer() — 精确匹配实现
| def _exact_faq_answer(query: str, faq_result: RetrievalResult) -> tuple[str | None, RetrievalResult]:
"""从 FAQ 候选中找与标准问题完全一致的答案。
快速路径只允许精确标准问答直出,不按相似分数直出。
找到精确命中后会把该命中排到来源列表第一位,方便页面展示。
"""
for index, hit in enumerate(faq_result.hits):
answer = direct_faq_answer(query, hit.document, hit.score, threshold=float("inf"))
if not answer:
continue
if index:
reordered = [hit, *faq_result.hits[:index], *faq_result.hits[index + 1 :]]
faq_result = RetrievalResult(
hits=reordered,
query=faq_result.query,
source_type=faq_result.source_type,
elapsed_ms=faq_result.elapsed_ms,
)
return answer, faq_result
return None, faq_result
|
为什么在检索准备之前做:
- 减少首 token 延迟。标准 FAQ 的精确命中不需要经过历史加载、检索类意图识别、改写、检索计划等步骤。
- FAQ 快速路径仍然访问 Milvus(带版本和数据隔离过滤),不是本地缓存。
- 它在同一个 decide_route() 中排在 direct_answer 之后,避免问候、转人工、越界问题先触发知识库查询。
为什么只允许精确匹配:
- 还没做意图识别,不知道这是 FAQ_QUERY 还是 KNOWLEDGE_QUERY
- 如果是知识咨询但 FAQ 相似分数高,可能误答。所以只允许用户问题和 FAQ 标准问题完全一致时才直出。
2.4 FAQ 标准直出 vs FAQ 精确路由
这是两个容易混淆的概念:
|
FAQ 精确路由(Stage 1) |
FAQ 标准直出(Stage 3) |
| 时机 |
decide_route() 中,检索准备之前 |
检索准备之后 |
| route / intent |
route=faq_exact,intent=FAQ_QUERY |
route=retrieval 后识别出 intent=FAQ_QUERY |
| 匹配方式 |
仅精确匹配 |
精确匹配 + 相似分数阈值 |
| 阈值 |
∞(只精确) |
动态(0.62~0.86) |
| 适用 |
短标准问答 |
已确认 FAQ_QUERY 意图 |
| 风险 |
低(只精确) |
中(相似分数可能误命中) |
第三部分:上下文构建
3.1 select_context_docs() 的筛选策略
| # qa_core/pipeline/context.py
def select_context_docs(faq_hits: list, doc_hits: list, plan: RetrievalPlan) -> list[Document]:
"""筛选进入 Prompt 的文档片段(只依赖 plan 对象,不需要 scenario 参数)。
执行流程:
1. FAQ 命中:过滤分数 → 取前 2 条 → 转成"常见问题 + 标准答案"格式
2. 文档命中:过滤分数 → prefer_table 时表格行优先 → 优先用 parent_content
3. 每条追加受 final_context_top_n / max_context_chars / max_context_doc_chars 三重约束
"""
selected = []
seen_keys = set()
used_chars = 0
# ── FAQ 部分:过滤 min_context_score → 取前 2 条 → 转成标准问答格式 ──
for hit in [h for h in faq_hits if h.score >= plan.min_context_score][:2]:
answer = hit.document.metadata.get("answer")
question = hit.document.metadata.get("standard_question") or hit.document.page_content
if answer:
_append_with_budget(
Document(page_content=f"常见问题:{question}\n标准答案:{answer}"),
f"faq:{document_key(hit.document)}",
selected, seen_keys, used_chars, plan)
# ── 文档部分:过滤分数 → prefer_table 排序 → 优先用 parent_content ──
eligible = [h for h in doc_hits if h.score >= plan.min_context_score]
if plan.prefer_table:
# 表格行(content_type 以 table 开头)排到普通正文前面
eligible = sorted(eligible,
key=lambda h: (0 if is_table_document(h.document) else 1, -h.score))
for hit in eligible:
parent_content = hit.document.metadata.get("parent_content")
key = str(hit.document.metadata.get("parent_id") or document_key(hit.document))
_append_with_budget(
Document(page_content=str(parent_content or hit.document.page_content)),
f"doc:{key}",
selected, seen_keys, used_chars, plan)
return selected
|
3.2 build_context() 的格式化输出
| def build_context(docs: list[Document]) -> str:
"""构建最终上下文文本(只依赖 doc.metadata,不依赖 scenario 对象)。"""
lines = []
seen: set[str] = set()
for i, doc in enumerate(docs):
content = doc.page_content.strip()
if not content or content in seen:
continue # 内容去重
seen.add(content)
source = _context_source_label(doc.metadata or {})
# 格式:[编号] 来源:文件名 或 标准问题名 或 表格 sheet+行号
header = f"[{i+1}] 来源:{source}"
lines.append(f"{header}\n{content}")
return "\n\n".join(lines)
|
输出示例:
| [1] 来源:人事制度 / 入职管理
入职流程包括以下步骤:1. 提交入职材料(身份证复印件、学历证书...)
[2] 来源:人事制度 / 审批权限
部门经理负责审批本部门员工的入职申请,审批时限为 3 个工作日...
[3] 来源:行政管理 / 工位分配
新员工入职后由行政部统一分配工位和办公设备...
|
第四部分:信息不足处理
4.1 什么情况判定为信息不足
prepare_answer 在 steps.py 中定义,其内部的信息不足判定委托给 _build_answer_context:
| # qa_core/pipeline/steps.py
def prepare_answer(
context: RAGQueryContext,
prepared: RetrievalPreparation,
faq_result: RetrievalResult,
doc_result: RetrievalResult,
) -> AnswerPreparation:
"""将 FAQ + 文档检索结果整理为 LLM Prompt、引用来源列表和命中类型。
信息不足判定委托给 _build_answer_context,prepare_answer 负责
组装最终的 system_prompt 和 user_prompt。
"""
context_docs, sources, hit_type, top_score = context.run_stage(
"build_answer_context",
lambda: _build_answer_context(prepared, faq_result, doc_result),
)
user_prompt = prepared.prompt_profile.user_template.format(
history=format_messages(prepared.history_messages),
question=prepared.rewritten_query,
context=build_context(context_docs)
or "无可用上下文。必须明确回答:信息不足,无法确认。",
)
return AnswerPreparation(
context_docs=context_docs,
sources=sources,
hit_type=hit_type,
system_prompt=prepared.prompt_profile.system_template,
user_prompt=user_prompt,
)
|
_build_answer_context 负责实际的上下文筛选和命中类型判定:
| def _build_answer_context(prepared, faq_result, doc_result):
"""整理上下文文档、引用来源列表、命中类型和最高分数。
无上下文通过分数过滤时命中类型标记为 insufficient_context。
"""
context_docs = select_context_docs(faq_result.hits, doc_result.hits, prepared.plan)
if prepared.plan.prefer_table:
sources = doc_result.source_payloads(limit=5) + faq_result.source_payloads(limit=2)
else:
sources = faq_result.source_payloads(limit=2) + doc_result.source_payloads(limit=5)
top_score = max(
[score for score in [faq_result.top_score, doc_result.top_score] if score is not None],
default=0.0,
)
return context_docs, sources, "rag" if context_docs else "insufficient_context", top_score
|
4.2 信息不足的答案
| def build_insufficient_context_answer(context: RAGQueryContext) -> str:
"""无可用上下文时返回确定性"信息不足"回答,避免 LLM 幻觉。"""
context.retrieval_info["insufficient_context_reason"] = "no_context_after_score_filter"
return f"信息不足,无法确认。当前知识库没有召回到足够可靠的依据,请联系{context.scenario.support_contact}。"
|
设计意图:信息不足时,系统明确告知用户(而不是让 LLM 即兴发挥),避免 LLM 在没有可靠资料的情况下生成"幻觉"答案。
第五部分:答案引用增强
5.1 什么是引用增强
LLM 生成的答案可能引用上下文中的信息,但不会自动标注"这个信息来自哪个文档"。引用增强在 LLM 生成完答案后,检查答案是否提到了上下文中的关键信息,如果提到了就补充来源标注。
| # qa_core/pipeline/citations.py
CITATION_RE = re.compile(r"\[\d+\]")
def has_source_citation(answer: str) -> bool:
"""判断答案中是否已经包含 [数字] 形式的来源编号。"""
return bool(CITATION_RE.search(answer or ""))
def source_reference_label(doc: Document, index: int) -> str:
"""生成简短来源标签(文件名/FAQ 标准问题;表格资料附加 sheet 和行号)。"""
from qa_core.document_metadata import format_source_label
return f"[{index}] {format_source_label(dict(doc.metadata or {}))}"
def enforce_table_row_details(answer: str, context_docs: list[Document]) -> str:
"""确保表格类答案在模型遗漏关键单元格时,确定性追加表格行要点。"""
details = []
for index, doc in enumerate(context_docs, start=1):
if not is_table_document(doc) or not needs_table_row_detail(answer, doc):
continue
detail = build_table_row_detail(doc, index)
if detail:
details.append(detail)
if len(details) >= 1:
break
if not details:
return answer
return f"{answer}\n\n" + "\n".join(details)
def enforce_answer_citations(answer: str, context_docs: list[Document]) -> str:
"""确保 RAG 答案带有可见来源编号:模型已写则保留,未写则末尾补充前 3 个来源。
额外检查表格类答案:模型遗漏核心单元格值(状态/金额/责任人等)时
确定性追加表格行要点,避免 LLM 忽略关键数据。
"""
clean_answer = (answer or "").strip()
if not clean_answer or not context_docs:
return clean_answer # 空答案或空来源时原样返回,不阻断流程
# 确保表格类答案不丢失关键单元格信息
clean_answer = enforce_table_row_details(clean_answer, context_docs)
# 答案已包含 [数字] 来源编号时不重复添加
if has_source_citation(clean_answer):
return clean_answer
# 为前 3 个上下文文档生成来源标签(文件名/FAQ 问题名/表格 sheet 和行号)
references = ";".join(
source_reference_label(doc, index)
for index, doc in enumerate(context_docs[:3], start=1)
)
return f"{clean_answer}\n\n参考来源:{references}"
|
第六部分:性能追踪
6.1 阶段计时
| # qa_core/pipeline/runtime.py
class RAGQueryContext:
"""RAG 请求的运行时状态(dataclass,字段名与讲义一致)。"""
started: float # 请求开始时间戳(time.perf_counter())
stage_timings_ms: dict[str, float] = {} # 各阶段耗时字典
first_token_ms: float | None = None # 首 token 到达时间(毫秒)
@contextmanager
def stage(self, name: str):
"""记录某个阶段的耗时。"""
started = time.perf_counter()
try:
yield
finally:
self.record_stage(name, started)
def record_stage(self, stage_name: str, started: float) -> float:
"""将阶段耗时写入 stage_timings_ms 字典。"""
elapsed_ms = (time.perf_counter() - started) * 1000
self.stage_timings_ms[stage_name] = round(elapsed_ms, 2)
return elapsed_ms
def mark_first_token(self):
"""记录首 token 时间(从请求 started 到首次推送 token 的毫秒数)。"""
if self.first_token_ms is None:
self.first_token_ms = round((time.perf_counter() - self.started) * 1000, 2)
|
追踪信息最终进入 end 事件:
| {
"type": "end",
"retrieval": {
"first_token_ms": 2478.7,
"stage_timings_ms": [
{"stage": "intent", "elapsed_ms": 320.5},
{"stage": "faq_search", "elapsed_ms": 450.2},
{"stage": "doc_search", "elapsed_ms": 680.1},
{"stage": "context_build", "elapsed_ms": 15.3},
{"stage": "llm_generation", "elapsed_ms": 3200.8},
{"stage": "save_history", "elapsed_ms": 45.2}
],
"slowest_stage": {"stage": "llm_generation", "elapsed_ms": 3200.8}
}
}
|
这些数据帮助性能优化:如果文档检索阶段总是很慢,可能需要调整 top_k 或索引参数;如果 LLM 生成阶段很慢,可能需要换更快的模型或调整 max_tokens。
第七部分:流式事件协议 — 前后端如何协作
7.1 事件驱动的问答模型
一次 RAG 问答不是"前端发请求 → 等 5 秒 → 收到完整答案"。实际的用户体验是:
| 前端发送问题 → 看到"正在进行查询路由..." →
看到"正在识别问题意图..." → 看到"正在检索 FAQ..." → 看到"正在匹配业务资料..." → 看到"正在生成回答..." → token 逐字出现 →
看到完整答案 + 来源引用
|
这就是事件驱动模型。后端通过 WebSocket 持续推送事件,前端根据事件类型更新 UI。
7.2 五种事件类型
sequenceDiagram
participant Browser as 浏览器
participant WS as /api/stream (WebSocket)
participant QASvc as QAService
participant Pipeline as RAG Pipeline
Browser->>WS: {"query": "入职流程有哪些步骤", ...}
WS->>QASvc: stream_query(...)
QASvc->>Pipeline: 创建生成器
Pipeline-->>WS: {"type": "start", "session_id": "...", "trace_id": "..."}
WS-->>Browser: 问答已开始,记录 session_id
Pipeline-->>WS: {"type": "status", "message": "正在进行查询路由..."}
WS-->>Browser: 更新状态提示
Pipeline-->>WS: {"type": "status", "message": "正在识别问题意图..."}
WS-->>Browser: 更新状态提示
Pipeline-->>WS: {"type": "status", "message": "正在检索业务 FAQ 知识库..."}
WS-->>Browser: 更新状态提示
Pipeline-->>WS: {"type": "status", "message": "正在匹配相关业务资料..."}
WS-->>Browser: 更新状态提示
Pipeline-->>WS: {"type": "status", "message": "正在生成回答..."}
WS-->>Browser: 更新状态提示,准备接收 token
loop LLM 流式生成
Pipeline-->>WS: {"type": "token", "token": "入"}
WS-->>Browser: 追加字符到答案区
Pipeline-->>WS: {"type": "token", "token": "职"}
WS-->>Browser: 追加字符到答案区
Pipeline-->>WS: {"type": "token", "token": "流"}
WS-->>Browser: 追加字符到答案区
end
Pipeline-->>WS: {"type": "end", "sources": [...], "hit_type": "rag", ...}
WS-->>Browser: 渲染来源引用、展示诊断信息
7.3 每种事件的字段结构
start 事件 — 请求已被接收:
| {
"type": "start",
"session_id": "abc123",
"trace_id": "xyz789",
"scenario_id": "enterprise_knowledge",
"scenario_name": "企业内部知识助手",
"data_scope": {"tenant_id": "default", "dataset_id": "default"},
"kb_version": "20260515_a1b2c3d4"
}
|
status 事件 — 阶段性进度通知:
| {
"type": "status",
"session_id": "abc123",
"message": "正在检索业务 FAQ 知识库..."
}
|
前端通常将 message 显示为一个动态更新的状态栏或加载提示。
token 事件 — 流式答案的片段:
| {
"type": "token",
"session_id": "abc123",
"token": "入"
}
|
每个 token 是一个或多个中文字符。前端将这些 token 逐个追加到答案区域,实现打字机效果。
end 事件 — 问答完成:
| {
"type": "end",
"session_id": "abc123",
"hit_type": "rag",
"answer": "入职流程包括以下步骤:1. 提交材料 ...",
"sources": [
{"file_name": "入职流程.md", "source": "hr", "score": 0.92},
{"file_name": "FAQ", "standard_question": "入职需要哪些材料", "score": 0.88}
],
"intent": {"intent": "KNOWLEDGE_QUERY", "confidence": 0.85},
"retrieval": {
"plan": {"faq_top_k": 20, "doc_top_k": 20, "rerank": true},
"query_variants": ["入职流程", "入职步骤", "入职办理流程"],
"faq_elapsed_ms": 45.2,
"doc_elapsed_ms": 120.5,
"stage_timings_ms": {...},
"first_token_ms": 350.8,
"total_elapsed_ms": 4520.3
},
"processing_time": 4.52,
"trace_id": "xyz789"
}
|
error 事件 — 异常恢复:
| {
"type": "error",
"session_id": "abc123",
"error": "LLM 服务暂时不可用,请稍后重试。",
"trace_id": "xyz789"
}
|
7.4 前端如何消费事件
| // static/js/chat.js(简化逻辑)
const ws = new WebSocket(`ws://${location.host}/api/stream`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case "start":
state.sessionId = data.session_id;
state.traceId = data.trace_id;
break;
case "status":
updateStatusBar(data.message); // "正在检索 FAQ..."
break;
case "token":
appendToAnswer(data.content); // 追加到答案区
break;
case "end":
renderSources(data.sources); // 渲染来源引用
renderDiagnostics(data.retrieval); // 展示检索诊断
updateStatusBar(""); // 清除状态栏
state.inProgress = false;
break;
case "error":
showError(data.error); // 显示错误提示
state.inProgress = false;
break;
}
};
|
7.5 后端如何推进生成器
关键问题:QAService.stream_query() 是同步生成器(它内部顺序执行意图识别、Milvus 检索、本地 rerank 和 LLM 流式调用),但 WebSocket 路由是异步函数。如果直接调用 next(iterator),事件循环会被阻塞。
解决方案:asyncio.to_thread 将同步生成器的推进放到独立线程:
| # qa_core/api/chat.py
stream = get_qa_service().stream_query(*context.service_args())
while True:
# 在线程中推进同步生成器,不阻塞事件循环
has_event, event = await asyncio.to_thread(_next_stream_event, stream)
if not has_event or event is None:
break
await websocket.send_json(event)
if event.get("type") in {"end", "error"}:
if event.get("type") == "end":
# 后台异步刷新历史摘要(不阻塞用户看到结果)
_schedule_summary_refresh(session_id)
break
|
flowchart TD
subgraph MainThread["主线程(事件循环)"]
WS["WebSocket 接收消息"]
Send["发送事件到浏览器"]
Schedule["调度后台摘要刷新"]
end
subgraph WorkerThread["工作线程"]
Gen["推进同步生成器<br/>next(iterator)"]
Intent["意图识别"]
Milvus["Milvus 检索"]
Rerank["本地重排"]
LLM["LLM 流式调用"]
end
WS -->|"asyncio.to_thread"| Gen
Gen --> Intent --> Milvus --> Rerank --> LLM
LLM -->|"yield token"| Gen
Gen -->|"返回事件"| Send
style MainThread fill:#EFF6FF,stroke:#3B82F6,stroke-width:2px
style WorkerThread fill:#ECFDF5,stroke:#059669,stroke-width:2px
为什么需要两个线程? 这是一个 Python 异步编程中很经典的"同步生成器 + 异步 WebSocket"阻抗匹配问题。
QAService.stream_query() 是一个同步生成器——它内部顺序执行意图识别、Milvus 检索(gRPC 阻塞调用)、本地 Rerank(CPU 密集计算)、LLM 流式调用(HTTP 阻塞读取)。如果把这段逻辑直接放在主线程的事件循环中调用 next(iterator),整个事件循环会在每次推进生成器时被阻塞,导致其他 WebSocket 连接、HTTP 请求全部卡住。
解决方案:asyncio.to_thread 作为桥梁。 主线程通过 asyncio.to_thread 把同步生成器的推进操作丢给线程池中的工作线程,自己立即返回并继续处理事件循环中的其他任务。工作线程推进完成后,结果通过 Future 传回主线程,主线程再 await websocket.send_json(event) 发给浏览器。
图中两条线程的分工:
| 职责 |
主线程(事件循环) |
工作线程 |
| WebSocket 收发 |
✅ 接收用户消息、发送事件 |
❌ |
| 意图识别 |
❌ |
✅ 同步调用 |
| Milvus 检索 |
❌ |
✅ gRPC 阻塞调用 |
| 本地 Rerank |
❌ |
✅ CPU 密集计算 |
| LLM 流式调用 |
❌ |
✅ HTTP 阻塞读取 |
| 后台摘要刷新 |
✅ 调度(不阻塞响应) |
❌ |
事件如何跨线程:工作线程每产出一个 token 或状态事件,生成器 yield 一次;主线程的 _next_stream_event(stream) 捕获这个值并通过 asyncio.to_thread 的返回值传回;主线程拿到事件后立即 send_json 给浏览器。这个过程对用户透明——浏览器看到的是连续的 status → token... → end 事件流。
为什么不在 LLM 流式阶段回到主线程? LLM 的 llm.stream() 本身返回一个迭代器,每次迭代都是阻塞的 HTTP 读取操作。如果回到主线程逐 token 读取,同样会阻塞事件循环。所以整个生成器——从意图识别到最后一个 token——全部留在工作线程中执行。
7.6 事件协议的设计原则
- 类型安全:每个事件都有
type 字段,前端用 switch 分派处理,不靠字段存在与否判断
- 诊断信息附带:
end 事件携带完整的 retrieval 诊断信息,前端可以用 JS 渲染到页面上,帮助用户理解"系统为什么这样回答"
- 错误不崩溃:异常转为
error 事件,不抛到 WebSocket 路由。用户看到错误提示后可以继续下一轮提问
- 历史写入在最后:
end 事件之后才写 MySQL 历史,确保历史记录的是完整答案(含引用增强后的来源)
本讲实践闭环
| 项目 |
内容 |
| 本讲类型 |
系统集成 |
| 实践产物 |
Stage 0-7 RAG Pipeline、上下文构建、引用增强、流式事件协议 |
| 是否进入最终项目 |
是 |
| 验收方式 |
发起一次知识问答,观察 start/status/token/end 与引用来源 |
| 后续落点 |
第 11 讲完善 Prompt Profile,第 19 讲通过 Trace 观察阶段耗时 |
通过标准:FAQ 快速路径、文档 RAG、信息不足、流式生成都能走到可解释的分支。
本讲从 0 到 1 实现闭环
本讲是在线问答的主干实现。实现完成后,相关代码结构应该是下面这张图:
flowchart LR
subgraph Pipeline["qa_core/pipeline"]
Rag["rag.py<br/>stream_query 主流程"]
Steps["steps.py<br/>阶段函数<br/>意图/检索/生成"]
RetrievalSteps["retrieval_steps.py<br/>FAQ/Doc 检索分支"]
Context["context.py<br/>运行上下文<br/>阶段计时"]
Citations["citations.py<br/>引用增强"]
Events["events.py<br/>start/status/token/end/error"]
end
subgraph Support["支撑模块"]
Intent["intent/classifier.py"]
Strategy["retrieval/strategy.py"]
Store["retrieval/store.py"]
Prompts["prompts/selector.py"]
History["memory/history.py"]
end
Rag --> Steps
Steps --> RetrievalSteps
Steps --> Context
Steps --> Citations
Rag --> Events
Intent --> Steps
Strategy --> Steps
Store --> RetrievalSteps
Prompts --> Steps
History --> Steps
Step 1:定义流式事件协议
目标:Pipeline 不直接返回字符串,而是持续产出浏览器能理解的事件。
来源:真实代码逻辑压缩版,对应 qa_core/pipeline/events.py、qa_core/pipeline/runtime.py 与 qa_core/pipeline/rag.py。
| context = create_query_context(...)
yield start_event(context)
yield status_event("正在进行查询路由...", context.session_id)
yield status_event("正在识别问题意图...", context.session_id)
yield status_event("正在检索业务 FAQ 知识库...", context.session_id)
yield status_event("正在匹配相关业务资料...", context.session_id)
yield status_event("正在生成回答...", context.session_id)
yield token_event(token, context.session_id)
yield finish_success(context, answer=answer)
|
设计解释:前端要实时展示进度,后端要暴露阶段诊断,所以事件比单个字符串更适合 RAG。
Step 2:串起意图、计划和检索
目标:把第 5-8 讲的模块接进同一条主流程。
来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.py 和 qa_core/pipeline/steps.py。
| yield status_event("正在进行查询路由...", context.session_id)
route = decide_route(context)
if route.answer:
yield from _finish_with_single_answer(context, history, query, route.answer)
return
yield status_event("正在识别问题意图...", context.session_id)
prepared = prepare_retrieval(context)
if prepared.intent.direct_answer:
if context.hit_type == "unknown":
context.hit_type = prepared.intent.intent.lower()
yield from _finish_with_single_answer(context, history, query, prepared.intent.direct_answer)
return
|
设计解释:Pipeline 不自己判断所有规则,而是调用前面章节实现的模块,保持职责清晰。
调试入口补充:debug_retrieval() 只用于检索诊断,故意从 prepare_retrieval() 开始跑半链路,方便观察检索类意图、source 推断、按需改写和 FAQ/Doc 召回;线上问答入口仍然是 stream_query(),并且一定先经过 decide_route()。
Step 3:实现 FAQ 快速路径和文档 RAG 分支
来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.py::_search_and_generate() 与 qa_core/pipeline/retrieval_steps.py。
| yield status_event("正在检索业务 FAQ 知识库...", context.session_id)
faq_result = search_faq(context, prepared)
direct_answer = get_faq_direct_answer(context, prepared, faq_result)
if direct_answer:
context.hit_type = "faq_direct"
context.sources = faq_result.source_payloads()
yield from _finish_with_single_answer(context, history, query, direct_answer)
return
yield status_event("正在匹配相关业务资料...", context.session_id)
doc_result = search_doc(context, prepared)
answer_prepared = prepare_answer(context, prepared, faq_result, doc_result)
context.sources = answer_prepared.sources
context.hit_type = answer_prepared.hit_type
if context.hit_type == "insufficient_context":
answer = build_insufficient_context_answer(context)
yield from _finish_with_single_answer(context, history, query, answer, record_save_stage=True)
return
|
设计解释:FAQ 高置信直出可以省掉 LLM;文档 RAG 需要足够上下文;没有上下文时必须明确拒绝幻觉。
Step 4:构建上下文并流式生成
来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.py、qa_core/pipeline/steps.py、qa_core/pipeline/context.py、qa_core/pipeline/citations.py。
| yield status_event("正在生成回答...", context.session_id)
with context.stage("llm_generation"):
for chunk in stream_llm_answer(answer_prepared.system_prompt, answer_prepared.user_prompt):
token = str(getattr(chunk, "content", "") or "")
if not token:
continue
context.answer_parts.append(token)
context.mark_first_token()
yield token_event(token, context.session_id)
raw_answer = context.answer
answer = enforce_answer_citations(raw_answer, answer_prepared.context_docs)
if not answer:
answer = f"信息不足,无法确认,请联系人工支持:{context.scenario.support_contact}。"
elif answer != raw_answer:
extra_token = answer[len(raw_answer):] if answer.startswith(raw_answer) else answer
yield token_event(extra_token, context.session_id)
context.answer_parts = [answer]
|
设计解释:LLM 只看到筛选后的上下文;引用增强在生成后执行,保证答案可追溯。
Step 5:保存历史并记录 Trace
来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.py::_finish_with_single_answer() 和 Stage 7。
| with context.stage("save_history"):
history.add_turn(context.session_id, query, answer)
yield finish_success(context, answer=answer)
# 对于 FAQ 直出、直接意图、信息不足这类已有完整答案的分支:
context.answer_parts = [answer]
context.mark_first_token()
yield token_event(answer, context.session_id)
history.add_turn(context.session_id, query, answer)
yield finish_success(context, answer=answer)
|
设计解释:历史要保存最终答案,而不是未补引用的中间答案;Trace 要记录命中路径、阶段耗时和检索分数。
Step 6:验收完整链路
验收方式:
来源:命令行验收,对应 scripts/api_e2e_smoke.py。
| python scripts/api_e2e_smoke.py
|
闭环验证重点:
| 验证项 |
输入场景 |
期望结果 |
| 事件协议 |
任意知识问答 |
能看到 start/status/token/end |
| FAQ 快速路径 |
高置信 FAQ |
可直出答案并带来源 |
| 文档 RAG |
长文档类问题 |
检索上下文后生成答案 |
| 信息不足 |
知识库无依据 |
返回 insufficient_context,不编造 |
| 引用增强 |
有上下文答案 |
答案包含来源引用 |
| 历史保存 |
连续对话 |
后续追问可读取历史 |
可复制的链路验证问题
以下问题基于默认业务场景 enterprise_knowledge。这些问题不是用来考察答案内容本身,而是用来观察一次请求在 Pipeline 中走到了哪个分支。页面右侧诊断信息中的 hit_type、FAQ 最高分、Doc 最高分、最慢阶段 可以辅助确认链路。
| 目标链路 |
可复制问题 |
预期路径 |
会不会调用 LLM |
观察重点 |
| 问候直答 |
你好 |
decide_route() → direct_answer,hit_type=greeting |
不会 |
只出现“正在进行查询路由...”,不检索 Milvus。 |
| 越界拦截 |
彩票怎么买才能中奖? |
decide_route() → direct_answer,hit_type=out_of_scope |
不会 |
用场景边界直接拒绝,避免把无关问题送入知识库。 |
| FAQ 直出 |
VPN 连不上应该怎么处理? |
FAQ 快速路径 / FAQ 高置信直出,hit_type=faq_direct |
不会 |
返回 FAQ 标准答案,有来源,但跳过文档检索和 LLM。 |
| 完整 RAG 生成 |
VPN 客户端版本、账号锁定、公网 IP 这些排查项分别应该怎么处理?请按步骤说明。 |
retrieval → 意图识别 → FAQ 检索 → 文档检索 → 上下文构建 → LLM 生成,hit_type=rag |
会 |
会看到“正在生成回答...”;FAQ 分数不足以直出,Doc 命中较高,最慢阶段通常是 llm_generation。 |
| 表格资料 RAG |
预算预审批里客户招待和紧急采购分别由谁预审批,补充说明是什么? |
表格资料召回 → 上下文构建 → LLM 生成,hit_type=rag |
会 |
重点观察来源中是否出现 budget_preapproval_matrix.xlsx 这类表格资料。 |
| 资料未直接覆盖 |
公司办公楼停车位申请流程是什么? |
可能进入 rag,由 LLM 基于已召回上下文说明“资料未覆盖”;若无有效上下文则进入 insufficient_context |
可能会 |
用来观察“不能编造”的约束:有上下文但不含答案时,回答应明确说明当前资料没有依据。 |
完整 RAG 生成的关键判断是:状态流中出现下面这一组阶段,并且最终 hit_type=rag:
| 正在进行查询路由...
正在识别问题意图...
正在检索业务 FAQ 知识库...
正在匹配相关业务资料...
正在生成回答...
|
如果只出现“正在进行查询路由...”就结束,通常说明命中了问候、越界、转人工或 FAQ 快速直出;这类分支是 Pipeline 的快速路径,不是完整 RAG 生成路径。
通过标准:
- 能看到
start/status/token/end 事件。
- FAQ 快速路径、文档 RAG、信息不足至少各有可解释分支。
- 最终答案带引用来源。
- 右侧诊断或 Trace 能看到命中路径、阶段耗时和 top score。
重点掌握
| 优先级 |
内容 |
原因 |
| ★★★ 必会 |
Stage 0-7 主流程:创建上下文 → 查询路由 → 检索准备 → FAQ 检索 → 文档检索 → 上下文构建 → LLM 流式生成/引用增强 → 保存历史 |
RAG Pipeline 的完整骨架 |
| ★★★ 必会 |
route=faq_exact vs intent=FAQ_QUERY vs FAQ 标准直出的区别 |
route 是系统处理方式,intent 是用户意图,二者不要混淆 |
| ★★★ 必会 |
上下文构建(select_context_docs)的筛选策略:FAQ 前 2 条 → 分数过滤 → 去重 → 表格行优先 → 三重预算约束(条数/单条长度/总长度) |
决定 LLM 看到的上下文质量 |
| ★★★ 必会 |
流式事件协议的五种事件类型(start/status/token/end/error)及前后端协作方式 |
理解浏览器如何实时展示 RAG 进度 |
| ★★ 理解 |
信息不足(insufficient_context)时明确告知用户,不让 LLM 即兴发挥 |
避免幻觉的重要安全机制 |
| ★★ 理解 |
引用增强(enforce_answer_citations):LLM 答案已有 [N] 则不重复,否则末尾补充前 3 个来源 |
保证答案可溯源 |
| ★★ 理解 |
阶段计时(RAGQueryContext.stage)追踪每个阶段耗时 |
性能优化的数据基础 |
| ★★ 理解 |
asyncio.to_thread 桥接同步 Generator 和异步 WebSocket |
Python 异步编程的关键模式 |
| ★ 了解 |
Pipeline 的模块化拆分:rag.py / runtime.py / steps.py / retrieval_steps.py / context.py / events.py / citations.py |
了解文件职责划分 |
本讲小结
- Pipeline > Chain:RAG 是有分支、有快慢路径的管道,不是固定步骤的链
- 查询路由统一处理 direct_answer、faq_exact、retrieval,避免拆成两套“意图识别”
- FAQ 精确命中是
route=faq_exact,不是新的 intent;它命中时仍携带 intent=FAQ_QUERY
- 上下文构建依次执行:FAQ 补充 → 分数过滤 → 去重 → 优先表格 → 截断 → 格式化
- 信息不足时明确告知用户,而不是让 LLM 在没有资料的情况下生成幻觉
- 引用增强在 LLM 生成的答案后补充来源标注
- 阶段计时追踪每个阶段的耗时,帮助定位性能瓶颈
下一讲:Prompt 工程与 Profile 系统 — 提示词模板设计、Profile 选择策略、高风险问题安全约束