跳转至

第16讲:文档入库与索引链路

上一讲数据隔离与多租户设计 下一讲RAG 回归验收与入库质量

本讲目标

  • 理解离线入库链路和在线问答链路的边界
  • 掌握文档加载、标准化、切分的完整流程
  • 理解 MySQL IndexManifest 的增量入库机制
  • 掌握 FAQ 从 CSV 到 Milvus 的完整流程

📖 前置阅读:如果你想深入理解 Parent-Child Chunking 的设计原理和 chunk size 的选择依据,请先阅读 附录G:文档切分策略

本讲地图

本图对应本讲功能闭环,展示从输入到本讲交付物的主干路径。节点与主项目代码文件和函数保持一致,后续章节消费的能力只作为交付边界出现。

图 1:第 16 讲功能闭环地图

flowchart TD
    C16_INGEST["入库编排<br/>ingest_directory()"]
    C16_FILES["文件发现<br/>os.walk()"]
    C16_LOAD["多格式加载<br/>load_file()"]
    C16_TABLE["表格行文档<br/>load_table_file()"]
    C16_UTILS["稳定 ID 工具<br/>stable_hash() / file_fingerprint()"]
    C16_FINGER["文件指纹<br/>file_fingerprint()"]
    C16_MANIFEST["增量清单<br/>IndexManifest"]
    C16_META["来源 metadata<br/>format_source_label() / is_table_metadata()"]
    C16_REUSE["跨版本复用<br/>copy_documents_to_version()"]
    C16_NORMALIZE["元数据标准化<br/>normalize_documents()"]
    C16_CHUNK["文档切分<br/>split_documents()"]
    C16_CITE["引用兜底<br/>enforce_answer_citations()"]
    C16_WRITE["写入 store<br/>store.delete_ids() / add_documents()"]
    C16_FAQ["FAQ 入库<br/>faq_documents_from_csv() / ingest_faq_csv()"]
    C16_VERSION{{"版本记录<br/>record_ingest_result() / activate_version()"}}
    C16_INGEST --> C16_FILES
    C16_INGEST --> C16_MANIFEST
    C16_FILES --> C16_FINGER
    C16_FINGER --> C16_MANIFEST
    C16_MANIFEST -->|"跨版本未变化"| C16_REUSE
    C16_REUSE -->|"复制旧 chunk/dense"| C16_WRITE
    C16_FILES -->|"变化文件"| C16_LOAD
    C16_LOAD -->|"表格文件"| C16_TABLE
    C16_LOAD -->|"普通文档"| C16_NORMALIZE
    C16_TABLE --> C16_NORMALIZE
    C16_META --> C16_NORMALIZE
    C16_UTILS --> C16_CHUNK
    C16_NORMALIZE --> C16_CHUNK
    C16_CHUNK --> C16_WRITE
    C16_WRITE -->|"记录 chunk_id"| C16_MANIFEST
    C16_WRITE --> C16_VERSION
    C16_FAQ --> C16_VERSION
    C16_META -->|"在线引用"| C16_CITE
    style C16_INGEST fill:#F8FAFC,stroke:#64748B,stroke-width:2px
    style C16_FILES fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C16_LOAD fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
    style C16_TABLE fill:#FEF3C7,stroke:#D97706,stroke-width:2px
    style C16_UTILS fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C16_FINGER fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
    style C16_MANIFEST fill:#FEF3C7,stroke:#D97706,stroke-width:2px
    style C16_META fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C16_REUSE fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
    style C16_NORMALIZE fill:#FEF3C7,stroke:#D97706,stroke-width:2px
    style C16_CHUNK fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C16_CITE fill:#F5F3FF,stroke:#7C3AED,stroke-width:2px
    style C16_WRITE fill:#FEF3C7,stroke:#D97706,stroke-width:2px
    style C16_FAQ fill:#DBEAFE,stroke:#2563EB,stroke-width:2px
    style C16_VERSION fill:#DCFCE7,stroke:#16A34A,stroke-width:2px

节点与代码对齐

节点 对齐文件 函数/对象 本章职责
入库编排 qa_core/indexing/service.py ingest_directory() 串联场景、数据域、版本、加载、跨版本复用、切分和写入。
文件发现 qa_core/indexing/service.py os.walk() 遍历目录文件,并在单文件处理阶段校验 LoaderSpec。
多格式加载 qa_core/indexing/document_loaders.py load_file() 按后缀选择文本、PDF、Word、PPT 或表格 loader。
表格行文档 qa_core/indexing/table_documents.py load_table_file() 把 CSV/Excel 每一行转换为保留表头、sheet 和行号的 Document。
稳定 ID 工具 qa_core/utils.py stable_hash() / file_fingerprint() 统一生成 doc_id、chunk_id、faq_id 和文件指纹。
文件指纹 qa_core/utils.py file_fingerprint() 计算 SHA256 文件指纹。
增量清单 qa_core/indexing/manifest.py IndexManifest 用 MySQL 记录文件指纹和 chunk_id,判断目标版本跳过、基准版本复用或变化重建。
来源 metadata qa_core/document_metadata.py format_source_label() / is_table_metadata() 统一普通文档和表格行的来源标签。
跨版本复用 qa_core/retrieval/store.py copy_documents_to_version() 基于 active/base 版本复制未变化 chunk 和 dense 向量到新版本,不重新调用 embedding。
元数据标准化 qa_core/indexing/document_normalizer.py normalize_documents() 补 source、kb_version、scenario_id 和 data_scope。
文档切分 qa_core/indexing/chunking.py split_documents() 普通文本做 parent-child 切分,表格行保持完整语义单元。
引用兜底 qa_core/pipeline/citations.py enforce_answer_citations() 模型漏写来源时补充参考来源和表格行要点。
写入 store qa_core/indexing/service.py store.delete_ids() / add_documents() 变化文件先清理旧 chunk,再写入新 chunk;未变化文件走跨版本复用。
FAQ 入库 qa_core/indexing/faq_ingestion.py faq_documents_from_csv() / ingest_faq_csv() 把 FAQ CSV 写入 FAQ store。
版本记录 qa_core/governance/kb_versions.py record_ingest_result() / activate_version() 记录入库统计并可激活版本。

第一部分:前置知识 — 离线 vs 在线链路

1.1 清晰的工程边界

1
2
3
4
5
6
7
离线链路(入库)                      在线链路(问答)
─────────────────                    ─────────────────
定时/手动执行                         每次用户提问时执行
修改 Milvus 数据                      只读 Milvus 数据
可以慢(几分钟到几十分钟)            必须快(秒级响应)
可以重试、可以回滚                    必须一次成功
解析文件、切分、向量化                只做检索、不解析文件

关键原则:在线问答不解析文件、不执行 OCR、不写入知识库。

1.2 为什么分开

如果把文档解析放在在线链路: - 用户提问时临时解析 PDF → 首 token 延迟增加 5-10 秒 - 文件解析失败时用户看到的是"PDF 损坏"而非答案 - 无法做质量报告(因为解析是即时的,没有机会检查)

如果把向量化放在在线链路: - 用户问题需要等 Embedding 模型加载(冷启动 10+ 秒) - 无法预热 Embedding 模型

1.3 知识库构建总链路

本项目的离线知识库构建不是单独的“文档切分”或“FAQ 入库”,而是一条完整的版本化构建链路。可以用一句话概括:

离线链路负责把原始资料变成“带版本、带权限、可检索、可回滚”的知识资产;在线链路只读取当前 active 版本来回答问题。

生产、本地验证或资料发布时有两个常用入口:

入口 用途 适合场景
scripts/rebuild_scenarios.py 一次初始化/重建全部 8 个冻结场景 新环境初始化、统一准备、Milvus schema 变更后全量修复
scripts/rebuild_kb_version.py 只重建单个业务场景 只修改了某个场景资料、验证单场景入库、定位某个场景问题

如果在 Docker Compose 里执行入库命令,先确认项目根目录存在 .env.compose。仓库只提交 .env.compose.example,首次部署需要生成本地配置文件:

if (!(Test-Path .env.compose)) { Copy-Item .env.compose.example .env.compose }
notepad .env.compose

如果是新环境,或者 Milvus collection schema 变更后需要全量修复,优先使用批量脚本:

python scripts/rebuild_scenarios.py --reset-collections

它会对 8 个冻结场景逐个执行“新建版本 → 强制入库 → 质量门禁 → 激活”,并在 --reset-collections 开启时删除旧 FAQ/Doc collection,确保 Milvus schema 按当前代码重新创建。

在 Docker Compose 模式下,对应命令是:

1
2
3
docker compose --env-file .env.compose up -d mysql etcd minio milvus
docker compose --env-file .env.compose build api
docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py --reset-collections

如果之前已经存在知识库,只是资料内容变化,批量重建全部 8 个场景时不加 --reset-collections

docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py

如果只重建一个场景,使用 scripts/rebuild_kb_version.py

python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --quality-gate --activate

Docker Compose 模式下,对应命令是:

docker compose --env-file .env.compose run --rm api python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --quality-gate --activate

企业中更常见的日常资料更新方式,是“构建阶段增量,查询阶段完整版本”。如果当前 active 版本已经存在,且只是少量文件变化,可以创建新候选版本并基于 active 做跨版本增量构建:

python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --incremental-from active --quality-gate --activate

这条命令的语义是:FAQ 仍按新版本重建;文档先读取 active 版本的 MySQL IndexManifest,未变化文件通过 copy_documents_to_version() 复制旧版本 chunk 和 dense 向量到新版本,变化文件才重新加载、切分、embedding。在线查询仍然只查新的 active 版本,不在查询阶段叠加 base + delta。

--incremental-from 不能和 --force--reset-collections 同时使用:--force 表示全部重算,--reset-collections 会删掉可复用的旧向量。

企业大规模优化:引用式增量版本

当前项目采用的是物理复制式增量:未变化文件不重新 embedding,但会把旧版本的 chunk 文本和 dense 向量复制一份到新版本。这样做的好处是实现路径清晰、查询简单,线上检索仍然只需要:

kb_version == 当前 active version

但如果企业知识库已经有百万级 chunk,每次只改少量文件,却复制大量未变化 chunk,就会带来空间占用和插入耗时。因此大规模企业系统常见的升级方向是引用式增量版本

引用式增量的核心思想是:未变化 chunk 不复制,新版本只记录“从哪个基线版本继承、哪些文件新增、哪些文件修改、哪些文件删除”。查询时通过有效期字段判断当前版本能看到哪些 chunk。

一种典型 metadata 设计如下:

字段 含义
scenario_id 业务场景
doc_id 稳定文件 ID
chunk_id 稳定 chunk ID
file_fingerprint 文件内容指纹
valid_from_seq 从哪个版本序号开始有效
valid_to_seq 到哪个版本序号前失效,0 表示仍然有效

假设 active 版本序号是 8,检索表达式会从简单的 kb_version == active 升级为:

1
2
3
scenario_id == "enterprise_knowledge"
and valid_from_seq <= 8
and (valid_to_seq == 0 or valid_to_seq > 8)

增量构建时:

文件状态 引用式增量处理方式
未变化 不写 Milvus,只继续引用旧 chunk
新增 插入新 chunk,valid_from_seq = 当前版本序号
修改 旧 chunk 写 valid_to_seq = 当前版本序号,新 chunk 写 valid_from_seq = 当前版本序号
删除 旧 chunk 写 valid_to_seq = 当前版本序号,作为 tombstone 处理

下面用一个具体例子看引用式增量怎么工作。假设企业知识库里有三份资料:

1
2
3
hr_onboarding.md      入职流程
it_vpn.md             VPN 处理
finance_expense.md    报销流程

版本 v1:首次全量入库

