跳转至

第10讲:RAG Pipeline 主流程深度解析

上一讲QAService 核心编排 下一讲Prompt 工程与 Profile 系统

本讲目标

  • 理解 RAG Pipeline 的 8 个 Stage(0-7)事件生成模型
  • 掌握 FAQ 快速路径和 FAQ 标准直出的区别
  • 理解上下文构建中的筛选、去重、截断策略
  • 理解答案引用增强的实现

本讲边界

本讲关注一次在线问答的完整主流程:问题如何进入 Pipeline、如何检索、如何构建上下文、如何流式返回。Prompt 选择和模板细节在本讲只作为“生成阶段的一部分”理解,完整展开放到第 11 讲。

本讲地图

本图对应本讲功能闭环,展示从输入到本讲交付物的主干路径。节点与主项目代码文件和函数保持一致,后续章节消费的能力只作为交付边界出现。

图 1:第 10 讲功能闭环地图

flowchart TD
    C10_STREAM["主流程<br/>stream_query()"]
    C10_CTX["请求上下文<br/>create_query_context()"]
    C10_EVENT_START["开始事件<br/>start_event()"]
    C10_ROUTE["查询路由<br/>decide_route()"]
    C10_PREP["检索准备<br/>prepare_retrieval()"]
    C10_FAQ["FAQ 检索<br/>search_faq() / get_faq_direct_answer()"]
    C10_DOC["文档检索<br/>search_doc()"]
    C10_CONTEXT["上下文构建<br/>build_context()"]
    C10_ANSWER{{"生成与结束<br/>_finish_with_single_answer()"}}
    C10_STREAM --> C10_CTX
    C10_CTX --> C10_EVENT_START
    C10_EVENT_START --> C10_ROUTE
    C10_ROUTE -->|"retrieval"| C10_PREP
    C10_PREP --> C10_FAQ
    C10_FAQ -->|"高置信直出"| C10_ANSWER
    C10_FAQ -->|"继续检索"| C10_DOC
    C10_DOC --> C10_CONTEXT
    C10_CONTEXT --> C10_ANSWER
    style C10_STREAM fill:#F8FAFC,stroke:#64748B,stroke-width:2px
    style C10_CTX fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C10_EVENT_START fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
    style C10_ROUTE fill:#FEF3C7,stroke:#D97706,stroke-width:2px
    style C10_PREP fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C10_FAQ fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
    style C10_DOC fill:#FEF3C7,stroke:#D97706,stroke-width:2px
    style C10_CONTEXT fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C10_ANSWER fill:#DCFCE7,stroke:#16A34A,stroke-width:2px

节点与代码对齐

节点 对齐文件 函数/对象 本章职责
主流程 qa_core/pipeline/rag.py stream_query() 串联 route、prepare、FAQ、doc、answer、end。
请求上下文 qa_core/pipeline/runtime.py create_query_context() 创建 scenario、DataScope、session_id、trace_id 和 kb_version。
开始事件 qa_core/pipeline/runtime.py start_event() 向前端发送 start 事件。
查询路由 qa_core/pipeline/steps.py decide_route() 处理 direct_answer、faq_exact 或进入 retrieval。
检索准备 qa_core/pipeline/steps.py prepare_retrieval() 生成 RetrievalPreparation。
FAQ 检索 qa_core/pipeline/retrieval_steps.py search_faq() / get_faq_direct_answer() FAQ 高置信可提前结束。
文档检索 qa_core/pipeline/retrieval_steps.py search_doc() FAQ 未直出时继续检索文档。
上下文构建 qa_core/pipeline/context.py build_context() 筛选证据并填入 PromptProfile.user_template。
生成与结束 qa_core/pipeline/rag.py _finish_with_single_answer() 发送 token、保存历史并 finish_success。

第一部分:前置知识 — Pipeline 设计模式

1.1 Pipeline vs Chain

Chain(链):固定的步骤序列,A → B → C → D,没有分支。

Pipeline(管道):有分支、有快慢路径的流程。每一步可以提前结束(如 FAQ 命中时跳过文档检索),也可以根据上一步的结果调整下一步的参数。

本项目的 RAG 流程是 Pipeline 而非 Chain:

1
2
3
4
5
用户问题
  → 查询路由(direct_answer / faq_exact / retrieval)
  → 检索准备(历史 / 意图 / source / 按需改写 / 计划 / 变体)
  → 按 RetrievalPlan 执行 FAQ 检索(高置信标准直出可结束)
  → 按 RetrievalPlan 执行文档检索 → 上下文构建 → LLM 生成

1.2 Pipeline 的模块化拆分

项目将 Pipeline 拆分为多个职责单一的文件:

qa_core/pipeline/
├── rag.py          # 主流程编排(stream_query, debug_retrieval)
├── runtime.py      # 请求上下文(RAGQueryContext)和事件工具函数
├── steps.py        # 查询路由、检索准备、Prompt 准备
├── retrieval_steps.py  # FAQ / 文档检索执行
├── context.py      # 上下文构建(筛选、去重、格式化)
├── rewrite.py      # 查询改写
├── query_variants.py  # 查询变体生成
├── events.py       # 事件构造(start/status/token/end/error)
└── citations.py    # 答案引用增强

第二部分:8 个 Stage 主流程

Stage 0-7 可视化总览