第一次入库时,三个文件都写入 Milvus:

chunk_id 文件 内容摘要 valid_from_seq valid_to_seq
hr_c1 hr_onboarding.md 入职需要提交身份证、银行卡、合同信息 1 0
it_c1 it_vpn.md VPN 连不上先检查账号、网络和 MFA 1 0
fin_c1 finance_expense.md 报销流程包括提交单据、审批、财务复核 1 0

此时 active 版本序号是 1,查询表达式是:

valid_from_seq <= 1
and (valid_to_seq == 0 or valid_to_seq > 1)

能查到:

hr_c1, it_c1, fin_c1

版本 v2:只修改 VPN 文档

后来 IT 更新了 VPN 文档,新增了“客户端版本检查”的要求。引用式增量不会复制 HR 和财务 chunk,只处理变化文件:

操作 chunk_id 文件 valid_from_seq valid_to_seq 说明
标记旧 chunk 失效 it_c1 it_vpn.md 1 2 v2 开始不再使用旧 VPN 口径
插入新 chunk it_c2 it_vpn.md 2 0 新 VPN 口径从 v2 开始有效

Milvus 中现在一共有四条 chunk:

chunk_id 文件 valid_from_seq valid_to_seq
hr_c1 hr_onboarding.md 1 0
it_c1 it_vpn.md 1 2
it_c2 it_vpn.md 2 0
fin_c1 finance_expense.md 1 0

如果 active 版本序号切到 2,查询表达式是:

valid_from_seq <= 2
and (valid_to_seq == 0 or valid_to_seq > 2)

能查到:

hr_c1, it_c2, fin_c1

注意:hr_c1fin_c1 没有复制一份到 v2,但它们仍然有效,因为 valid_to_seq = 0

版本 v3:删除财务报销文档,新增差旅文档

再后来财务删除旧的报销流程文档,并新增差旅规则文档:

删除:finance_expense.md
新增:finance_travel.md

引用式增量处理如下:

操作 chunk_id 文件 valid_from_seq valid_to_seq 说明
标记旧 chunk 失效 fin_c1 finance_expense.md 1 3 v3 起旧报销资料不再可见
插入新 chunk fin_travel_c1 finance_travel.md 3 0 差旅规则从 v3 起生效

如果 active 版本序号切到 3,有效 chunk 是:

hr_c1, it_c2, fin_travel_c1

完整状态表如下:

chunk_id 文件 valid_from_seq valid_to_seq v1 可见 v2 可见 v3 可见
hr_c1 hr_onboarding.md 1 0
it_c1 it_vpn.md 1 2
it_c2 it_vpn.md 2 0
fin_c1 finance_expense.md 1 3
fin_travel_c1 finance_travel.md 3 0

这个例子说明了引用式增量的关键点:

  1. 未变化资料不复制,例如 hr_c1 从 v1 一直被 v2、v3 复用。
  2. 修改资料不是覆盖旧 chunk,而是让旧 chunk 在新版本前失效,再插入新 chunk。
  3. 删除资料不是立刻物理删除,而是写 valid_to_seq,让它从某个版本开始不可见。
  4. 回滚时只需要把 active 版本序号从 3 切回 2fin_c1 又会重新可见。

这种方案节省空间,也更适合大规模知识库,但实现复杂度更高:检索表达式、版本回滚、质量报告、垃圾回收和调试诊断都要理解“有效版本视图”。所以本课程主项目先采用物理复制式增量,把版本发布链路跑清楚;引用式增量作为企业规模化优化方向理解即可。

如果 Milvus Collection 的 schema 发生过变化,例如 sparse 字段从普通 SparseVector 改成 BM25 Function 输出字段,需要加上 --reset-collections 删除旧集合并重新建表:

python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --reset-collections --quality-gate --activate

这里要区分两个参数:--force 只是忽略文件指纹、强制把资料重新写入新版本;--reset-collections 会删除 Milvus 里的 FAQ/Doc collection,让当前代码重新创建 schema。已有知识库只更新资料时,用 --force,不要默认加 --reset-collections

完整链路如下:

flowchart TD
    Start(["执行 rebuild_kb_version.py"]) --> Scenario["解析 scenario.toml<br/>确定 FAQ/Doc collection、数据目录、权限默认值"]
    Scenario --> Reset{"是否传入<br/>--reset-collections?"}
    Reset -->|是| Drop["删除旧 FAQ/Doc collection<br/>用于 schema 变更后的重建"]
    Reset -->|否| Version
    Drop --> Version["创建或确认 STAGED 知识库版本<br/>KnowledgeBaseVersionStore"]

    Version --> FAQ["FAQ 入库<br/>CSV 问答对 -> Document<br/>question=text, answer=metadata"]
    Version --> Docs["文档入库<br/>Loader -> 标准化 -> 父子块切分"]

    FAQ --> Milvus["写入 Milvus<br/>text + dense + sparse(BM25 Function) + metadata"]
    Docs --> Milvus

    Milvus --> Manifest["更新 MySQL IndexManifest<br/>记录文件指纹与 chunk_id"]
    Manifest --> Report["生成入库质量报告<br/>空文件、低质量 chunk、重复 FAQ、FAQ/文档冲突"]
    Report --> Gate{"quality gate<br/>是否通过?"}
    Gate -->|否| Staged["保留 STAGED 版本<br/>不影响线上 ACTIVE 版本"]
    Gate -->|是 + --activate| Active["激活新版本<br/>当前场景 active_kb_version 指向新版本"]
    Active --> Online["在线问答只读 active 版本<br/>检索时追加 kb_version 过滤条件"]

这条链路里有几个容易混淆的点:

环节 作用 解释
scenario.toml 定义业务场景、collection、数据目录、默认权限 先确定“这次给哪个业务场景建知识库”
--new-version 创建一个新的 STAGED 版本 新资料先进入候选版本,不直接覆盖线上
--force 强制重新处理数据 用于需要重新入库时跳过增量判断
--incremental-from active 基于当前 active 版本做跨版本增量构建 未变化文档复制旧 chunk/dense 向量,变化文档重新处理,查询仍只查新 active 版本
--reset-collections 删除并重建 Milvus collection 只在 schema 变化或旧集合不兼容时使用
FAQ 入库 把 CSV 问答对写入 FAQ collection 适合标准问答、政策口径、固定流程
文档入库 把 PDF/Word/Markdown/表格等切成 chunk 写入 Doc collection 适合长文档、制度、合同、手册
IndexManifest 在 MySQL 中记录文件指纹和 chunk ID 下次入库时判断文件是否变化,避免重复处理
质量报告 统计入库质量问题 把“知识库是否可靠”变成可检查的数据
--quality-gate 质量门禁 报告不达标就不激活,保护线上问答
--activate 激活版本 只有通过门禁的新版本才进入在线链路

所以第 16 讲后面的 FAQ 入库、文档加载、表格行入库、父子块切分、MySQL IndexManifest,都是这条总链路中的局部实现;第 14 讲负责解释版本状态机,第 17 讲负责解释质量门禁和评测。


第二部分:文档加载器注册表

2.1 注册表设计

# qa_core/indexing/document_loaders.py

# Loader 注册表:后缀 → 加载器工厂
DOCUMENT_LOADER_SPECS: tuple[DocumentLoaderSpec, ...] = (
    DocumentLoaderSpec(
        suffixes=(".txt", ".md"),
        factory=_utf8_text_loader,
        description="UTF-8 文本/Markdown"
    ),
    DocumentLoaderSpec(
        suffixes=(".pdf",),
        factory=_pdf_loader,
        description="PDF 文档"
    ),
    DocumentLoaderSpec(
        suffixes=(".docx",),
        factory=_docx_loader,
        description="Word 文档"
    ),
    DocumentLoaderSpec(
        suffixes=(".pptx", ".ppt"),
        factory=lambda p: UnstructuredPowerPointLoader(str(p)),
        description="PowerPoint"
    ),
    DocumentLoaderSpec(
        suffixes=(".csv", ".xlsx", ".xls"),
        factory=load_table_file,
        description="表格文件 — 按行解析"
    ),
)

DOCUMENT_LOADER_REGISTRY: dict[str, DocumentLoaderSpec] = {
    suffix: spec
    for spec in DOCUMENT_LOADER_SPECS
    for suffix in spec.suffixes
}

def get_document_loader_spec(path: Path) -> DocumentLoaderSpec | None:
    """根据文件后缀获取加载器注册项。"""
    return DOCUMENT_LOADER_REGISTRY.get(path.suffix.lower())

2.2 扩展性

新增文件格式只需添加一个注册项:

1
2
3
4
5
6
# 新增:支持 .html 文件
DOCUMENT_LOADER_REGISTRY[".html"] = DOCUMENT_LOADER_REGISTRY[".htm"] = DocumentLoaderSpec(
    suffixes=(".html", ".htm"),
    factory=lambda p: UnstructuredHTMLLoader(str(p)),
    description="HTML 网页"
)

注册表模式比 if/elif 分支更可维护。当文件类型增长时,if/elif 会变成几百行难以维护的代码。

2.3 为什么没有直接使用 LlamaIndex 入库流水线

LlamaIndex 的 SimpleDirectoryReader 可以快速读取本地目录文件,IngestionPipeline 可以把 transformations、embedding、缓存和向量库写入串起来。这些能力适合快速搭建 RAG 数据接入原型,也适合作为企业项目后续优化方向。

但本课程主项目没有把 LlamaIndex 接入第 16 讲主代码,原因是本章要讲清楚的是企业知识库入库治理,而不只是“把文件变成向量”。

对比点 本项目当前实现 如果直接换成 LlamaIndex
学习主线 load_file -> normalize_documents -> split_documents -> add_documents -> manifest -> quality report 每一步都和代码对齐 会额外引入 Document / Node / Index / QueryEngine / IngestionPipeline 等概念
版本治理 每个 chunk 显式写入 scenario_id / kb_version / source / DataScope 仍需要自己把这些 metadata 接回企业治理链路
增量机制 MySQL IndexManifest 明确记录文件指纹和 chunk_id LlamaIndex cache 能减少重复 transformation,但不能直接替代 active 版本发布和质量门禁
表格资料 自定义 table_loader.py 按表头、工作表、行号生成行级 Document 通用 loader 未必能保留本项目需要的业务 metadata 粒度
质量报告 入库后生成空文件、重复 chunk、FAQ 冲突、OCR 风险等报告 质量指标仍然要由项目自己定义和落库

因此当前代码保持轻量显式实现:文件加载使用注册表,切分和向量化使用 LangChain/Milvus,版本和质量治理由 qa_core 自己掌控。这样做的好处是每个字段为什么存在、每个步骤为什么执行,都能和后续检索、评测、回滚闭环对齐。

后续如果要引入 LlamaIndex,建议只作为局部替代:

可替代:文档读取、预处理 transformations、部分缓存
不替代:scenario、source、kb_version、DataScope、质量门禁、active 指针、线上检索策略

当前代码和依赖保持一致:requirements.txt 不包含 llama-indexqa_core 主链路也不导入 llama_index


第三部分:文档入库主流程

3.1 ingest_directory() 完整流程

flowchart TD
    Start(["ingest_directory()<br/>目录路径 + 场景 + 版本"]) --> Version["📋 创建/确认 KB 版本<br/>KnowledgeBaseVersionStore"]

    Version --> Loop["📂 遍历目录文件"]

    Loop --> Ext{"文件后缀<br/>在注册表中?"}

    Ext -->|"❌"| Skip1["⚠️ 跳过<br/>不支持的文件类型"]
    Ext -->|"✅"| Fingerprint["🔍 计算 SHA256 指纹"]

    Fingerprint --> Check{"Manifest 中<br/>指纹未变化?<br/>且非 force 模式"}

    Check -->|"✅ 未变化"| Skip2["⏭️ 增量跳过<br/>不重复入库"]
    Check -->|"❌ 已变化/新文件"| Load["📄 DocumentLoader 加载<br/>PDF→PyMuPDF<br/>MD→TextLoader<br/>XLSX→TableLoader"]

    Load --> Normalize["🏷️ normalize_documents<br/>补充 source/kb_version/<br/>tenant_id/data_scope"]

    Normalize --> Split["✂️ split_documents<br/>Markdown标题增强<br/>父子块切分"]

    Split --> Delete["🗑️ 删除旧 chunk_ids<br/>(如果存在)"]

    Delete --> Write["💾 Milvus add_documents<br/>BGE-M3 生成 Dense 向量<br/>Milvus 生成 BM25 Sparse"]

    Write --> Manifest["📝 更新 MySQL IndexManifest<br/>记录指纹+chunk_ids"]

    Manifest --> Loop

    Skip1 --> Loop
    Skip2 --> Loop

    Loop --> Done(["✅ 返回写入总数"])

    style Load fill:#EFF6FF,stroke:#2563EB,stroke-width:2px
    style Split fill:#ECFDF5,stroke:#059669,stroke-width:2px
    style Write fill:#FFFBEB,stroke:#D97706,stroke-width:2px
    style Done fill:#ECFDF5,stroke:#059669,stroke-width:3px
# qa_core/indexing/service.py
def ingest_directory(
    directory_path: str,
    source: str | None = None,
    *,
    scenario_id: str | None = None,
    tenant_id: str | None = None,
    dataset_id: str | None = None,
    visibility: str | None = None,
    allowed_roles: list[str] | None = None,
    force: bool = False,
    kb_version: str | None = None,
    create_new_version: bool = False,
    activate: bool = False,
    description: str = "",
) -> int:
    """把目录中的业务文档增量写入 Milvus,逐文件委托 _ingest_single_file 处理。"""

    # Step 1:解析场景、构建数据域、确定业务分类
    scenario = resolve_scenario(scenario_id)
    data_scope = resolve_data_scope(
        tenant_id=tenant_id, dataset_id=dataset_id,
        visibility=visibility, user_roles=allowed_roles,
    )
    root = Path(directory_path)
    resolved_source = source or normalize_source_from_path(root)
    if resolved_source not in scenario.valid_sources:
        raise ValueError(f"无效的业务分类:{resolved_source}")

    # Step 2:创建/确认知识库版本
    version_store = get_kb_version_store(scenario.scenario_id)
    version = version_store.ensure_version(
        kb_version, create_new=create_new_version,
        description=description, created_by="ingest_directory",
    )
    active_kb_version = version.kb_version

    # Step 3:打开增量清单 + 文档存储
    manifest = IndexManifest()
    doc_store = get_doc_store(scenario.doc_collection)

    # Step 4:遍历目录,逐个文件委托给 _ingest_single_file
    total_chunks = 0
    skipped_files = 0
    for current_root, _, files in os.walk(root):
        for file_name in files:
            path = Path(current_root) / file_name
            chunks, skipped = _ingest_single_file(
                path, resolved_source, active_kb_version, scenario,
                data_scope, allowed_roles, doc_store, manifest, force,
            )
            if skipped:
                skipped_files += 1
            else:
                total_chunks += chunks

    # Step 5:记录入库统计
    version_store.record_ingest_result(
        active_kb_version, content_type="doc",
        count=total_chunks, source=resolved_source,
    )

    # Step 6:可选激活版本(需要 --activate 参数)
    if activate:
        version_store.activate_version(active_kb_version)

    return total_chunks

_ingest_single_file() 负责单个文件的增量入库逻辑,被 ingest_directory 的循环调用:

def _ingest_single_file(
    path, resolved_source, active_kb_version, scenario,
    data_scope, allowed_roles, doc_store, manifest, force,
) -> tuple[int, bool]:
    """处理单个文件:未变化则跳过,否则删除旧 chunk 后重新入库。"""
    if get_document_loader_spec(path) is None:
        raise ValueError(f"不支持的文档类型:{path}")
    fingerprint = file_fingerprint(path)
    existing = manifest.get(
        resolved_source, path, active_kb_version, scenario.scenario_id
    )
    # 指纹、Embedding 模型版本、chunk schema 均未变化时跳过
    if (
        not force
        and existing
        and existing.fingerprint == fingerprint
        and existing.embedding_model_version == get_settings().embedding_model_version
        and existing.chunk_schema_version == get_settings().chunk_schema_version
    ):
        return 0, True

    # 存在旧 chunk 时先删除,再重新加载、标准化、切分
    if existing and existing.chunk_ids:
        doc_store.delete_ids(existing.chunk_ids)
    docs = normalize_documents(
        load_file(path), path, resolved_source,
        active_kb_version, scenario.scenario_id,
        data_scope, allowed_roles,
    )
    chunks, ids = split_documents(docs)
    if chunks:
        doc_store.add_documents(chunks, ids=ids)
        manifest.update(
            resolved_source, path, fingerprint, ids,
            scenario_id=scenario.scenario_id,
            kb_version=active_kb_version,
            embedding_model_version=get_settings().embedding_model_version,
            chunk_schema_version=get_settings().chunk_schema_version,
        )
        return len(chunks), False
    return 0, False

3.2 normalize_documents 的作用

def normalize_documents(
    documents: list[Document],
    file_path: Path,
    source: str,
    kb_version: str | None = None,
    scenario_id: str | None = None,
    data_scope: DataScope | None = None,
    allowed_roles: list[str] | None = None,
) -> list[Document]:
    """为文档补充项目标准元数据,供过滤和引用使用。"""
    doc_id = file_fingerprint(file_path)
    scenario = resolve_scenario(scenario_id)
    scope = data_scope or resolve_data_scope()
    version_meta = version_metadata(kb_version, scenario.scenario_id)
    normalized: list[Document] = []
    for index, doc in enumerate(documents):
        metadata = dict(doc.metadata or {})
        metadata.update(
            {
                "source": source,
                "scenario_id": scenario.scenario_id,
                **scope.metadata(allowed_roles=allowed_roles),
                "file_path": str(file_path),
                "file_name": file_path.name,
                "file_type": file_path.suffix.lower(),
                "doc_id": doc_id,
                "page_index": metadata.get("page", index),
                "content_type": metadata.get("content_type") or "text",
                **version_meta,
            }
        )
        normalized.append(Document(page_content=doc.page_content, metadata=metadata))
    return normalized

第四部分:表格 CSV / Excel 专用入库设计

4.1 为什么表格不能按普通文本切分

普通制度、流程、手册是一段段自然语言,适合用 Parent-Child Chunking 按章节和字符长度切分。

但 CSV / Excel 表格不是自然段,而是一条条行记录。一行里多个单元格共同表达一个完整业务事实:

1
2
3
4
材料名称=施工照片
状态=待补交
责任人=项目经理
截止日期=2026-05-30

如果把表格当普通文本递归切分,可能出现:

  • 检索命中了“施工照片”,但状态被切到另一个 chunk;
  • 检索命中了“金额”,但付款节点、责任人丢失;
  • 两行不同记录被拼到同一个 chunk,答案把 A 行状态说成 B 行状态;
  • 答案引用只能定位到文件,不能定位到工作表和行号。

所以本项目对表格资料的原则是:

一行表格 = 一个完整业务语义单元。

4.2 文件读取策略

表格文件在 Loader 注册表中作为独立类型接入:

1
2
3
4
5
6
# qa_core/indexing/document_loaders.py
DocumentLoaderSpec(
    suffixes=(".csv", ".xlsx", ".xls"),
    factory=_table_loader,
    description="CSV/Excel 表格解析;按行保留表头、sheet 和单元格键值。",
)

读取规则:

文件类型 读取方式 说明
.csv pandas.read_csv(..., encoding="utf-8-sig") 兼容带 BOM 的中文 CSV
.xlsx pandas.read_excel(..., sheet_name=None, engine="openpyxl") 一次读取全部工作表
.xls pandas.read_excel(..., sheet_name=None, engine="xlrd") 兼容旧版 Excel

Excel 会逐个 sheet 处理,避免把多个业务表混成一张表。

4.3 表格清洗

表格入库前先做轻量清洗:

def _normalize_frame(frame: pd.DataFrame) -> pd.DataFrame:
    """清理表格空行空列,并把缺失表头补成稳定列名。"""
    data = frame.dropna(how="all").dropna(axis=1, how="all").fillna("")
    columns = []
    for index, column in enumerate(data.columns, start=1):
        name = str(column).strip()
        if not name or name.lower().startswith("unnamed:"):
            name = f"列{index}"
        columns.append(name)
    data.columns = columns
    return data

清洗目标不是复杂 ETL,而是保证表格行进入 RAG 时不会因为空行、空列表头、Unnamed 列名造成检索噪声。

单元格值也会转成适合检索的短文本:

1
2
3
4
5
def _cell_text(value: object) -> str:
    text = str(value).strip()
    if text.endswith(".0") and text[:-2].isdigit():
        return text[:-2]
    return text

这样 1000.0 会变成 1000,金额、编号、数量类问题更容易命中。

4.4 每行转换为 Document

表格 loader 会把每一行转换成一个 LangChain Document

content = "\n".join(
    [
        f"表格文件:{path.name}",
        f"工作表:{sheet_name}",
        f"表头:{' / '.join(headers)}",
        f"行号:{row_number}",
        "单元格:",
        *cell_lines,
    ]
)

生成后的正文类似:

1
2
3
4
5
6
7
8
9
表格文件:验收清单.xlsx
工作表:材料验收
表头:材料名称 / 状态 / 责任人 / 截止日期
行号:3
单元格:
- 材料名称:施工照片
- 状态:待补交
- 责任人:项目经理
- 截止日期:2026-05-30

这样做有两个好处:

  1. 语义完整:同一行的字段和值不会被拆散;
  2. 适合向量检索和 BM25:既有自然语言标签,也有明确的列名和值。

4.5 metadata 设计

表格行必须携带可追溯 metadata:

1
2
3
4
5
6
7
8
9
metadata={
    "content_type": "table_row",
    "table_id": table_id,
    "sheet_name": str(sheet_name),
    "row_number": row_number,
    "row_count": len(normalized),
    "column_count": len(headers),
    "table_headers": " | ".join(headers),
}

字段含义:

字段 作用
content_type=table_row 告诉切分、质量检测、检索上下文:这是表格行
table_id 标识同一个文件下的同一个工作表
sheet_name 支持答案引用到具体工作表
row_number 支持答案引用到具体行
row_count / column_count 质量报告和容量评估使用
table_headers 帮助回看表结构,也便于后续扩展表头召回

4.6 表格行不再递归切分

split_documents() 会识别 content_type=table_row

1
2
3
4
5
# qa_core/indexing/chunking.py
if is_table_metadata(doc.metadata):
    parent_docs = [doc]
else:
    parent_docs = parent_splitter.split_documents([doc])

也就是说,表格行不会再进入普通字符切分器。

原因是:表格行已经是完整业务单元,再切一次反而会破坏“列名 -> 单元格值”的关系。

4.7 检索策略中的 prefer_table

表格入库只是第一步。检索时还要识别用户是否在问表格问题。

本项目通过 is_table_query() 判断问题是否包含表格、清单、台账、字段、行号、工作表、状态、金额、责任人等表达:

prefer_table = is_table_query(compact_query)
params = _apply_table_preference(prefer_table, params["run_doc"], params, settings)