flowchart TD
    Start(["🚀 stream_query() 开始"]) --> Stage0

    Stage0["🏗️ Stage 0:创建上下文<br/>场景/数据域/会话/trace/KB版本"] --> Stage1

    Stage1["🧭 Stage 1:查询路由<br/>decide_route()"] --> RouteCheck{"RouteDecision.route?"}
    RouteCheck -->|"direct_answer"| End1["📝 返回直接答案<br/>问候/越界/转人工/边界"]
    RouteCheck -->|"faq_exact"| End2Fast["🎯 返回 FAQ 标准答案<br/>intent=FAQ_QUERY"]
    RouteCheck -->|"retrieval"| Stage2

    Stage2["🎯 Stage 2:检索准备<br/>历史/意图/source/按需改写/计划/变体"] --> Stage2Check{"兜底 direct_answer?"}
    Stage2Check -->|"✅"| End2Direct["📝 返回直接答案"]
    Stage2Check -->|"❌"| Stage3

    Stage3["🔍 Stage 3:FAQ 检索<br/>plan.run_faq=True 时执行"] --> Stage3Check{"FAQ 高置信直出?<br/>精确匹配 或 分数>阈值"}
    Stage3Check -->|"✅"| End3["📋 返回标准答案<br/>hit_type: faq_direct"]
    Stage3Check -->|"❌"| Stage4

    Stage4["📚 Stage 4:文档检索<br/>plan.run_doc=True 时执行<br/>Dense + Sparse Hybrid / Rerank"] --> Stage5
    Stage5["📊 Stage 5:上下文构建<br/>筛选去重截断 / 组织引用来源"] --> Stage5Check{"召回结果是否不足?"}
    Stage5Check -->|"信息不足"| End5["⚠️ 信息不足提示<br/>引导联系人工"]
    Stage5Check -->|"✅ 有资料"| Stage6

    Stage6["🤖 Stage 6:LLM 流式生成 + 引用增强<br/>逐 token 推送/补充来源标注"] --> Stage7

    Stage7["💾 Stage 7:保存历史<br/>写入 Trace"] --> Final(["✅ end 事件<br/>返回 sources/intent/retrieval"])

    style Stage1 fill:#FEF3C7,stroke:#D97706,stroke-width:2px
    style Stage2 fill:#EFF6FF,stroke:#2563EB,stroke-width:2px
    style Stage3 fill:#ECFDF5,stroke:#059669,stroke-width:2px
    style Stage4 fill:#FFFBEB,stroke:#D97706,stroke-width:2px
    style Stage5 fill:#FFFBEB,stroke:#D97706,stroke-width:2px
    style Stage6 fill:#FEF2F2,stroke:#DC2626,stroke-width:2px
    style Final fill:#ECFDF5,stroke:#059669,stroke-width:3px

2.1 完整流程代码(简化)

# qa_core/pipeline/rag.py
def stream_query(history, validate_source, query, source_filter,
                 session_id, ...):
    # 创建请求上下文
    context = create_query_context(...)
    yield build_query_start_event(context)  # start 事件

    try:
        # === Stage 1: 查询路由 ===
        yield build_status_event("正在进行查询路由...", context.session_id)
        route = decide_route(context)
        if route.answer:
            yield from _finish_with_single_answer(context, history, query, route.answer)
            return

        # === Stage 2: 检索准备 ===
        yield build_status_event("正在识别问题意图...", context.session_id)
        prepared = prepare_retrieval(context)

        # === 兜底:非 RAG 类回答(通常已在 Stage 1 返回)===
        if prepared.intent.direct_answer:
            yield from _finish_with_single_answer(context, history, query, prepared.intent.direct_answer)
            return

        # === Stage 3-6: FAQ 检索 → 文档检索 → 上下文构建 → LLM 生成 ===
        helper_result = yield from _search_and_generate(context, prepared, query, history)
        if helper_result is None:
            return  # 已在内部收尾(FAQ 直出或信息不足)

        # === 引用补强 ===
        answer = enforce_answer_citations(context.answer, helper_result.context_docs)

        # === Stage 7: 保存历史 + 写入 Trace + 结束事件 ===
        history.add_turn(context.session_id, query, answer)
        yield finish_success(context, answer=answer)

    except Exception as exc:
        yield finish_error(context, exc)


def _search_and_generate(context, prepared, query, history):
    """检索-生成核心链路:FAQ 检索 → 文档检索 → 上下文构建 → LLM 流式生成。

    提取为独立函数使 stream_query 主干更清晰,便于单步调试和异常定位。
    """
    # Stage 3: FAQ 检索 + 直出判断
    yield build_status_event("正在检索业务 FAQ 知识库...", context.session_id)
    faq_result = search_faq(context, prepared)
    direct_answer = get_faq_direct_answer(context, prepared, faq_result)
    if direct_answer:
        yield from _finish_with_single_answer(context, history, query, direct_answer)
        return None

    # Stage 4: 文档检索
    yield build_status_event("正在匹配相关业务资料...", context.session_id)
    doc_result = search_doc(context, prepared)

    # Stage 5: 上下文构建
    answer_prepared = prepare_answer(context, prepared, faq_result, doc_result)
    context.sources = answer_prepared.sources
    context.hit_type = answer_prepared.hit_type

    if context.hit_type == "insufficient_context":
        answer = build_insufficient_context_answer(context)
        yield from _finish_with_single_answer(context, history, query, answer)
        return None

    # Stage 6: LLM 流式生成
    yield build_status_event("正在生成回答...", context.session_id)
    for chunk in stream_llm_answer(answer_prepared.system_prompt, answer_prepared.user_prompt):
        token = str(getattr(chunk, "content", "") or "")
        if not token:
            continue
        yield build_token_event(token, context.session_id)

    return answer_prepared

2.2 Stage 1:查询路由

这是在线问答进入检索准备之前的低成本路由层。它统一处理三类结果:

route intent 含义
direct_answer GREETING / HUMAN_SERVICE / OUT_OF_SCOPE 问候、转人工、越界、source 边界,直接返回
faq_exact FAQ_QUERY FAQ 标准问题精确命中,直接返回标准答案
retrieval 暂不确定 路由不了,进入检索准备

这里的关键点是:intent 描述用户想做什么,route 描述系统下一步怎么处理。FAQ 精确命中不是新的用户意图,而是 route=faq_exact,同时携带 intent=FAQ_QUERY

@dataclass
class RouteDecision:
    route: Literal["direct_answer", "faq_exact", "retrieval"]
    answer: str | None = None
    intent: IntentResult | None = None
    reason: str = ""


def decide_route(context):
    # 1. 先校验 source_filter
    context.run_stage("validate_source", ...)

    # 2. 协议/安全类直答:问候、越界、短句转人工
    direct_intent = classify_direct_intent(context.query, context.scenario)
    if direct_intent and direct_intent.direct_answer:
        return RouteDecision("direct_answer", direct_intent.direct_answer, direct_intent)

    # 3. source 边界
    boundary_answer = detect_and_apply_boundary_answer(context)
    if boundary_answer:
        return RouteDecision("direct_answer", boundary_answer, out_of_scope_intent)

    # 4. FAQ 精确命中:route 是 faq_exact,intent 仍是 FAQ_QUERY
    if should_try_faq_fast_path(context.query, context.scenario):
        answer, intent = try_fast_faq_direct_answer(context)
        if answer:
            return RouteDecision("faq_exact", answer, intent)

    # 5. 路由不了,再进入检索准备
    return RouteDecision("retrieval")