prefer_table=True 时:

  • 扩大 doc_top_k,多召回一些候选表格行;
  • 扩大 final_context_top_n,给表格证据更多上下文空间;
  • 设置 faq_direct_exact_only=True,禁止相似 FAQ 直接回答;
  • 上下文构建时把表格行排在普通正文前。

为什么要禁用相似 FAQ 直出?

1
2
3
4
5
6
用户问:验收材料清单里测试报告那一行是什么状态?
相似 FAQ:验收需要提交哪些材料?

这两个问题都包含“验收”“材料”“测试报告”,相似度可能不低。
但 FAQ 回答的是材料范围,用户问的是某一行字段值。
所以表格类问题只允许精确 FAQ 直出,相似 FAQ 必须让位给文档 RAG。

4.8 答案引用和兜底

表格资料的答案必须能回到原始证据。当前项目在来源标签中追加工作表和行号:

[1] 验收清单.xlsx / 工作表:材料验收 / 第 3 行

另外,表格类问题经常涉及状态、金额、责任人、日期等精确值。LLM 有时会概括回答而漏掉某个关键单元格,所以项目里增加了表格行兜底:

def enforce_table_row_details(answer: str, context_docs: list[Document]) -> str:
    """确保表格类答案在模型遗漏关键单元格时,确定性追加表格行要点。"""

如果模型回答没有覆盖表格行里的核心字段,系统会追加:

表格行要点:状态:待补交;责任人:项目经理 [1]

这不是替代 LLM,而是对表格精确字段的一层确定性保护。

4.9 工程口径总结

Excel 和 CSV 入库可以概括为:

Excel 和 CSV 不能按普通文本切分。我们把每一行转成一个带表头、工作表、行号和单元格键值的 LangChain Document,并写入 content_type=table_row。切分阶段识别到表格行后不会再递归切分;检索阶段如果问题命中表格、清单、台账、金额、状态等关键词,会启用 prefer_table,扩大文档召回并优先保留表格行。答案引用会展示文件、工作表和行号,如果模型漏掉关键单元格,系统会追加表格行要点,保证表格类问题能追溯、能复核、字段不丢。

4.10 表格入库练习

这组练习用于确认“表格读取 → 行级 Document → 检索偏好 → 答案引用”已经闭环。

准备一个最小 CSV:

1
2
3
4
材料名称,状态,责任人,截止日期,备注
施工图纸,已提交,设计负责人,2026-05-10,版本为 V3
隐蔽工程照片,待补交,项目经理,2026-05-18,缺少二层西侧照片
验收测试报告,已通过,质量负责人,2026-05-20,检测编号 QA-2026-021

下面这段代码用于在本地快速验证表格 loader 和切分策略。它不连接 Milvus,也不会改动线上知识库,只检查三件事:

  • load_table_file() 是否把 CSV 每一行转换成一个 Document
  • split_documents() 是否保持表格行完整,不再递归切分;
  • is_table_query() 是否能把表格类问题识别为 prefer_table=True
from pathlib import Path

from qa_core.indexing.chunking import split_documents
from qa_core.indexing.table_documents import load_table_file
from qa_core.intent.question_category import is_table_query


csv_path = Path("reports/table_practice/acceptance_material_checklist.csv")
csv_path.parent.mkdir(parents=True, exist_ok=True)
csv_path.write_text(
    "\n".join(
        [
            "材料名称,状态,责任人,截止日期,备注",
            "施工图纸,已提交,设计负责人,2026-05-10,版本为 V3",
            "隐蔽工程照片,待补交,项目经理,2026-05-18,缺少二层西侧照片",
            "验收测试报告,已通过,质量负责人,2026-05-20,检测编号 QA-2026-021",
        ]
    ),
    encoding="utf-8-sig",
)

documents = load_table_file(csv_path)
print("行级 Document 数量:", len(documents))
print("第一条 Document 正文:")
print(documents[0].page_content)
print("第一条 Document metadata:")
print(documents[0].metadata)

chunks, ids = split_documents(documents)
print("切分后 chunk 数量:", len(chunks))
print("chunk_id 示例:", ids[0])
print("第二条 chunk 正文:")
print(chunks[1].page_content)
print("第二条 chunk metadata:")
print(chunks[1].metadata)

query = "验收清单里隐蔽工程照片是什么状态,责任人是谁?"
print("是否表格类问题:", is_table_query(query))

运行时应该看到:

1
2
3
行级 Document 数量: 3
切分后 chunk 数量: 3
是否表格类问题: True

第二条 chunk 的正文应该仍然保留完整行记录,类似:

表格文件:acceptance_material_checklist.csv
工作表:csv
表头:材料名称 / 状态 / 责任人 / 截止日期 / 备注
行号:2
单元格:
- 材料名称:隐蔽工程照片
- 状态:待补交
- 责任人:项目经理
- 截止日期:2026-05-18
- 备注:缺少二层西侧照片

metadata 中至少要看到这些字段:

1
2
3
4
5
6
7
{
  "content_type": "table_row",
  "sheet_name": "csv",
  "row_number": 2,
  "row_count": 3,
  "column_count": 5
}

如果要把这个 CSV 真正放进知识库,可以把文件移动到某个场景的数据目录,例如:

scenarios/engineering_project_qa/data/quality_data/acceptance_material_checklist.csv

然后执行单场景重建:

python scripts/rebuild_kb_version.py --scenario engineering_project_qa --new-version --force --quality-gate --activate --description "table row ingestion practice"

入库后可以用检索诊断或页面提问:

验收清单里隐蔽工程照片是什么状态,责任人是谁?

期望链路是:

1
2
3
4
5
6
7
问题命中 prefer_table=True
文档检索优先保留 table_row
答案引用能定位到 CSV 文件、工作表 csv、第 2 行
如果模型漏掉状态或责任人,后处理追加表格行要点

建议把它放到工程项目资料问答场景的数据目录中,并按常规知识库重建流程入库。重点观察四件事:

检查点 期望结果 为什么检查
入库后的 metadata 包含 content_type=table_rowsheet_namerow_number 证明表格行没有被当成普通正文。
chunk 数量 每个有效数据行生成一个可检索 Document 证明行级证据粒度正确。
检索计划 表格类问题命中 prefer_table=True 证明检索策略知道当前问题更适合查表格。
答案来源 来源中能看到文件、工作表、行号 证明答案可以回到原始证据复核。

可以在页面或接口中提问:

验收清单里隐蔽工程照片是什么状态,责任人是谁?

理想回答应该包含:

  • 状态是“待补交”;
  • 责任人是“项目经理”;
  • 引用来源能定位到 CSV/Excel 的对应行;
  • 如果模型遗漏状态或责任人,系统会追加“表格行要点”。

这个练习的目的不是测试模型文采,而是验证表格证据没有在切分和生成阶段丢失。

4.11 当前边界

一期表格入库只覆盖“规范二维表”。复杂 Excel 能力不能无边界扩散,否则会把 RAG 项目变成 Office 解析项目。

边界场景 一期处理策略 推荐做法
合并单元格 不默认还原层级语义 入库前整理成普通二维表。
多级表头 不自动推断复杂表头关系 人工扁平化字段名,比如“合同-金额”“合同-付款节点”。
公式单元格 读取解析后的单元格值,不重新计算业务公式 关键计算逻辑应在业务系统或数据准备阶段完成。
图表 不把柱状图、折线图直接转成结构化证据 导出图表背后的原始数据表再入库。
截图表格 不走 CSV/Excel 表格 loader 进入 OCR/VLM 图文资料治理链路。
超大 Excel 不在一期做复杂分布式解析 拆分工作表、拆分文件,或按业务周期归档。
隐藏行列和批注 不作为可信主证据 重要内容必须整理成显式列。
透视表 不直接作为原始证据 导出明细表或汇总表后再入库。

这部分能力可以概括为:

我们一期支持的是规范 CSV/Excel 的行级语义入库,不追求解析所有复杂 Office 特性。这样做是为了保证 RAG 主链路清晰可控:表格行能召回、字段能引用、来源能复核。合并单元格、截图表格、图表解释这类复杂资料会进入后续 OCR/VLM 和资料治理链路,而不是塞进普通表格 loader 里。


第五部分:MySQL IndexManifest 增量机制

5.1 先分清两个“版本”

学习增量入库时,最容易混淆的是“知识库版本”和“Manifest 记录”。

概念 保存位置 作用
知识库版本 kb_version MySQL kb_versions / kb_active_versions 控制线上当前查哪个知识库版本,例如 kb_enterprise_knowledge_20260618_xxx
Manifest 记录 MySQL kb_document_manifests 记录某个文件在某个 kb_version 下生成了哪些 chunk,用于判断下次是否可以跳过、复用或重建。
Milvus chunk Milvus collection 保存真正用于检索的文本、向量和 metadata。

一句话概括:

kb_version 决定线上查哪一版;IndexManifest 记录每一版里每个文件对应哪些 chunk。

当前项目的线上检索始终只查一个 active 版本:

kb_version == 当前 active version

它不是查询时把“旧版本 + 增量版本”拼起来查。增量发生在离线构建阶段,最终仍然产出一个完整的新 kb_version

5.2 为什么需要 Manifest

假设一个业务场景有 500 个文档,只改了其中 1 个文件。如果每次都全量重建,会有三个问题:

问题 后果
时间浪费 499 个未变化文件重复解析、切分、embedding。
成本浪费 embedding 和 Milvus 写入重复执行。
发布风险 全量重建过程中任何一步失败,都可能影响新版本发布。

Manifest 要解决的是三个判断问题:

判断问题 处理动作
目标版本里这个文件已经入过库,而且文件、embedding 模型、切分策略都没变 直接跳过。
新建目标版本时,基准版本里这个文件没变,而且模型和切分策略也没变 复制旧版本 chunk 和 dense 向量到新版本,不重新 embedding。
文件内容变化、新增文件、embedding 模型变化、切分策略变化 重新加载、切分、embedding、写入 Milvus。

所以它不是“简单记录文件名”,而是离线入库的决策依据。

5.3 Manifest 表结构

当前项目把文档增量清单保存到 MySQL 表 kb_document_manifests。这张表记录“某个文件在某个场景、某个知识库版本下生成了哪些 chunk”。

字段 说明
manifest_key scenario_id + source + kb_version + 文件绝对路径 的稳定 hash
scenario_id 业务场景
source 业务分类
path 本地文件绝对路径
fingerprint 文件指纹,用于判断是否变化
chunk_ids_json 该文件写入 Milvus 后生成的 chunk id 列表
kb_version 所属知识库版本
embedding_model_version / chunk_schema_version 入库配置快照
updated_at 最近入库时间

这里有两个字段尤其关键:

  • embedding_model_version:同一个文件如果换了 embedding 模型,旧向量不能继续复用。
  • chunk_schema_version:同一个文件如果切分策略变了,旧 chunk 结构不能继续复用。

Manifest 表结构集中在 qa_core/storage/mysql_schema.py

1
2
3
4
# qa_core/storage/mysql_schema.py
def create_index_manifest_table(engine, *, table_name: str) -> None:
    """创建文档入库 manifest 表。"""
    _execute_ddl(engine, ddl)

qa_core/indexing/manifest.py 不再直接维护大段 CREATE TABLE 字符串,而是只负责 Manifest 的读写动作:

模块 职责
qa_core/storage/mysql_schema.py 定义 kb_document_manifests 表结构
qa_core/indexing/manifest.py 查询、更新、删除 Manifest 记录
qa_core/indexing/service.py 在入库流程中根据 Manifest 判断跳过、复用或重建

这种拆分不是为了增加文件数量,而是让“数据库初始化”和“入库业务判断”分开。读第 16 讲时先理解 Manifest 怎么做增量决策;需要看表结构时,再进入 mysql_schema.py