这也是你截图里最应该调整的地方:你好转人工彩票怎么买 这类问题不应该先进入 FAQ 快速路径,而应该在这一阶段直接收口。

2.3 FAQ 精确命中为什么放在路由层

FAQ 精确命中依赖知识库内容、版本、tenant、source_filter 和标准问题文本,它不是“用户意图类型”。所以更准确的表达是:查询路由层可以产出 route=faq_exact,并把 intent 标记为 FAQ_QUERY

# qa_core/pipeline/steps.py
from qa_core.config.rules import get_rule_config

def should_try_faq_fast_path(query, scenario):
    """判断是否值得先尝试 FAQ 精确直出。

    快速路径只处理"短、完整、像标准问答"的问题。
    不是语义缓存,也不是跳过检索:仍然访问当前场景的 FAQ Milvus 集合,
    并带上 kb_version、tenant、dataset、visibility 和 role 过滤。
    """
    rules = get_rule_config().faq_fast_path
    compact_query = (query or "").strip()
    if (
        not compact_query
        or len(compact_query) > rules.max_chars
        or "\n" in compact_query
    ):
        return False  # 长问题、多行问题不适合快速路径
    return bool(
        rules.hint_matches(compact_query)
        or infer_source(compact_query, scenario)
    )

def try_fast_faq_direct_answer(context):
    """路由层的 FAQ 精确试探:只允许精确匹配,不允许相似直出。"""
    faq_store = get_faq_store(context.scenario.faq_collection)
    result = faq_store.search_many(
        [context.query],
        k=min(plan.faq_top_k, 12),
        source_filter=effective_source_filter,
        kb_version=context.active_kb_version,
        valid_sources=context.scenario.valid_sources,
        data_scope=context.data_scope,
        source_type="faq",
        rerank=False,
    )
    # 只允许精确匹配,分数阈值设为无穷大
    answer, _ = _exact_faq_answer(context.query, result)
    return answer, intent  # 不是精确匹配就返回 (None, FAQ_QUERY intent),继续主流程

FAQ 快路径的触发词和最大长度来自 config/rules.toml 中的 faq_fast_path 配置,不写死在代码里。max_chars = 48 是本项目的初始保护阈值,不是官方标准。它的作用是把 FAQ 快路径限制在“一句话标准问法”上:短问题先试精确命中;长问题、多行问题、带多个条件的问题交给后面的完整检索链路处理。

这里还有一个边界要注意:FAQ 快路径只做“是否精确命中标准 FAQ”的判断,不选择 Prompt Profile。因为精确命中会直接返回标准答案,不调用 LLM,也不需要构造回答 Prompt。Prompt Profile 的选择发生在后面的 prepare_retrieval() / prepare_answer(),也就是完整检索和生成路径里。

_exact_faq_answer() — 精确匹配实现

def _exact_faq_answer(query: str, faq_result: RetrievalResult) -> tuple[str | None, RetrievalResult]:
    """从 FAQ 候选中找与标准问题完全一致的答案。

    快速路径只允许精确标准问答直出,不按相似分数直出。
    找到精确命中后会把该命中排到来源列表第一位,方便页面展示。
    """
    for index, hit in enumerate(faq_result.hits):
        answer = direct_faq_answer(query, hit.document, hit.score, threshold=float("inf"))
        if not answer:
            continue
        if index:
            reordered = [hit, *faq_result.hits[:index], *faq_result.hits[index + 1 :]]
            faq_result = RetrievalResult(
                hits=reordered,
                query=faq_result.query,
                source_type=faq_result.source_type,
                elapsed_ms=faq_result.elapsed_ms,
            )
        return answer, faq_result
    return None, faq_result

为什么在检索准备之前做: - 减少首 token 延迟。标准 FAQ 的精确命中不需要经过历史加载、检索类意图识别、改写、检索计划等步骤。 - FAQ 快速路径仍然访问 Milvus(带版本和数据隔离过滤),不是本地缓存。 - 它在同一个 decide_route() 中排在 direct_answer 之后,避免问候、转人工、越界问题先触发知识库查询。

为什么只允许精确匹配: - 还没做意图识别,不知道这是 FAQ_QUERY 还是 KNOWLEDGE_QUERY - 如果是知识咨询但 FAQ 相似分数高,可能误答。所以只允许用户问题和 FAQ 标准问题完全一致时才直出。

2.4 FAQ 标准直出 vs FAQ 精确路由

这是两个容易混淆的概念:

FAQ 精确路由(Stage 1) FAQ 标准直出(Stage 3)
时机 decide_route() 中,检索准备之前 检索准备之后
route / intent route=faq_exactintent=FAQ_QUERY route=retrieval 后识别出 intent=FAQ_QUERY
匹配方式 仅精确匹配 精确匹配 + 相似分数阈值
阈值 ∞(只精确) 动态(0.62~0.86)
适用 短标准问答 已确认 FAQ_QUERY 意图
风险 低(只精确) 中(相似分数可能误命中)

第三部分:上下文构建

3.1 select_context_docs() 的筛选策略

# qa_core/pipeline/context.py
def select_context_docs(faq_hits: list, doc_hits: list, plan: RetrievalPlan) -> list[Document]:
    """筛选进入 Prompt 的文档片段(只依赖 plan 对象,不需要 scenario 参数)。

    执行流程:
    1. FAQ 命中:过滤分数 → 取前 2 条 → 转成"常见问题 + 标准答案"格式
    2. 文档命中:过滤分数 → prefer_table 时表格行优先 → 优先用 parent_content
    3. 每条追加受 final_context_top_n / max_context_chars / max_context_doc_chars 三重约束
    """
    selected = []
    seen_keys = set()
    used_chars = 0

    # ── FAQ 部分:过滤 min_context_score → 取前 2 条 → 转成标准问答格式 ──
    for hit in [h for h in faq_hits if h.score >= plan.min_context_score][:2]:
        answer = hit.document.metadata.get("answer")
        question = hit.document.metadata.get("standard_question") or hit.document.page_content
        if answer:
            _append_with_budget(
                Document(page_content=f"常见问题:{question}\n标准答案:{answer}"),
                f"faq:{document_key(hit.document)}",
                selected, seen_keys, used_chars, plan)

    # ── 文档部分:过滤分数 → prefer_table 排序 → 优先用 parent_content ──
    eligible = [h for h in doc_hits if h.score >= plan.min_context_score]
    if plan.prefer_table:
        # 表格行(content_type 以 table 开头)排到普通正文前面
        eligible = sorted(eligible,
            key=lambda h: (0 if is_table_document(h.document) else 1, -h.score))

    for hit in eligible:
        parent_content = hit.document.metadata.get("parent_content")
        key = str(hit.document.metadata.get("parent_id") or document_key(hit.document))
        _append_with_budget(
            Document(page_content=str(parent_content or hit.document.page_content)),
            f"doc:{key}",
            selected, seen_keys, used_chars, plan)
    return selected

3.2 build_context() 的格式化输出

def build_context(docs: list[Document]) -> str:
    """构建最终上下文文本(只依赖 doc.metadata,不依赖 scenario 对象)。"""
    lines = []
    seen: set[str] = set()
    for i, doc in enumerate(docs):
        content = doc.page_content.strip()
        if not content or content in seen:
            continue  # 内容去重
        seen.add(content)
        source = _context_source_label(doc.metadata or {})

        # 格式:[编号] 来源:文件名 或 标准问题名 或 表格 sheet+行号
        header = f"[{i+1}] 来源:{source}"
        lines.append(f"{header}\n{content}")

    return "\n\n".join(lines)

输出示例:

1
2
3
4
5
6
7
8
[1] 来源:人事制度 / 入职管理
入职流程包括以下步骤:1. 提交入职材料(身份证复印件、学历证书...)

[2] 来源:人事制度 / 审批权限
部门经理负责审批本部门员工的入职申请,审批时限为 3 个工作日...

[3] 来源:行政管理 / 工位分配
新员工入职后由行政部统一分配工位和办公设备...

第四部分:信息不足处理

4.1 什么情况判定为信息不足

prepare_answersteps.py 中定义,其内部的信息不足判定委托给 _build_answer_context

# qa_core/pipeline/steps.py
def prepare_answer(
    context: RAGQueryContext,
    prepared: RetrievalPreparation,
    faq_result: RetrievalResult,
    doc_result: RetrievalResult,
) -> AnswerPreparation:
    """将 FAQ + 文档检索结果整理为 LLM Prompt、引用来源列表和命中类型。

    信息不足判定委托给 _build_answer_context,prepare_answer 负责
    组装最终的 system_prompt 和 user_prompt。
    """
    context_docs, sources, hit_type, top_score = context.run_stage(
        "build_answer_context",
        lambda: _build_answer_context(prepared, faq_result, doc_result),
    )

    user_prompt = prepared.prompt_profile.user_template.format(
        history=format_messages(prepared.history_messages),
        question=prepared.rewritten_query,
        context=build_context(context_docs)
        or "无可用上下文。必须明确回答:信息不足,无法确认。",
    )
    return AnswerPreparation(
        context_docs=context_docs,
        sources=sources,
        hit_type=hit_type,
        system_prompt=prepared.prompt_profile.system_template,
        user_prompt=user_prompt,
    )

_build_answer_context 负责实际的上下文筛选和命中类型判定:

def _build_answer_context(prepared, faq_result, doc_result):
    """整理上下文文档、引用来源列表、命中类型和最高分数。

    无上下文通过分数过滤时命中类型标记为 insufficient_context。
    """
    context_docs = select_context_docs(faq_result.hits, doc_result.hits, prepared.plan)
    if prepared.plan.prefer_table:
        sources = doc_result.source_payloads(limit=5) + faq_result.source_payloads(limit=2)
    else:
        sources = faq_result.source_payloads(limit=2) + doc_result.source_payloads(limit=5)
    top_score = max(
        [score for score in [faq_result.top_score, doc_result.top_score] if score is not None],
        default=0.0,
    )
    return context_docs, sources, "rag" if context_docs else "insufficient_context", top_score

4.2 信息不足的答案

1
2
3
4
def build_insufficient_context_answer(context: RAGQueryContext) -> str:
    """无可用上下文时返回确定性"信息不足"回答,避免 LLM 幻觉。"""
    context.retrieval_info["insufficient_context_reason"] = "no_context_after_score_filter"
    return f"信息不足,无法确认。当前知识库没有召回到足够可靠的依据,请联系{context.scenario.support_contact}。"

设计意图:信息不足时,系统明确告知用户(而不是让 LLM 即兴发挥),避免 LLM 在没有可靠资料的情况下生成"幻觉"答案。


第五部分:答案引用增强

5.1 什么是引用增强

LLM 生成的答案可能引用上下文中的信息,但不会自动标注"这个信息来自哪个文档"。引用增强在 LLM 生成完答案后,检查答案是否提到了上下文中的关键信息,如果提到了就补充来源标注。

# qa_core/pipeline/citations.py
CITATION_RE = re.compile(r"\[\d+\]")

def has_source_citation(answer: str) -> bool:
    """判断答案中是否已经包含 [数字] 形式的来源编号。"""
    return bool(CITATION_RE.search(answer or ""))

def source_reference_label(doc: Document, index: int) -> str:
    """生成简短来源标签(文件名/FAQ 标准问题;表格资料附加 sheet 和行号)。"""
    from qa_core.document_metadata import format_source_label
    return f"[{index}] {format_source_label(dict(doc.metadata or {}))}"

def enforce_table_row_details(answer: str, context_docs: list[Document]) -> str:
    """确保表格类答案在模型遗漏关键单元格时,确定性追加表格行要点。"""
    details = []
    for index, doc in enumerate(context_docs, start=1):
        if not is_table_document(doc) or not needs_table_row_detail(answer, doc):
            continue
        detail = build_table_row_detail(doc, index)
        if detail:
            details.append(detail)
        if len(details) >= 1:
            break
    if not details:
        return answer
    return f"{answer}\n\n" + "\n".join(details)