因此“文件没变”还不够,必须同时满足:

1
2
3
fingerprint 相同
embedding_model_version 相同
chunk_schema_version 相同

5.4 两种增量:同版本跳过与跨版本复用

项目里实际有两类增量场景,它们的处理动作不同。

场景 A:同一个目标版本重复执行入库

比如某次入库中断了,重新执行同一个目标 kb_version。这时目标版本里可能已经有部分文件写入成功。

处理逻辑是:

1
2
3
4
5
读取目标版本 manifest
当前文件 fingerprint / embedding_model_version / chunk_schema_version 都一致
直接跳过,不重复写入 Milvus

这就是 _ingest_single_file() 里第一层判断:

1
2
3
4
5
6
7
8
9
existing = manifest.get(source, path, target_kb_version, scenario_id)

if (
    existing
    and existing.fingerprint == fingerprint
    and existing.embedding_model_version == settings.embedding_model_version
    and existing.chunk_schema_version == settings.chunk_schema_version
):
    return 0, True, 0

场景 B:基于 active 版本创建一个新版本

这是更常见的企业发布方式:

python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --incremental-from active --quality-gate --activate

这时目标版本是新的,例如:

旧 active 版本:kb_v1
新 staged 版本:kb_v2

如果某个文件在 kb_v1 里已经入过库,并且文件内容、embedding 模型、切分策略都没变,不能只“跳过”。原因是线上检索激活后只查:

kb_version == kb_v2

如果未变化文件什么都不写入 kb_v2,激活后这些文件就查不到了。

所以跨版本增量的正确动作是:

1
2
3
4
5
6
7
8
9
读取基准版本 kb_v1 的 manifest
确认文件未变化,模型和切分策略也未变化
根据旧 chunk_ids 从 Milvus 查出旧 chunk
复制 text + dense 向量,改写 metadata.kb_version = kb_v2
写入新版本,并更新 kb_v2 的 manifest

对应代码是 copy_documents_to_version()

copied_ids = doc_store.copy_documents_to_version(
    base_existing.chunk_ids,
    target_kb_version=target_kb_version,
    scenario_id=scenario.scenario_id,
    metadata_overrides=data_scope.metadata(allowed_roles=allowed_roles),
)
manifest.update(
    source,
    path,
    fingerprint,
    copied_ids,
    scenario_id=scenario.scenario_id,
    kb_version=target_kb_version,
    embedding_model_version=settings.embedding_model_version,
    chunk_schema_version=settings.chunk_schema_version,
)

这里复用的是旧版本的 dense 向量,不重新调用 embedding 模型;Milvus 服务端的 BM25 sparse 字段会按文本重新生成。

5.5 完整决策流程

对每一个文件,文档入库服务会按下面顺序判断:

顺序 判断 动作
1 文件类型不支持 报错,进入质量检查问题。
2 目标版本已有相同 manifest,且文件、模型、切分策略都未变化 跳过,不重复写入。
3 指定了 --incremental-from,基准版本有相同文件,且文件、模型、切分策略都未变化 复制基准版本 chunk 和 dense 向量到目标版本。
4 文件新增、文件变化、模型变化、切分策略变化 重新加载、标准化、切分、写入 Milvus。
5 新目录中已经没有某个旧文件 不复制到新版本;激活新版本后线上自然查不到它。

这也是为什么 --incremental-from 不能和下面两个参数同时使用:

参数 不能共用的原因
--force --force 表示全部重算,和复用旧版本向量的目标冲突。
--reset-collections 重置 collection 会删除旧向量,跨版本复制没有数据来源。

5.6 具体示例

假设当前 active 版本是 kb_v1,包含三份资料:

文件 kb_v1 Manifest 状态
hr/onboarding.md chunk [hr_1, hr_2] 未变化
finance/expense.md chunk [fin_1, fin_2] 内容修改
finance/budget_preapproval_matrix.xlsx chunk [table_1, table_2] 文件被删除

现在执行:

python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --incremental-from active --quality-gate --activate

系统创建新版本 kb_v2,处理结果如下:

文件 处理方式 kb_v2 结果
hr/onboarding.md kb_v1 复制旧 chunk 和 dense 向量 kb_v2 也有入职资料,且不重新 embedding。
finance/expense.md 重新加载、切分、embedding kb_v2 使用新的报销资料内容。
finance/budget_preapproval_matrix.xlsx 不复制 kb_v2 激活后查不到这份已删除资料。
新增 it/vpn.md 重新加载、切分、embedding kb_v2 新增 VPN 资料。

最终线上激活后只查:

kb_version == kb_v2

所以用户看到的是一套完整的新知识库视图:

  • 未变化文件仍然能查到,因为已经复制到 kb_v2
  • 变化文件查到的是新内容;
  • 删除文件不会再被召回;
  • 旧版本 kb_v1 仍可保留,用于回滚。

5.7 当前方案的取舍

当前项目采用的是物理复制式增量

未变化文件不重新 embedding,但会把旧版本 chunk 和 dense 向量复制一份到新版本。

这样设计有三个好处:

好处 说明
查询简单 线上永远只过滤一个 active kb_version
回滚清晰 MySQL active 指针切回旧版本即可。
查询模型清晰 每个版本都是完整可查询版本,不需要在查询时合并多个版本视图。

代价是:如果知识库规模非常大,每次创建新版本都复制大量未变化 chunk,会增加 Milvus 存储和写入耗时。

企业超大规模场景可以升级成引用式增量版本:未变化 chunk 不复制,新版本只记录“继承哪个基线版本、哪些文件新增、哪些文件修改、哪些文件删除”。查询时通过有效版本视图决定哪些 chunk 对当前版本可见。

两种方案的区别可以这样对比:

对比点 当前项目:物理复制式增量 理想优化:引用式增量版本
核心思路 新版本仍然是一套完整数据。未变化文件从旧版本复制 chunk 和 dense 向量到新版本。 新版本只记录变化。未变化 chunk 不复制,而是继续引用旧版本中仍然有效的 chunk。
未变化文件 不重新 embedding,但会复制一份 Milvus 记录到目标 kb_version 不写新的 Milvus 记录,只通过版本有效期继续可见。
变化文件 重新加载、切分、embedding,写入目标 kb_version 旧 chunk 标记失效,新 chunk 从当前版本开始生效。
删除文件 新版本不复制该文件的 chunk;激活后因为只查新 kb_version,所以自然不可见。 给旧 chunk 写失效版本,例如 valid_to_seq = 当前版本序号
线上查询表达式 简单:kb_version == 当前 active version 更复杂:valid_from_seq <= active_seq and (valid_to_seq == 0 or valid_to_seq > active_seq)
新版本完整性 每个 active 版本在 Milvus 中都有完整可查数据。 active 版本是一张“有效版本视图”,由历史 chunk 和当前变化共同组成。
存储占用 较高。每次新版本会复制大量未变化 chunk。 较低。未变化 chunk 只保存一份。
构建耗时 中等。不重新 embedding,但要复制旧 chunk。 更低。只处理新增、修改、删除文件。
回滚方式 切换 MySQL active 指针到旧 kb_version 切换 active 版本序号,重新解释有效版本视图。
质量报告 相对简单,报告目标版本的完整资料状态。 更复杂,需要报告“继承资料 + 当前变化 + 删除 tombstone”的综合视图。
垃圾回收 可以按旧 kb_version 清理历史版本。 需要确认没有任何版本继续引用旧 chunk,才能物理清理。
适用场景 中小规模企业知识库、版本发布链路清晰优先。 百万级 chunk、大规模企业知识库、频繁小批量更新、存储成本敏感。

用一句话记:

当前项目方案:离线复制未变化 chunk,换来查询简单。
引用式增量方案:离线只记录变化,换来存储节省,但查询和治理更复杂。

本项目暂不采用引用式增量,原因是它会显著增加检索表达式、回滚、质量报告、垃圾回收和诊断复杂度。课程主线先把“候选版本入库、质量门禁、active 指针切换、线上只查 active 版本”讲清楚。

5.8 核心方法

class IndexManifest(_MySqlStore):
    @staticmethod
    def key(source, file_path, kb_version=None, scenario_id=None):
        """根据来源、路径、版本和场景生成稳定清单键。"""
        return stable_hash(scenario_id or "", source, kb_version or "", str(Path(file_path).resolve()))

    def get(self, source, file_path, kb_version=None, scenario_id=None):
        """按 manifest_key 从 MySQL 读取文件入库记录。"""
        row = conn.execute(
            text("SELECT ... FROM kb_document_manifests WHERE manifest_key=:key"),
            {"key": self.key(source, file_path, kb_version, scenario_id)},
        ).mappings().fetchone()
        return ManifestRecord.from_row(row) if row else None

    def update(self, source, file_path, fingerprint, chunk_ids, *, scenario_id="", kb_version="", ...):
        """Milvus 写入成功后,把新 fingerprint 和 chunk_ids upsert 到 MySQL。"""
        conn.execute(text("INSERT INTO kb_document_manifests (...) VALUES (...) ON DUPLICATE KEY UPDATE ..."), params)

    def iter_records(self, *, scenario_id=None, source=None, kb_version=None):
        """按条件列出清单记录,用于缺失文件清理和治理报告。"""
        rows = conn.execute(text("SELECT ... FROM kb_document_manifests WHERE ..."), params)
        return [ManifestRecord.from_row(row) for row in rows]

方法和职责可以这样记:

方法 职责
IndexManifest.get() 查询某个文件在某个版本下是否已经入库。
IndexManifest.update() 文件成功写入或成功复制后,记录新的 fingerprint 和 chunk_ids。
IndexManifest.iter_records() 按场景、source、版本列出清单,用于治理和诊断。
IndexManifest.remove_by_key() 清理某条 manifest 记录。
copy_documents_to_version() 跨版本复制旧 chunk 和 dense 向量,生成目标版本的新 chunk 记录。

5.9 常见误解

误解 正确理解
Manifest 是向量库 Manifest 只保存文件指纹和 chunk id,不保存正文和向量。
增量版本查询时要查旧版本加新版本 当前项目不是这样。查询只查 active 版本,增量发生在离线构建阶段。
文件内容没变就一定能复用 还要检查 embedding 模型版本和 chunk schema 版本。
跨版本增量就是跳过未变化文件 不是。新版本必须完整,所以未变化文件要复制到新版本。
删除文件会立即删除旧版本数据 不会。旧版本数据保留用于回滚;新版本不复制它,激活后线上自然查不到。

第六部分:FAQ 入库流程

6.1 CSV 格式

FAQ 使用 CSV 文件管理,每行一个问答对:

1
2
3
4
5
source,question,answer
hr,入职需要准备哪些材料,入职当天需要携带:身份证原件及复印件、学历证书复印件、离职证明、体检报告、银行卡信息...
hr,试用期转正流程是什么,试用期转正流程:1. 员工提交转正申请 2. 直属领导评估 3. HR 审核 4. 部门负责人审批...
it,VPN 连接失败怎么办,请按以下步骤排查:1. 确认账号密码正确 2. 检查网络连接 3. 尝试切换 VPN 节点...
billing,如何申请发票,在订单页面点击"申请发票",选择发票类型(电子/纸质),填写发票抬头...

6.2 入库实现

# qa_core/indexing/faq_ingestion.py

def faq_documents_from_csv(
    csv_path: str,
    kb_version: str | None = None,
    scenario_id: str | None = None,
    tenant_id: str | None = None,
    dataset_id: str | None = None,
    visibility: str | None = None,
    allowed_roles: list[str] | None = None,
) -> tuple[list[Document], list[str]]:
    """把 FAQ CSV 转换为可写入 Milvus 的问题文档。

    FAQ 的 page_content 只放"标准问题",答案放在 metadata.answer。这样检索时匹配的是
    用户问题和标准问题的相似度;一旦高置信命中,就可以直接返回 metadata.answer。
    """
    scenario = resolve_scenario(scenario_id)
    data_scope = resolve_data_scope(tenant_id=tenant_id, dataset_id=dataset_id, visibility=visibility, user_roles=allowed_roles)
    version_meta = version_metadata(kb_version, scenario.scenario_id)
    data = pd.read_csv(csv_path, encoding="utf-8")
    docs: list[Document] = []
    ids: list[str] = []
    seen_ids: set[str] = set()
    for _, row in data.iterrows():
        question = str(row.get("问题") or row.get("question") or "").strip()
        answer = str(row.get("答案") or row.get("answer") or "").strip()
        subject = str(
            row.get("source")
            or row.get("source_filter")
            or row.get("业务分类")
            or row.get("subject_name")
            or ""
        ).strip()
        if not question or not answer:
            continue

        source = normalize_faq_source(subject, scenario=scenario, question=question)
        faq_id = stable_hash(scenario.scenario_id, kb_version or "", source, question)
        if faq_id in seen_ids:
            faq_id = stable_hash(scenario.scenario_id, kb_version or "", source, question, answer)
        if faq_id in seen_ids:
            continue
        seen_ids.add(faq_id)
        docs.append(
            Document(
                page_content=question,
                metadata={
                    "faq_id": faq_id,
                    "scenario_id": scenario.scenario_id,
                    **data_scope.metadata(allowed_roles=allowed_roles),
                    "standard_question": question,
                    "answer": answer,
                    "source": source,
                    "subject_name": subject,
                    "status": "published",
                    **version_meta,
                },
            )
        )
        ids.append(faq_id)
    return docs, ids

存储策略: - page_content = FAQ 标准问题 → 用于向量检索 - metadata.answer = 标准答案 → 检索命中后直接取 metadata 返回 - metadata.source = 当前场景 valid_sources 中的标准分类 → 用于 Milvus 过滤和数据隔离

这样 FAQ 直出时不需要再调用 LLM,直接从 metadata 读取答案即可。

normalize_faq_source() 只依赖当前场景包的 valid_sourcessource_patterns。如果 CSV 中的分类无法映射到当前场景,系统会直接报错,而不是偷偷写入 Milvus。这样可以保证 FAQ 入库的业务边界和场景配置一致。


第七部分:清理与维护

7.1 清理已删除的本地文件

当本地文档被删除时,Milvus 中的旧 chunk 不会自动消失。需要运行清理脚本:

1
2
3
4
5
# 预览将要清理的内容(默认 dry-run)
python scripts/cleanup_missing_docs.py --scenario enterprise_knowledge

# 实际执行清理
python scripts/cleanup_missing_docs.py --scenario enterprise_knowledge --no-dry-run

7.2 cleanup_missing_document_chunks 原理

def cleanup_missing_document_chunks(
    *,
    scenario_id: str | None = None,
    source: str | None = None,
    kb_version: str | None = None,
    dry_run: bool = True,
) -> dict[str, Any]:
    """清理 MySQL manifest 中已不存在本地文件的文档 chunk。

    该操作会删除 Milvus 数据,默认 dry-run 先预览再执行。
    """
    scenario = resolve_scenario(scenario_id)
    manifest = IndexManifest()
    records = manifest.iter_records(
        scenario_id=scenario.scenario_id,
        source=source,
        kb_version=kb_version,
    )
    missing = [r for r in records if r.path and not Path(r.path).exists()]

    if dry_run:
        return {
            "dry_run": True,
            "missing_file_count": len(missing),
            "affected_chunk_count": sum(len(r.chunk_ids) for r in missing),
            "missing_files": [
                {"path": r.path, "chunk_count": len(r.chunk_ids)}
                for r in missing
            ],
        }

    # 实际删除
    doc_store = get_doc_store(scenario.doc_collection)
    for record in missing:
        doc_store.delete_ids(record.chunk_ids)
        manifest.remove_by_key(record.key)

    return {
        "dry_run": False,
        "deleted_chunk_count": sum(len(r.chunk_ids) for r in missing),
        "deleted_file_count": len(missing),
    }

默认 dry-run:先预览再执行,防止误删。


第八部分:复杂图文资料入库治理

8.1 这属于多模态吗

导入文档中同时存在文字、图片、截图、扫描页、流程图、设备照片时,本质上已经进入了多模态资料处理范围。

但在当前一期项目里,它应该被定位为:

多模态入库治理,不是多模态在线问答。

两者区别如下:

类型 做什么 当前一期定位
多模态入库治理 离线解析图片、扫描件、图文 PDF,把结果转成可复核文本或图文块 可以讲清楚设计边界,谨慎接入
多模态在线问答 用户实时上传图片,模型现场看图回答 不放一期主链路
多模态检索 同时存文本向量和图片向量,用 CLIP/VLM 做跨模态召回 更适合二期或三期

这样设计的原因是:在线问答必须稳定、低延迟、可追踪;图片解析、OCR、VLM 描述成本高且失败率高,如果直接塞进在线链路,会让 RAG 主流程变慢、变重、变不可控。

8.2 为什么不能“图片 OCR 一下就入库”

真实企业资料中的图片经常包含:

  • 合同扫描件;
  • 审批截图;
  • 设备告警截图;
  • 流程图;
  • 验收照片;
  • 表格截图;
  • 盖章文件;
  • 票据和单证照片。

这些内容的风险不只是“能不能识别出文字”,而是:

风险 示例
OCR 识别错误 金额 8000 被识别成 B000
上下文断裂 图片中的“处理步骤”脱离前后正文后无法理解
来源不可追溯 回答引用了图片内容,但不知道来自第几页第几张图
证据未确认 扫描件内容未经人工复核,不能作为正式制度口径
图中信息不全 流程图箭头、颜色、图例无法仅靠 OCR 还原

所以复杂图文资料不能简单走“OCR -> 普通文本切分 -> 入库”。当前项目先实现离线 OCR 复核 Markdown 闭环,后续可演进到 image_text_block

1
2
3
4
5
6
7
8
图文资料
  -> 抽取文本层
  -> 识别图片/扫描页
  -> OCR 生成候选 Markdown
  -> 人工复核
  -> 提升为已复核 OCR Markdown
  -> 入库质量检查
  -> 新知识库版本激活

如果后续引入 VLM 和图文块,再升级为:

1
2
3
4
5
6
7
图片/流程图/设备照片
  -> OCR 或 VLM 生成候选说明
  -> 绑定附近正文、页码、图片编号
  -> 人工复核
  -> 生成 image_text_block
  -> 入库质量检查
  -> 新知识库版本激活

8.3 三类资料的处理策略

资料类型 处理方式 是否直接进入 active 知识库
有文本层的 PDF / Word / PPT 正文先按普通文档入库,图片进入风险报告 正文可以,图片不直接进
扫描件 / 图片 PDF 进入离线 OCR,生成待复核 Markdown;复核后提升为 ocr_reviewed_text 复核后才可以进
图片和正文强相关资料 后续可生成图文语义块 image_text_block 当前作为演进方向

当前项目已有离线 OCR 脚本:

python scripts/ocr/run_offline_ocr.py --input-dir incoming_scans --output-dir reports/ocr/batch_001
python scripts/ocr/promote_ocr_candidates.py --input-dir reports/ocr/batch_001 --scenario engineering_project_qa --source quality --apply

第一条命令只生成待复核资料,第二条命令才把复核后的 Markdown 提升到场景资料目录。提升后仍然要执行知识库版本重建、入库质量检查和 RAG 回归验收。

当前已经落地的最小闭环是:

1
2
3
4
5
6
7
run_offline_ocr.py
  -> 输出 OCR Markdown 和报告
  -> 人工把复核状态改为“已复核”或加入 review_status: reviewed
  -> promote_ocr_candidates.py 复制到场景资料目录
  -> 文本 loader 标记 content_type=ocr_reviewed_text
  -> 入库质量门禁不再按未复核 OCR 风险拦截
  -> rebuild_kb_version.py 生成候选知识库版本

8.4 image_text_block 推荐结构

对于图片和正文强相关的资料,不应该把 OCR 文本当成普通段落直接切分,而应该在后续增强中生成专门的图文块:

page_content:
  第 3 页第 2 张图片说明:设备告警面板显示 E102,温度超过 85°C。
  图片附近正文:处理方式为先停机检查冷却风扇,再联系运维。

metadata:
  content_type: image_text_block
  file_name: equipment_alarm_manual.pdf
  page_index: 3
  image_index: 2
  ocr_confidence: 0.91
  review_status: reviewed
  parent_content: 第 3 页完整上下文

关键字段说明:

字段 作用
content_type=image_text_block 告诉检索和上下文构建:这是图文块,不是普通正文
page_index / image_index 支持答案引用到具体页和具体图片
ocr_confidence 用于入库质量检查,低置信度不能直接激活
review_status 只有 reviewed 才允许进入 active 知识库
parent_content 保留图片附近正文,避免图片文字脱离上下文

8.5 分块策略

图文混排资料的切分原则是:

  1. 正文按章节或父子块切分:继续复用当前 split_documents() 的 Parent-Child Chunking。
  2. 表格按行切分:CSV/Excel 仍然使用 content_type=table_row,不参与普通递归切分。
  3. 图片 OCR 文本不单独裸切:必须绑定页码、图片编号和附近正文。
  4. 未复核图文块不进 active:只能作为候选资料进入复核区或治理报告。
  5. 低置信度图文块阻断激活:避免把错误金额、日期、合同号写入正式知识库。

也就是说,图文资料的最小语义单元不是“识别出的一行字”,而是:

图片 OCR 文本 + 图片附近正文 + 页码 + 图片编号 + 置信度 + 复核状态

8.6 检索策略

图文块进入知识库后,也不应该和普通正文完全同权。注意:这一节是 image_text_block 后续增强策略;当前已落地的是 ocr_reviewed_text 复核文本入库闭环。

推荐策略:

  • 普通知识问题:优先使用文本 chunk 和表格 chunk。
  • 用户问题包含“图片、截图、扫描件、照片、图中、流程图、告警面板”等表达时,提高 image_text_block 权重。
  • 如果命中的图文块 review_status != reviewed,回答必须标记“未确认”,不能把它当成正式证据。
  • 来源展示必须包含文件名、页码和图片编号。

这样既能让图文资料参与 RAG,又不会让未确认图片内容污染正式答案。

8.7 工程口径总结

多模态能力的边界可以概括为:

我们一期没有做实时多模态对话,而是把多模态能力收敛在知识库入库治理侧。当前已实现的是离线 OCR 复核 Markdown 闭环:扫描件或图片 PDF 先生成可复核文本,人工复核后以 ocr_reviewed_text 进入场景资料目录,再经过入库质量检查、版本激活和回归评测。后续如果接入 VLM,可以进一步把图片、页码、附近正文和置信度整理成 image_text_block。这样既能处理企业资料中的多模态信息,又不会让在线问答链路变重、变慢、变不稳定。

这段内容的重点不是展示 OCR 接入,而是理解企业 RAG 中多模态资料必须经过治理、复核、入库质量检查和版本化上线。


第九部分:data_packs 与企业资料增强包

这一部分不是为了多介绍一个目录,而是为了讲清楚企业 RAG 项目里的一个真实问题:

正式知识库不能直接接收所有资料。候选资料、增强资料、脏样本、扫描件、冲突 FAQ,都必须先在隔离区完成治理、预检和评测,确认可靠后才能进入 active 知识库。