def enforce_answer_citations(answer: str, context_docs: list[Document]) -> str:
    """确保 RAG 答案带有可见来源编号:模型已写则保留,未写则末尾补充前 3 个来源。

    额外检查表格类答案:模型遗漏核心单元格值(状态/金额/责任人等)时
    确定性追加表格行要点,避免 LLM 忽略关键数据。
    """
    clean_answer = (answer or "").strip()
    if not clean_answer or not context_docs:
        return clean_answer  # 空答案或空来源时原样返回,不阻断流程

    # 确保表格类答案不丢失关键单元格信息
    clean_answer = enforce_table_row_details(clean_answer, context_docs)
    # 答案已包含 [数字] 来源编号时不重复添加
    if has_source_citation(clean_answer):
        return clean_answer

    # 为前 3 个上下文文档生成来源标签(文件名/FAQ 问题名/表格 sheet 和行号)
    references = ";".join(
        source_reference_label(doc, index)
        for index, doc in enumerate(context_docs[:3], start=1)
    )
    return f"{clean_answer}\n\n参考来源:{references}"

第六部分:性能追踪

6.1 阶段计时

# qa_core/pipeline/runtime.py
class RAGQueryContext:
    """RAG 请求的运行时状态(dataclass,字段名与讲义一致)。"""
    started: float           # 请求开始时间戳(time.perf_counter())
    stage_timings_ms: dict[str, float] = {}  # 各阶段耗时字典
    first_token_ms: float | None = None      # 首 token 到达时间(毫秒)

    @contextmanager
    def stage(self, name: str):
        """记录某个阶段的耗时。"""
        started = time.perf_counter()
        try:
            yield
        finally:
            self.record_stage(name, started)

    def record_stage(self, stage_name: str, started: float) -> float:
        """将阶段耗时写入 stage_timings_ms 字典。"""
        elapsed_ms = (time.perf_counter() - started) * 1000
        self.stage_timings_ms[stage_name] = round(elapsed_ms, 2)
        return elapsed_ms

    def mark_first_token(self):
        """记录首 token 时间(从请求 started 到首次推送 token 的毫秒数)。"""
        if self.first_token_ms is None:
            self.first_token_ms = round((time.perf_counter() - self.started) * 1000, 2)

追踪信息最终进入 end 事件:

{
    "type": "end",
    "retrieval": {
        "first_token_ms": 2478.7,
        "stage_timings_ms": [
            {"stage": "intent", "elapsed_ms": 320.5},
            {"stage": "faq_search", "elapsed_ms": 450.2},
            {"stage": "doc_search", "elapsed_ms": 680.1},
            {"stage": "context_build", "elapsed_ms": 15.3},
            {"stage": "llm_generation", "elapsed_ms": 3200.8},
            {"stage": "save_history", "elapsed_ms": 45.2}
        ],
        "slowest_stage": {"stage": "llm_generation", "elapsed_ms": 3200.8}
    }
}

这些数据帮助性能优化:如果文档检索阶段总是很慢,可能需要调整 top_k 或索引参数;如果 LLM 生成阶段很慢,可能需要换更快的模型或调整 max_tokens。


第七部分:流式事件协议 — 前后端如何协作

7.1 事件驱动的问答模型

一次 RAG 问答不是"前端发请求 → 等 5 秒 → 收到完整答案"。实际的用户体验是:

1
2
3
前端发送问题 → 看到"正在进行查询路由..." →
看到"正在识别问题意图..." → 看到"正在检索 FAQ..." → 看到"正在匹配业务资料..." → 看到"正在生成回答..." → token 逐字出现 →
看到完整答案 + 来源引用

这就是事件驱动模型。后端通过 WebSocket 持续推送事件,前端根据事件类型更新 UI。

7.2 五种事件类型

sequenceDiagram
    participant Browser as 浏览器
    participant WS as /api/stream (WebSocket)
    participant QASvc as QAService
    participant Pipeline as RAG Pipeline

    Browser->>WS: {"query": "入职流程有哪些步骤", ...}
    WS->>QASvc: stream_query(...)
    QASvc->>Pipeline: 创建生成器

    Pipeline-->>WS: {"type": "start", "session_id": "...", "trace_id": "..."}
    WS-->>Browser: 问答已开始,记录 session_id

    Pipeline-->>WS: {"type": "status", "message": "正在进行查询路由..."}
    WS-->>Browser: 更新状态提示

    Pipeline-->>WS: {"type": "status", "message": "正在识别问题意图..."}
    WS-->>Browser: 更新状态提示

    Pipeline-->>WS: {"type": "status", "message": "正在检索业务 FAQ 知识库..."}
    WS-->>Browser: 更新状态提示

    Pipeline-->>WS: {"type": "status", "message": "正在匹配相关业务资料..."}
    WS-->>Browser: 更新状态提示

    Pipeline-->>WS: {"type": "status", "message": "正在生成回答..."}
    WS-->>Browser: 更新状态提示,准备接收 token

    loop LLM 流式生成
        Pipeline-->>WS: {"type": "token", "token": "入"}
        WS-->>Browser: 追加字符到答案区
        Pipeline-->>WS: {"type": "token", "token": "职"}
        WS-->>Browser: 追加字符到答案区
        Pipeline-->>WS: {"type": "token", "token": "流"}
        WS-->>Browser: 追加字符到答案区
    end

    Pipeline-->>WS: {"type": "end", "sources": [...], "hit_type": "rag", ...}
    WS-->>Browser: 渲染来源引用、展示诊断信息

7.3 每种事件的字段结构

start 事件 — 请求已被接收:

1
2
3
4
5
6
7
8
9
{
    "type": "start",
    "session_id": "abc123",
    "trace_id": "xyz789",
    "scenario_id": "enterprise_knowledge",
    "scenario_name": "企业内部知识助手",
    "data_scope": {"tenant_id": "default", "dataset_id": "default"},
    "kb_version": "20260515_a1b2c3d4"
}

status 事件 — 阶段性进度通知:

1
2
3
4
5
{
    "type": "status",
    "session_id": "abc123",
    "message": "正在检索业务 FAQ 知识库..."
}

前端通常将 message 显示为一个动态更新的状态栏或加载提示。

token 事件 — 流式答案的片段:

1
2
3
4
5
{
    "type": "token",
    "session_id": "abc123",
    "token": "入"
}

每个 token 是一个或多个中文字符。前端将这些 token 逐个追加到答案区域,实现打字机效果。

end 事件 — 问答完成:

{
    "type": "end",
    "session_id": "abc123",
    "hit_type": "rag",
    "answer": "入职流程包括以下步骤:1. 提交材料 ...",
    "sources": [
        {"file_name": "入职流程.md", "source": "hr", "score": 0.92},
        {"file_name": "FAQ", "standard_question": "入职需要哪些材料", "score": 0.88}
    ],
    "intent": {"intent": "KNOWLEDGE_QUERY", "confidence": 0.85},
    "retrieval": {
        "plan": {"faq_top_k": 20, "doc_top_k": 20, "rerank": true},
        "query_variants": ["入职流程", "入职步骤", "入职办理流程"],
        "faq_elapsed_ms": 45.2,
        "doc_elapsed_ms": 120.5,
        "stage_timings_ms": {...},
        "first_token_ms": 350.8,
        "total_elapsed_ms": 4520.3
    },
    "processing_time": 4.52,
    "trace_id": "xyz789"
}

error 事件 — 异常恢复:

1
2
3
4
5
6
{
    "type": "error",
    "session_id": "abc123",
    "error": "LLM 服务暂时不可用,请稍后重试。",
    "trace_id": "xyz789"
}

7.4 前端如何消费事件

// static/js/chat.js(简化逻辑)

const ws = new WebSocket(`ws://${location.host}/api/stream`);

ws.onmessage = (event) => {
    const data = JSON.parse(event.data);

    switch (data.type) {
        case "start":
            state.sessionId = data.session_id;
            state.traceId = data.trace_id;
            break;

        case "status":
            updateStatusBar(data.message);  // "正在检索 FAQ..."
            break;

        case "token":
            appendToAnswer(data.content);    // 追加到答案区
            break;

        case "end":
            renderSources(data.sources);     // 渲染来源引用
            renderDiagnostics(data.retrieval); // 展示检索诊断
            updateStatusBar("");             // 清除状态栏
            state.inProgress = false;
            break;

        case "error":
            showError(data.error);           // 显示错误提示
            state.inProgress = false;
            break;
    }
};

7.5 后端如何推进生成器

关键问题:QAService.stream_query()同步生成器(它内部顺序执行意图识别、Milvus 检索、本地 rerank 和 LLM 流式调用),但 WebSocket 路由是异步函数。如果直接调用 next(iterator),事件循环会被阻塞。

解决方案:asyncio.to_thread 将同步生成器的推进放到独立线程:

# qa_core/api/chat.py
stream = get_qa_service().stream_query(*context.service_args())

while True:
    # 在线程中推进同步生成器,不阻塞事件循环
    has_event, event = await asyncio.to_thread(_next_stream_event, stream)
    if not has_event or event is None:
        break
    await websocket.send_json(event)

    if event.get("type") in {"end", "error"}:
        if event.get("type") == "end":
            # 后台异步刷新历史摘要(不阻塞用户看到结果)
            _schedule_summary_refresh(session_id)
        break
flowchart TD
    subgraph MainThread["主线程(事件循环)"]
        WS["WebSocket 接收消息"]
        Send["发送事件到浏览器"]
        Schedule["调度后台摘要刷新"]
    end

    subgraph WorkerThread["工作线程"]
        Gen["推进同步生成器<br/>next(iterator)"]
        Intent["意图识别"]
        Milvus["Milvus 检索"]
        Rerank["本地重排"]
        LLM["LLM 流式调用"]
    end

    WS -->|"asyncio.to_thread"| Gen
    Gen --> Intent --> Milvus --> Rerank --> LLM
    LLM -->|"yield token"| Gen
    Gen -->|"返回事件"| Send

    style MainThread fill:#EFF6FF,stroke:#3B82F6,stroke-width:2px
    style WorkerThread fill:#ECFDF5,stroke:#059669,stroke-width:2px

为什么需要两个线程? 这是一个 Python 异步编程中很经典的"同步生成器 + 异步 WebSocket"阻抗匹配问题。

QAService.stream_query() 是一个同步生成器——它内部顺序执行意图识别、Milvus 检索(gRPC 阻塞调用)、本地 Rerank(CPU 密集计算)、LLM 流式调用(HTTP 阻塞读取)。如果把这段逻辑直接放在主线程的事件循环中调用 next(iterator),整个事件循环会在每次推进生成器时被阻塞,导致其他 WebSocket 连接、HTTP 请求全部卡住。

解决方案:asyncio.to_thread 作为桥梁。 主线程通过 asyncio.to_thread 把同步生成器的推进操作丢给线程池中的工作线程,自己立即返回并继续处理事件循环中的其他任务。工作线程推进完成后,结果通过 Future 传回主线程,主线程再 await websocket.send_json(event) 发给浏览器。

图中两条线程的分工:

职责 主线程(事件循环) 工作线程
WebSocket 收发 ✅ 接收用户消息、发送事件
意图识别 ✅ 同步调用
Milvus 检索 ✅ gRPC 阻塞调用
本地 Rerank ✅ CPU 密集计算
LLM 流式调用 ✅ HTTP 阻塞读取
后台摘要刷新 ✅ 调度(不阻塞响应)

事件如何跨线程:工作线程每产出一个 token 或状态事件,生成器 yield 一次;主线程的 _next_stream_event(stream) 捕获这个值并通过 asyncio.to_thread 的返回值传回;主线程拿到事件后立即 send_json 给浏览器。这个过程对用户透明——浏览器看到的是连续的 status → token... → end 事件流。

为什么不在 LLM 流式阶段回到主线程? LLM 的 llm.stream() 本身返回一个迭代器,每次迭代都是阻塞的 HTTP 读取操作。如果回到主线程逐 token 读取,同样会阻塞事件循环。所以整个生成器——从意图识别到最后一个 token——全部留在工作线程中执行。