所以项目里除了 scenarios/,还保留了 data_packs/。两者的定位完全不同:

1
2
3
4
5
6
7
8
scenarios/                         当前正式知识库数据源
  enterprise_knowledge/
  equipment_ops/
  ...

data_packs/enterprise_realistic_pack/   企业仿真增强资料包
  clean_overlay/                        可治理、可预检的增强候选资料
  dirty_samples/                        只用于资料治理的风险样本

一句话记:

scenarios 是当前线上知识库资料。
data_packs 是企业增强资料和脏数据治理隔离区。

如果把 data_packs 里的所有资料直接塞进 scenarios,主链路会变得难以讲清:

直接合并进 scenarios/ 的问题 后果
资料越来越复杂 很难判断问题来自 RAG 链路还是资料质量。
候选资料未经预检 可能把冲突、过期、OCR 噪声资料直接写入 active 知识库。
脏样本和正式资料混在一起 质量门禁会失败,排查对象会从资料质量扩散到代码、环境和模型多个方向。
企业增强资料无法单独评估 不知道增强后到底提升了哪些问题,也不知道是否引入回归。

因此本项目把资料分成三层:

1
2
3
4
5
6
7
8
第一层:scenarios
  当前正式知识库,保证 8 个业务场景可稳定初始化、稳定验证。

第二层:clean_overlay
  企业增强候选资料,先预检、再计划激活、再回归评测。

第三层:dirty_samples
  脏数据风险样本,只用于识别企业资料治理风险,不能直接入库。

9.1 scenarios 是主链路数据源

scenarios/ 是当前 8 个冻结业务场景的正式资料目录。执行下面命令时,默认读取的就是 scenarios/

python scripts/rebuild_scenarios.py --reset-collections

单场景重建也是一样:

python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --quality-gate --activate

所以首轮跑通主链路时只需要关心 scenarios/,它保证主链路足够稳定、可控、可复现。8 个冻结业务场景都已经包含 Markdown、CSV、XLSX、DOCX、PPTX 和带中文文本层的 PDF 样例,用来验证多格式 loader、表格行入库和普通文档切分不是只停留在代码接口上。

scenarios/ 的意义是:

目标 说明
保证主链路可跑通 新环境初始化 8 个场景时,不依赖额外资料包。
保证排查边界清晰 如果检索为空、版本未激活、loader 失败,可以先排查代码和环境,不被复杂资料干扰。
保证测试稳定 测试、回归、讲义示例都基于一组冻结资料,结果更容易复现。
保证入库质量可控 资料格式覆盖足够多,但不会故意混入冲突、过期、噪声样本。

9.2 clean_overlay 是“可上线候选资料”,不是默认资料

data_packs/enterprise_realistic_pack/clean_overlay/ 用来模拟更真实的企业资料,例如:

  • 区域差异和例外规则;
  • 角色权限和金额阈值;
  • 审批链和补签流程;
  • 合同付款风险;
  • 跨境单证金额变更;
  • 理赔材料不一致;
  • SaaS 企业客户账单和集成问题。

它不是 active 知识库的一部分,也不会被 rebuild_scenarios.py 自动读取。这样设计是为了避免“增强资料还没治理完,就污染正式知识库”。

它存在的意义是:模拟企业里“业务部门又给了一批新资料,想加入知识库”的发布流程。

真实企业不会把新资料直接覆盖线上知识库,一般会先问几个问题:

问题 clean_overlay 要回答什么
新资料能不能被 loader 正常解析? 先构建预览数据集并跑入库质量报告。
新资料的 source 是否符合场景白名单? 避免资料进错业务域。
新 FAQ 有没有覆盖到回归集? 避免新增标准答案没有评测样本。
新资料有没有和原有 FAQ 冲突? 避免增强资料引入口径冲突。
是否值得激活为新版本? 通过计划脚本生成标准 rebuild 命令。

clean overlay 的正确流程是:

1
2
3
4
5
6
7
8
clean_overlay
  -> 构建预览数据集
  -> 入库质量预检
  -> overlay 就绪检查
  -> 生成上线计划
  -> 执行经过校验的 rebuild_kb_version.py
  -> 激活新版本
  -> 跑 overlay 回归评测

常用命令:

1
2
3
4
python scripts/enterprise_overlay/build_enterprise_overlay_dataset.py --all-scenarios --output reports/verification/enterprise_overlay_build_latest.json
python scripts/enterprise_overlay/check_enterprise_overlay_readiness.py --output reports/verification/enterprise_overlay_readiness_latest.json
python scripts/enterprise_overlay/plan_enterprise_overlay_activation.py --output reports/verification/enterprise_overlay_activation_plan_latest.json
python scripts/enterprise_overlay/run_enterprise_overlay_activation.py --plan reports/verification/enterprise_overlay_activation_plan_latest.json --output reports/verification/enterprise_overlay_activation_run_latest.json

注意:clean_overlay 不是“备份目录”,也不是“备用数据源”。它是一个候选发布包。只有通过预检、就绪检查、上线计划和回归评测后,才会生成新的知识库版本并激活。

一个具体例子:

1
2
3
4
5
scenarios/enterprise_knowledge/
  已有通用入职、报销、IT 支持资料。

data_packs/.../clean_overlay/enterprise_knowledge/
  增加区域入职差异、付款阈值、特殊审批规则。

如果在主链路初始资料中直接混入这些复杂规则,RAG 链路验证和资料治理问题会被混在一起;把它们放在企业增强阶段,可以把流程拆清楚:

1
2
3
4
5
基础知识库能回答通用问题
  -> clean_overlay 增加企业真实复杂度
  -> 预检确认资料质量
  -> 激活新版本
  -> 回归评测确认增强没有破坏旧能力

9.3 dirty_samples 是风险样本,不能直接入库

data_packs/enterprise_realistic_pack/dirty_samples/ 不能直接入库。它里面放的是用于识别资料治理风险的样本,例如:

脏样本类型 风险
过期制度 可能覆盖当前有效口径
OCR 噪声 金额、日期、编号可能识别错误
表格导出混乱 字段缺失、列名不规范、行语义不完整
命名混乱 source 难以推断,影响检索过滤
FAQ/正文冲突 标准答案和正文口径不一致

dirty samples 的正确流向是:

1
2
3
4
5
dirty_samples
  -> 风险识别
  -> 人工清洗/复核
  -> 变成 clean_overlay
  -> 再走 overlay 预检和版本激活

对应分析命令:

python scripts/enterprise_overlay/analyze_dirty_enterprise_samples.py --output reports/verification/dirty_enterprise_samples_latest.json

dirty_samples 存在的意义是说明:RAG 的问题很多不是模型问题,也不是向量库问题,而是资料本身不能直接作为可信知识。

例如:

看起来像什么 实际风险
一份扫描报销材料 OCR 可能把金额、日期、票据号识别错。
一份旧制度 可能和当前制度冲突,导致回答旧口径。
一份表格导出 列名缺失、字段错位,检索到也无法可靠回答。
一条 FAQ 标准答案可能和正文资料相反。

这些样本不能为了“数据量多”直接入库。它们应该进入治理流程:

1
2
3
4
识别问题
  -> 人工清洗或业务复核
  -> 变成 clean_overlay
  -> 再进入预检和版本激活流程

9.4 为什么不删掉 data_packs

如果只做一个能跑通的 RAG demo,确实可以没有 data_packs。但这个项目目标不是只验证“问答能返回内容”,而是覆盖企业级 RAG 的完整治理边界。

data_packs 的价值是:

价值 说明
把主链路和增强资料隔离 保证初始主链路验证不被复杂资料扰乱。
保留企业资料发布流程 能讲“候选资料如何变成 active 知识库”。
支持资料治理 能说明脏样本为什么不能直接入库。
支持回归评测扩展 overlay 增强后可以单独验证新增能力。
支持工程说明 能说明项目不是只会写检索代码,还考虑资料治理和版本发布。

9.5 三者关系总结

目录 是否默认入库 作用
scenarios/ 当前正式知识库资料,8 场景初始化读取这里
data_packs/.../clean_overlay/ 企业仿真增强候选资料,预检通过后才能按计划激活
data_packs/.../dirty_samples/ 资料治理风险样本,只用于风险识别和清洗流程

判断一份资料该放在哪里:

资料状态 应该放哪里
已确认、可作为当前知识库口径 scenarios/<scenario>/data/<source>_data/
新增资料,想评估后再上线 data_packs/.../clean_overlay/<scenario>/
存在 OCR 噪声、过期制度、冲突口径、格式混乱 data_packs/.../dirty_samples/ 或治理临时目录
OCR 复核后确认可用 先进入场景资料目录,再通过质量门禁和版本激活

工程口径总结:

不把所有资料都直接塞进 active 知识库。scenarios/ 是当前正式数据源,保证主链路稳定;clean_overlay 是企业增强候选资料,必须先通过预检、就绪检查、上线计划和回归评测;dirty_samples 是资料治理风险样本,只用于说明 OCR 噪声、过期制度、FAQ 冲突等风险,不能直接入库。这样既能保持 RAG 主链路可控,又能说明企业资料从候选包、脏数据到可上线知识资产的治理过程。


第十部分:入库失败排查手册

入库链路牵涉 MySQL、Milvus、Embedding、Reranker、场景配置、质量门禁和版本激活。排查时不要直接猜原因,按下面顺序查,速度最快。

10.1 先确认当前 active 版本

页面提示“信息不足”、检索结果为空、或者刚重建后仍然回答旧内容时,先查 active 版本:

docker compose --env-file .env.compose run --rm api python -c "from qa_core.config.settings import get_settings; from qa_core.scenarios.registry import resolve_scenario; from qa_core.governance.kb_versions import get_kb_version_store; s=get_settings(); sc=resolve_scenario(s.active_scenario_id); store=get_kb_version_store(sc.scenario_id); print(sc.scenario_id); print(store.resolve_active_version())"

判断:

现象 含义 处理
active=None 没有激活版本,在线问答不知道查哪批数据 重新执行 rebuild_kb_version.py --quality-gate --activate
active 不是刚构建的版本 新版本停留在 staged 或 gate 失败 查看质量报告,修复后重新激活
active 是新版本但仍没答案 继续查 collection 和过滤条件 看 10.2/10.3

10.2 再确认 Milvus collection 是否存在且有数据

docker compose --env-file .env.compose run --rm api python -c "from pymilvus import MilvusClient; from qa_core.config.settings import get_settings; c=MilvusClient(uri=get_settings().milvus_uri); print(c.list_collections())"

如果 collection 不存在,说明入库没有真正写到 Milvus;如果 collection 存在但实体数量很少或为 0,需要回看入库日志。

常见原因:

现象 原因 处理
collection 不存在 场景配置里的 collection 名和实际不一致,或入库任务失败 检查 scenario.toml,重新构建
只有 FAQ 没有 Doc 文档目录为空,或 --skip-docs 被使用 检查 scenarios/<id>/docs
只有 Doc 没有 FAQ FAQ CSV 不存在,或 --skip-faq 被使用 检查 faq.csv

10.3 schema 不兼容时使用 reset-collections

如果日志出现:

1
2
3
sparse 字段不是 BM25 Function 输出字段
nq [0] is invalid
BM25 Function / sparse 字段不兼容

通常表示复用了旧 schema collection。处理方式是删除旧 collection 并重建:

docker compose --env-file .env.compose run --rm api python scripts/rebuild_kb_version.py --scenario enterprise_knowledge --new-version --force --reset-collections --quality-gate --activate

排查口径:

只手动删除 collection 不会自动生成新知识库版本。必须重新跑入库脚本,让脚本重新创建 collection、写入 FAQ/Doc、生成质量报告并激活版本。