7.6 事件协议的设计原则

  1. 类型安全:每个事件都有 type 字段,前端用 switch 分派处理,不靠字段存在与否判断
  2. 诊断信息附带end 事件携带完整的 retrieval 诊断信息,前端可以用 JS 渲染到页面上,帮助用户理解"系统为什么这样回答"
  3. 错误不崩溃:异常转为 error 事件,不抛到 WebSocket 路由。用户看到错误提示后可以继续下一轮提问
  4. 历史写入在最后end 事件之后才写 MySQL 历史,确保历史记录的是完整答案(含引用增强后的来源)

本讲实践闭环

项目 内容
本讲类型 系统集成
实践产物 Stage 0-7 RAG Pipeline、上下文构建、引用增强、流式事件协议
是否进入最终项目
验收方式 发起一次知识问答,观察 start/status/token/end 与引用来源
后续落点 第 11 讲完善 Prompt Profile,第 19 讲通过 Trace 观察阶段耗时

通过标准:FAQ 快速路径、文档 RAG、信息不足、流式生成都能走到可解释的分支。

本讲从 0 到 1 实现闭环

本讲是在线问答的主干实现。实现完成后,相关代码结构应该是下面这张图:

flowchart LR
    subgraph Pipeline["qa_core/pipeline"]
        Rag["rag.py<br/>stream_query 主流程"]
        Steps["steps.py<br/>阶段函数<br/>意图/检索/生成"]
        RetrievalSteps["retrieval_steps.py<br/>FAQ/Doc 检索分支"]
        Context["context.py<br/>运行上下文<br/>阶段计时"]
        Citations["citations.py<br/>引用增强"]
        Events["events.py<br/>start/status/token/end/error"]
    end

    subgraph Support["支撑模块"]
        Intent["intent/classifier.py"]
        Strategy["retrieval/strategy.py"]
        Store["retrieval/store.py"]
        Prompts["prompts/selector.py"]
        History["memory/history.py"]
    end

    Rag --> Steps
    Steps --> RetrievalSteps
    Steps --> Context
    Steps --> Citations
    Rag --> Events
    Intent --> Steps
    Strategy --> Steps
    Store --> RetrievalSteps
    Prompts --> Steps
    History --> Steps

Step 1:定义流式事件协议

目标:Pipeline 不直接返回字符串,而是持续产出浏览器能理解的事件。

来源:真实代码逻辑压缩版,对应 qa_core/pipeline/events.pyqa_core/pipeline/runtime.pyqa_core/pipeline/rag.py

context = create_query_context(...)
yield start_event(context)

yield status_event("正在进行查询路由...", context.session_id)
yield status_event("正在识别问题意图...", context.session_id)
yield status_event("正在检索业务 FAQ 知识库...", context.session_id)
yield status_event("正在匹配相关业务资料...", context.session_id)
yield status_event("正在生成回答...", context.session_id)

yield token_event(token, context.session_id)
yield finish_success(context, answer=answer)

设计解释:前端要实时展示进度,后端要暴露阶段诊断,所以事件比单个字符串更适合 RAG。

Step 2:串起意图、计划和检索

目标:把第 5-8 讲的模块接进同一条主流程。

来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.pyqa_core/pipeline/steps.py

yield status_event("正在进行查询路由...", context.session_id)
route = decide_route(context)
if route.answer:
    yield from _finish_with_single_answer(context, history, query, route.answer)
    return

yield status_event("正在识别问题意图...", context.session_id)
prepared = prepare_retrieval(context)

if prepared.intent.direct_answer:
    if context.hit_type == "unknown":
        context.hit_type = prepared.intent.intent.lower()
    yield from _finish_with_single_answer(context, history, query, prepared.intent.direct_answer)
    return

设计解释:Pipeline 不自己判断所有规则,而是调用前面章节实现的模块,保持职责清晰。

调试入口补充:debug_retrieval() 只用于检索诊断,故意从 prepare_retrieval() 开始跑半链路,方便观察检索类意图、source 推断、按需改写和 FAQ/Doc 召回;线上问答入口仍然是 stream_query(),并且一定先经过 decide_route()

Step 3:实现 FAQ 快速路径和文档 RAG 分支

来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.py::_search_and_generate()qa_core/pipeline/retrieval_steps.py

yield status_event("正在检索业务 FAQ 知识库...", context.session_id)
faq_result = search_faq(context, prepared)
direct_answer = get_faq_direct_answer(context, prepared, faq_result)
if direct_answer:
    context.hit_type = "faq_direct"
    context.sources = faq_result.source_payloads()
    yield from _finish_with_single_answer(context, history, query, direct_answer)
    return

yield status_event("正在匹配相关业务资料...", context.session_id)
doc_result = search_doc(context, prepared)
answer_prepared = prepare_answer(context, prepared, faq_result, doc_result)
context.sources = answer_prepared.sources
context.hit_type = answer_prepared.hit_type

if context.hit_type == "insufficient_context":
    answer = build_insufficient_context_answer(context)
    yield from _finish_with_single_answer(context, history, query, answer, record_save_stage=True)
    return

设计解释:FAQ 高置信直出可以省掉 LLM;文档 RAG 需要足够上下文;没有上下文时必须明确拒绝幻觉。

Step 4:构建上下文并流式生成

来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.pyqa_core/pipeline/steps.pyqa_core/pipeline/context.pyqa_core/pipeline/citations.py

yield status_event("正在生成回答...", context.session_id)
with context.stage("llm_generation"):
    for chunk in stream_llm_answer(answer_prepared.system_prompt, answer_prepared.user_prompt):
        token = str(getattr(chunk, "content", "") or "")
        if not token:
            continue
        context.answer_parts.append(token)
        context.mark_first_token()
        yield token_event(token, context.session_id)

raw_answer = context.answer
answer = enforce_answer_citations(raw_answer, answer_prepared.context_docs)
if not answer:
    answer = f"信息不足,无法确认,请联系人工支持:{context.scenario.support_contact}。"
elif answer != raw_answer:
    extra_token = answer[len(raw_answer):] if answer.startswith(raw_answer) else answer
    yield token_event(extra_token, context.session_id)
    context.answer_parts = [answer]

设计解释:LLM 只看到筛选后的上下文;引用增强在生成后执行,保证答案可追溯。

Step 5:保存历史并记录 Trace

来源:真实代码逻辑压缩版,对应 qa_core/pipeline/rag.py::_finish_with_single_answer() 和 Stage 7。

with context.stage("save_history"):
    history.add_turn(context.session_id, query, answer)
yield finish_success(context, answer=answer)

# 对于 FAQ 直出、直接意图、信息不足这类已有完整答案的分支:
context.answer_parts = [answer]
context.mark_first_token()
yield token_event(answer, context.session_id)
history.add_turn(context.session_id, query, answer)
yield finish_success(context, answer=answer)

设计解释:历史要保存最终答案,而不是未补引用的中间答案;Trace 要记录命中路径、阶段耗时和检索分数。

Step 6:验收完整链路

验收方式:

来源:命令行验收,对应 scripts/api_e2e_smoke.py

python scripts/api_e2e_smoke.py

闭环验证重点:

验证项 输入场景 期望结果
事件协议 任意知识问答 能看到 start/status/token/end
FAQ 快速路径 高置信 FAQ 可直出答案并带来源
文档 RAG 长文档类问题 检索上下文后生成答案
信息不足 知识库无依据 返回 insufficient_context,不编造
引用增强 有上下文答案 答案包含来源引用
历史保存 连续对话 后续追问可读取历史

可复制的链路验证问题

以下问题基于默认业务场景 enterprise_knowledge。这些问题不是用来考察答案内容本身,而是用来观察一次请求在 Pipeline 中走到了哪个分支。页面右侧诊断信息中的 hit_typeFAQ 最高分Doc 最高分最慢阶段 可以辅助确认链路。

目标链路 可复制问题 预期路径 会不会调用 LLM 观察重点
问候直答 你好 decide_route()direct_answerhit_type=greeting 不会 只出现“正在进行查询路由...”,不检索 Milvus。
越界拦截 彩票怎么买才能中奖? decide_route()direct_answerhit_type=out_of_scope 不会 用场景边界直接拒绝,避免把无关问题送入知识库。
FAQ 直出 VPN 连不上应该怎么处理? FAQ 快速路径 / FAQ 高置信直出,hit_type=faq_direct 不会 返回 FAQ 标准答案,有来源,但跳过文档检索和 LLM。
完整 RAG 生成 VPN 客户端版本、账号锁定、公网 IP 这些排查项分别应该怎么处理?请按步骤说明。 retrieval → 意图识别 → FAQ 检索 → 文档检索 → 上下文构建 → LLM 生成,hit_type=rag 会看到“正在生成回答...”;FAQ 分数不足以直出,Doc 命中较高,最慢阶段通常是 llm_generation
表格资料 RAG 预算预审批里客户招待和紧急采购分别由谁预审批,补充说明是什么? 表格资料召回 → 上下文构建 → LLM 生成,hit_type=rag 重点观察来源中是否出现 budget_preapproval_matrix.xlsx 这类表格资料。
资料未直接覆盖 公司办公楼停车位申请流程是什么? 可能进入 rag,由 LLM 基于已召回上下文说明“资料未覆盖”;若无有效上下文则进入 insufficient_context 可能会 用来观察“不能编造”的约束:有上下文但不含答案时,回答应明确说明当前资料没有依据。

完整 RAG 生成的关键判断是:状态流中出现下面这一组阶段,并且最终 hit_type=rag

1
2
3
4
5
正在进行查询路由...
正在识别问题意图...
正在检索业务 FAQ 知识库...
正在匹配相关业务资料...
正在生成回答...

如果只出现“正在进行查询路由...”就结束,通常说明命中了问候、越界、转人工或 FAQ 快速直出;这类分支是 Pipeline 的快速路径,不是完整 RAG 生成路径。

通过标准:

  • 能看到 start/status/token/end 事件。
  • FAQ 快速路径、文档 RAG、信息不足至少各有可解释分支。
  • 最终答案带引用来源。
  • 右侧诊断或 Trace 能看到命中路径、阶段耗时和 top score。

重点掌握

优先级 内容 原因
★★★ 必会 Stage 0-7 主流程:创建上下文 → 查询路由 → 检索准备 → FAQ 检索 → 文档检索 → 上下文构建 → LLM 流式生成/引用增强 → 保存历史 RAG Pipeline 的完整骨架
★★★ 必会 route=faq_exact vs intent=FAQ_QUERY vs FAQ 标准直出的区别 route 是系统处理方式,intent 是用户意图,二者不要混淆
★★★ 必会 上下文构建(select_context_docs)的筛选策略:FAQ 前 2 条 → 分数过滤 → 去重 → 表格行优先 → 三重预算约束(条数/单条长度/总长度) 决定 LLM 看到的上下文质量
★★★ 必会 流式事件协议的五种事件类型(start/status/token/end/error)及前后端协作方式 理解浏览器如何实时展示 RAG 进度
★★ 理解 信息不足(insufficient_context)时明确告知用户,不让 LLM 即兴发挥 避免幻觉的重要安全机制
★★ 理解 引用增强(enforce_answer_citations):LLM 答案已有 [N] 则不重复,否则末尾补充前 3 个来源 保证答案可溯源
★★ 理解 阶段计时(RAGQueryContext.stage)追踪每个阶段耗时 性能优化的数据基础
★★ 理解 asyncio.to_thread 桥接同步 Generator 和异步 WebSocket Python 异步编程的关键模式
★ 了解 Pipeline 的模块化拆分:rag.py / runtime.py / steps.py / retrieval_steps.py / context.py / events.py / citations.py 了解文件职责划分

本讲小结

  • Pipeline > Chain:RAG 是有分支、有快慢路径的管道,不是固定步骤的链
  • 查询路由统一处理 direct_answer、faq_exact、retrieval,避免拆成两套“意图识别”
  • FAQ 精确命中route=faq_exact,不是新的 intent;它命中时仍携带 intent=FAQ_QUERY
  • 上下文构建依次执行:FAQ 补充 → 分数过滤 → 去重 → 优先表格 → 截断 → 格式化
  • 信息不足时明确告知用户,而不是让 LLM 在没有资料的情况下生成幻觉
  • 引用增强在 LLM 生成的答案后补充来源标注
  • 阶段计时追踪每个阶段的耗时,帮助定位性能瓶颈

下一讲Prompt 工程与 Profile 系统 — 提示词模板设计、Profile 选择策略、高风险问题安全约束