10.4 质量门禁失败先看报告,不要直接跳过

--quality-gate 失败时,说明资料里可能存在空文件、重复 FAQ、source 无效、FAQ/正文冲突或低质量 chunk。

排查顺序:

1
2
3
4
5
查看 reports/quality/
  -> 找到对应 scenario 和 kb_version 的报告
  -> 先修复 failed_files / unsupported_files / empty_files
  -> 再修复 duplicate_faq_questions / invalid_sources
  -> 最后再考虑调整阈值

不要为了让命令通过就直接去掉 --quality-gate。这会让低质量资料进入 active 知识库,后面在线问答会变成“能检索,但答得不可靠”。

10.5 重建后页面还是旧答案

按这个顺序检查:

  1. 页面右侧当前状态里的知识库版本是否变成新版本。
  2. .env.composeACTIVE_SCENARIO_ID 是否是你刚重建的场景。
  3. API 容器是否重新加载了 .env.compose
docker compose --env-file .env.compose up -d --force-recreate api
docker logs -f knowforge-api
  1. 是否有多个 Milvus 实例:宿主机脚本连的是 127.0.0.1:19530,容器内脚本连的是 http://milvus:19530。要确认两者指向同一个 Docker Compose 服务。

10.6 八场景全量初始化的推荐命令

如果需要在新环境中一次性把全部 8 个场景初始化到可运行状态,使用:

1
2
3
4
5
if (!(Test-Path .env.compose)) { Copy-Item .env.compose.example .env.compose }
notepad .env.compose
docker compose --env-file .env.compose up -d mysql etcd minio milvus
docker compose --env-file .env.compose build api
docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py --reset-collections

如果之前已经存在知识库,只是资料内容变化,重建全部 8 个场景时不要删除 collection:

docker compose --env-file .env.compose run --rm api python scripts/rebuild_scenarios.py

如果容器镜像里还没有最新脚本,执行 docker compose --env-file .env.compose build api 后再运行入库命令。


本讲实践闭环

项目 内容
本讲类型 项目实现 + 工程治理
实践产物 文档/FAQ/表格入库链路、质量预检、rebuild_kb_version.pyrebuild_scenarios.py
是否进入最终项目
验收方式 单场景或 8 场景重建成功,Milvus collection 有数据,active 版本已激活
后续落点 第 17 讲对入库结果做质量评估和门禁

通过标准:资料能从文件进入可检索知识库,metadata、版本、隔离字段完整,失败可按排查手册定位。

本讲从 0 到 1 实现闭环

这一讲是离线链路的完整交付:把文件、FAQ、表格资料变成线上可检索的数据。实现顺序如下:

  1. 先实现文件 Loader 注册表,让不同后缀走不同解析器。
  2. 再实现标准化,把 source、scenario、kb_version、DataScope 写入 metadata。
  3. 然后实现 chunking,把长文档切成适合检索的片段。
  4. FAQ 单独入库:问题作为检索文本,标准答案放 metadata。
  5. 最后由 ingest_directory() 串起加载、标准化、切分、写入 Milvus、更新 MySQL Manifest。

实现完成后,相关代码结构应该是下面这张图:

flowchart LR
    subgraph Indexing["qa_core/indexing"]
        Loaders["document_loaders.py<br/>文件 Loader 注册表"]
        Normalizer["document_normalizer.py<br/>metadata 标准化"]
        Chunking["chunking.py<br/>父子块/递归切分"]
        FAQ["faq_ingestion.py<br/>FAQ CSV 入库"]
        Service["service.py<br/>ingest_directory 主流程"]
    end

    subgraph Scripts["scripts"]
        Single["rebuild_kb_version.py<br/>单场景重建"]
        All["rebuild_scenarios.py<br/>8 场景初始化"]
    end

    Loaders --> Service
    Normalizer --> Service
    Chunking --> Service
    FAQ --> Service
    Single --> Service
    All --> Single

来源:真实代码调用点,见 qa_core/indexing/document_loaders.py

LOADER_REGISTRY = {
    ".md": load_markdown,
    ".txt": load_text,
    ".csv": load_csv_rows,
    ".xlsx": load_excel_rows,
}


def load_documents(path):
    loader = LOADER_REGISTRY[path.suffix.lower()]
    return loader(path)

标准化阶段是离线链路和在线链路的接口。在线检索依赖的过滤字段,必须在这里写全。

来源:真实代码调用点,见 qa_core/indexing/document_normalizer.py

1
2
3
4
5
6
7
def normalize_documents(docs, scenario, kb_version, scope):
    for doc in docs:
        doc.metadata["scenario_id"] = scenario.id
        doc.metadata["kb_version"] = kb_version
        doc.metadata["tenant_id"] = scope.tenant_id
        doc.metadata["source"] = infer_source(doc, scenario)
    return docs

文档入库主流程要能重复执行。没变化的文件通过 MySQL Manifest 跳过;文件变化、Embedding 版本变化或 chunk schema 变化时,要先删除旧 chunk,再重新解析和写入。

来源:真实代码逻辑压缩版,对应 qa_core/indexing/service.py::ingest_directory()_ingest_single_file()

def ingest_directory(directory_path, source=None, *, scenario_id=None,
                     tenant_id=None, dataset_id=None, visibility=None,
                     allowed_roles=None, force=False, kb_version=None,
                     create_new_version=False, activate=False):
    scenario = resolve_scenario(scenario_id)
    data_scope = resolve_data_scope(
        tenant_id=tenant_id,
        dataset_id=dataset_id,
        visibility=visibility,
        user_roles=allowed_roles,
    )
    root = Path(directory_path)
    resolved_source = source or normalize_source_from_path(root)
    if resolved_source not in scenario.valid_sources:
        raise ValueError(f"无效的业务分类:{resolved_source}")

    version = get_kb_version_store(scenario.scenario_id).ensure_version(
        kb_version,
        create_new=create_new_version,
        created_by="ingest_directory",
    )
    manifest = IndexManifest()
    doc_store = get_doc_store(scenario.doc_collection)

    for path in walk_supported_files(root):
        chunks, skipped = _ingest_single_file(
            path, resolved_source, version.kb_version, scenario,
            data_scope, allowed_roles, doc_store, manifest, force,
        )
        total_chunks += chunks

    version_store.record_ingest_result(version.kb_version, content_type="doc", count=total_chunks, source=resolved_source)
    if activate:
        version_store.activate_version(version.kb_version)


def _ingest_single_file(path, source, kb_version, scenario, data_scope, allowed_roles, doc_store, manifest, force):
    if get_document_loader_spec(path) is None:
        raise ValueError(f"不支持的文档类型:{path}")

    fingerprint = file_fingerprint(path)
    existing = manifest.get(source, path, kb_version, scenario.scenario_id)
    if not force and existing and existing.fingerprint == fingerprint \
       and existing.embedding_model_version == settings.embedding_model_version \
       and existing.chunk_schema_version == settings.chunk_schema_version:
        return 0, True

    if existing and existing.chunk_ids:
        doc_store.delete_ids(existing.chunk_ids)

    docs = normalize_documents(load_file(path), path, source, kb_version, scenario.scenario_id, data_scope, allowed_roles)
    chunks, ids = split_documents(docs)
    if chunks:
        doc_store.add_documents(chunks, ids=ids)
        manifest.update(source, path, fingerprint, ids, scenario_id=scenario.scenario_id, kb_version=kb_version)
        return len(chunks), False
    return 0, False

FAQ 入库不能把“答案”也作为主要检索文本,否则标准答案过长时会稀释问题语义。本项目让问题参与检索,答案作为 metadata 被命中后直出。

来源:真实代码调用点,见 qa_core/indexing/faq_ingestion.py

1
2
3
4
5
6
7
8
Document(
    page_content=row["question"],
    metadata={
        "answer": row["answer"],
        "hit_type": "faq",
        "source": row["source"],
    },
)

单场景验收用 rebuild_kb_version.py,全量初始化 8 个场景用 rebuild_scenarios.py

来源:命令行验收,对应 scripts/rebuild_kb_version.pyscripts/rebuild_scenarios.py

python scripts/rebuild_scenarios.py --reset-collections

已有知识库只刷新资料内容时,使用:

python scripts/rebuild_scenarios.py

闭环验证重点:

验证项 验证方式 期望结果
Loader 注册 放入不同格式文件 能按后缀选择加载器
metadata 标准化 查看 chunk metadata source、scenario、version、scope 完整
文档切分 入库长文档 chunk 粒度合理且可追溯
FAQ 入库 检索标准问题 命中后可直接取 metadata.answer
MySQL Manifest 增量 重复执行入库 未变化文件可跳过
schema/模型升级 修改 embedding 或 chunk schema 版本 旧 chunk 被删除后重新写入
source 校验 传入非法 source 直接报错,防止写错业务分类
版本激活 activate=True 新版本成为 active
8 场景初始化 执行 rebuild_scenarios.py 所有场景生成 active 版本

验收重点:文件能进入 Milvus,active 版本能更新,metadata、版本、隔离字段完整,失败时能用排查手册定位。

重点掌握

优先级 内容 原因
★★★ 必会 离线入库 vs 在线问答的清晰边界:入库修改数据(可慢可重试),问答只读(必须快) 理解两条链路不能混淆的根本原因
★★★ 必会 ingest_directory() 的完整流程:确认版本 → 遍历文件 → 指纹比对 → 加载/标准化 → 切分 → 写入 Milvus → 更新 MySQL Manifest 文档入库的主流程
★★★ 必会 MySQL IndexManifest 增量机制:通过文件指纹(路径+修改时间+大小)判断文件是否变化,未变化跳过 避免每次全量重建的关键设计
★★ 理解 注册表模式管理 Document Loaders:后缀→工厂函数的映射,扩展新格式只需添加注册项 可扩展性设计模式
★★ 理解 normalize_documents() 补充项目标准元数据(source、scenario_id、kb_version、data_scope 等) 保证每个 chunk 有完整的过滤字段
★★ 理解 FAQ 入库:page_content 存问题、metadata.answer 存答案,检索命中后直接返回 FAQ 直出不走 LLM 的实现基础
★★ 理解 表格 CSV/Excel 按行入库:每行为一个完整业务语义单元,不递归切分 表格资料的特殊处理策略
★★ 理解 scenarios/clean_overlay/dirty_samples/ 的边界 防止把候选增强资料或脏样本直接污染 active 知识库
★ 了解 复杂图文资料的多模态入库治理流程 了解扩展方向
★ 了解 cleanup_missing_document_chunks() 清理已删除文件对应的 Milvus chunk 了解维护工具

本讲小结

  • 离线入库 ≠ 在线问答:入库负责解析文件、切分、向量化、写入 Milvus;问答只做检索和生成
  • 注册表模式管理文件格式→Loader 的映射,扩展新格式只需添加注册项
  • 表格 CSV/Excel 按行入库:每行是一个 table_row,保留表头、工作表、行号和单元格键值,不再递归切分
  • MySQL IndexManifest 记录每个文件的指纹和 chunk ID,实现增量入库(只处理变化的文件)
  • FAQ 入库将标准问题作为检索内容、标准答案存储在 metadata 中,检索命中后直接返回
  • 复杂图文资料属于多模态入库治理:OCR/VLM 结果必须绑定上下文、人工复核、入库质量检查和版本激活后才能进入 active 知识库
  • data_packs 不是默认数据源clean_overlay 是增强候选,dirty_samples 是治理风险样本,默认 8 场景初始化只读取 scenarios/
  • 清理脚本默认 dry-run,先预览再执行,防止误删

下一讲RAG 回归验收与入库质量 — 入库质量报告、评测指标、回归验收体系、Bad Case 闭环