跳转至

附录H:项目工具类开发详解

前置阅读第 13 讲:应用入口与环境前置校验

本附录覆盖项目中所有跨模块复用的基础工具类,包括设计思路、实现细节和使用模式。


一、common.py:UTC 时间与 JSON 读写

设计思路

整个项目的时间戳统一使用 UTC 时间,避免时区混乱。所有需要持久化的时间(trace 记录、版本号、入库报告)都用 utc_now(),所有文件系统中的时间标识都用 utc_file_stamp()

# qa_core/common.py
import json
from datetime import datetime, timezone
from pathlib import Path

def utc_now() -> str:
    """返回 UTC ISO 时间字符串,用于版本管理、报告和 trace 的时间统一排序。
    格式: '2025-01-15T10:30:00.123456+00:00'
    """
    return datetime.now(timezone.utc).isoformat()

def utc_file_stamp() -> str:
    """返回适合放进文件名和版本号的 UTC 时间戳。
    格式: '20250115_103000'
    """
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

def path_updated_at(path: str | Path) -> str:
    """返回文件修改时间的 UTC ISO 字符串,用于报告列表的排序。"""
    return datetime.fromtimestamp(Path(path).stat().st_mtime, timezone.utc).isoformat()

JSON 读写封装

项目统一使用 ensure_ascii=False, indent=2 格式写 JSON,保证中文字符可读、层级清晰。

# qa_core/common.py

def read_json(path: str | Path, default: Any = None) -> Any:
    """读取 JSON 文件,读取失败时返回 default —— 避免文件损坏导致进程崩溃。"""
    try:
        return json.loads(Path(path).read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return default

def read_json_dict(path: str | Path, default: dict[str, Any] | None = None) -> dict[str, Any]:
    """读取对象型 JSON,非对象或读取失败时返回字典默认值。
    用于旧 manifest 迁移、质量报告等对象型 JSON 文件。
    """
    payload = read_json(path, default=None)
    if isinstance(payload, dict):
        return payload
    return dict(default or {})

def write_json(path: str | Path, payload: Any) -> str:
    """按项目统一格式写入 JSON。自动创建父目录。返回绝对路径。"""
    output_path = Path(path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8"
    )
    return str(output_path)

列表报告工具

模板方法模式:通用的 JSON 文件扫描 + 按修改时间排序逻辑,被质量报告和状态页复用。

def list_json_reports(root: Path, glob_pattern: str, *, limit: int = 20) -> list[dict[str, Any]]:
    """列出目录下最近的 JSON 报告。
    每个条目包含 path / file_name / updated_at / payload 四个字段。
    被 `quality/ingestion.py` 的列表页复用。
    """
    if not root.exists():
        return []
    reports = []
    for path in sorted(root.glob(glob_pattern),
                       key=lambda item: item.stat().st_mtime, reverse=True):
        payload = read_json_dict(path)
        reports.append({
            "path": str(path), "file_name": path.name,
            "updated_at": path_updated_at(path), "payload": payload,
        })
        if limit and len(reports) >= limit:
            break
    return reports

重点掌握

优先级 内容 原因
★★★ 必会 write_json() 的统一格式设计(ensure_ascii=False, indent=2) 整个项目的持久化都依赖这个约定
★★★ 必会 read_json(path, default=...) 的容错模式 避免文件损坏导致进程崩溃
★★ 理解 utc_now() vs utc_file_stamp() 的使用场景区别 UTC 规范是系统间协作的基础
★ 了解 list_json_reports() 的模板方法模式 用于状态页报告列表

二、utils.py:稳定哈希与指纹

设计思路

入库和检索都需要稳定的 ID 生成——同一个文件重新入库时产生相同的 chunk_id,才能正确覆盖旧数据。stable_hash() 用 SHA-256 将任意数量的参数拼接后生成确定性的 hex 字符串。file_fingerprint() 只读文件元数据(路径 + 修改时间 + 大小),不读内容,大幅减少增量入库的 IO。

# qa_core/utils.py
import hashlib
import os
from pathlib import Path

def stable_hash(*parts: object) -> str:
    """根据任意值创建稳定的 SHA-256 hex 字符串。

    关键特性:相同输入永远产生相同输出(确定性)。
    用于生成 chunk_id、faq_id、parent_id 等 Milvus 主键。

    例: stable_hash("hr_v2", "/data/入职流程.md", "chunk_3") 
        → 'a1b2c3d4e5f6...' (64 位 hex)
    """
    raw = "||".join("" if part is None else str(part) for part in parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

def file_fingerprint(path: str | Path) -> str:
    """根据文件路径、修改时间和大小生成指纹,用于增量入库的变化判断。

    为什么不用内容哈希 (SHA-256 of content)?
    - 大文件(几十 MB 的 PDF)读内容做哈希太慢
    - 本地机器 CPU 有限,每轮入库读全量文件成本太高
    - 路径 + mtime + size 的组合在实际使用中已经足够可靠
    """
    p = Path(path)
    stat = p.stat()
    return stable_hash(str(p.resolve()), stat.st_mtime_ns, stat.st_size)

def normalize_source_from_path(path: str | Path) -> str:
    """从 '<source>_data' 目录名提取 source 标识。

    例: 'scenarios/enterprise_knowledge/data/hr_data' → 'hr'
        'data/legal_data' → 'legal'
    """
    name = os.path.basename(str(path)).replace("_data", "")
    return name or "default"

重点掌握

优先级 内容 原因
★★★ 必会 stable_hash() 的确定性设计 chunk_id/faq_id 的稳定性依赖于此
★★★ 必会 file_fingerprint() 为什么用 mtime+size 而不是内容哈希 IO 成本与可靠性之间的工程权衡
★★ 理解 normalize_source_from_path() 的命名约定 入库脚本的 source 推断依赖目录命名规范

三、元数据 Store:MySQL 控制面

设计思路

当前项目的关键元数据已经进入 MySQL:知识库版本状态保存在 kb_versions / kb_active_versions,文档增量入库状态保存在 kb_document_manifests。这样版本激活、回滚、入库统计和增量跳过都可以被 API、脚本、测试和容器环境共享,不再依赖某个本地目录里的 JSON 文件。

质量报告仍然会使用 common.py 中的 JSON 读写工具生成可检查产物,但版本控制面和入库 manifest 只写 MySQL,避免同一份状态在文件和数据库之间分叉。

1
2
3
4
5
6
7
8
9
# qa_core/governance/kb_versions.py
class KnowledgeBaseVersionStore(_MySqlStore):
    """知识库版本状态机的 MySQL 存储实现。"""

    def ensure_tables(self) -> None:
        """创建 kb_versions / kb_active_versions。"""

    def activate_version(self, kb_version: str) -> KnowledgeBaseVersion:
        """事务内切换 active 指针,不修改 Milvus 数据。"""
1
2
3
4
5
6
7
8
9
# qa_core/indexing/manifest.py
class IndexManifest(_MySqlStore):
    """文档增量入库清单的 MySQL 存储实现。"""

    def update(self, source, file_path, fingerprint, chunk_ids, *, scenario_id, kb_version):
        """记录文件指纹、kb_version、chunk_ids 和模型版本。"""

    def iter_records(self, *, scenario_id=None, source=None, kb_version=None):
        """按 scenario_id/source/kb_version 查询已入库文件。"""

重点掌握

优先级 内容 原因
★★★ 必会 版本控制面和入库 manifest 进入 MySQL 保证 API、脚本、测试、Docker 部署看到同一份状态
★★ 理解 JSON 工具只用于报告类产物 把运行时状态和可读报告分开,降低维护成本

四、schemas.py:Pydantic 数据模型

设计思路

所有 API 层的请求/响应都用 Pydantic BaseModel 定义,FastAPI 自动做参数校验和序列化。内部流转的数据结构用 Python dataclass(性能更好,不需要校验)。

# qa_core/schemas.py
from pydantic import BaseModel, Field

class RetrievalDebugRequest(BaseModel):
    """HTTP 检索诊断请求体。在线问答不复用该模型。"""
    query: str = Field(..., min_length=1)        # 必填,最短 1 字符
    source_filter: str | None = None              # 前端选择的业务分类
    session_id: str | None = None                 # 会话 ID,未传则自动生成
    scenario_id: str | None = None                # 场景 ID,未传用默认场景
    tenant_id: str | None = None                  # 租户 ID,未传默认 "default"
    dataset_id: str | None = None                 # 数据集 ID,未传默认 "default"
    visibility: str | None = None                 # 可见级别,未传默认 "public"
    user_role: str | None = None                  # 单一角色(前端简化入口)
    user_roles: list[str] = Field(default_factory=list)  # 多角色(正式入口)
    kb_version: str | None = None                 # 知识库版本,未传用 active 版本

class FeedbackRequest(BaseModel):
    """用户反馈载荷。rating 约束为 useful/not_useful,不允许自定义值。"""
    session_id: str | None = None
    question: str = Field(..., min_length=1)
    answer: str = Field(..., min_length=1)
    rating: str = Field(..., pattern="^(useful|not_useful)$")   # 仅允许这两个值
    comment: str | None = None
    sources: list[dict] = Field(default_factory=list)

class RetrievalDebugResponse(BaseModel):
    """检索调试响应,不包含最终答案,faq 和 doc 来源分开返回。"""
    query: str
    rewritten_query: str
    source_filter: str | None = None
    scenario_id: str | None = None
    tenant_id: str | None = None
    dataset_id: str | None = None
    visibility: str | None = None
    data_scope: dict | None = None
    kb_version: str | None = None
    intent: dict
    retrieval_plan: dict
    faq_sources: list[dict] = Field(default_factory=list)
    doc_sources: list[dict] = Field(default_factory=list)

内部流转的 dataclass

# qa_core/schemas.py (节选)
from dataclasses import dataclass, field

@dataclass
class RetrievalHit:
    """一次检索命中的封装:document + score。"""
    document: Any       # LangChain Document 对象
    score: float

@dataclass
class RetrievalResult:
    """一次完整检索的结果:命中列表 + 元信息。"""
    hits: list[RetrievalHit] = field(default_factory=list)
    query: str = ""
    source_type: str = ""      # "faq" 或 "doc"
    elapsed_ms: float = 0.0

    @property
    def top_score(self) -> float | None:
        """最高分,无命中时为 None。"""
        return self.hits[0].score if self.hits else None

重点掌握

优先级 内容 原因
★★★ 必会 RetrievalDebugRequest 的所有字段含义 /api/retrieval/debug 的诊断请求结构
★★★ 必会 RetrievalDebugResponse 的字段含义 诊断检索质量时要看意图、计划、FAQ/Doc 命中
★★ 理解 FeedbackRequest.rating 的 pattern 约束 防止前端传自定义值污染数据
★ 了解 RetrievalHit / RetrievalResult 的 dataclass 设计 内部流转的数据结构

五、config/settings.py:配置管理

设计思路

使用 pydantic-settings 从进程环境变量和本机 .env 加载全部配置。Docker Compose 部署时,.env.compose 先由 Compose 注入到 API 容器的环境变量里,再由 Settings 读取。每个配置项有明确的默认值,注释标注了用途。model_config 设置 env_file=".env",用于本机 API 调试;容器模式不要把 .env.compose 复制成 .env

# qa_core/config/settings.py (关键字段)
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # ── 场景 ──
    active_scenario_id: str = Field(default="enterprise_knowledge", ...)
    scenario_config_dir: str = Field(default="scenarios", ...)

    # ── Milvus ──
    milvus_uri: str = Field(default="http://localhost:19530", ...)
    milvus_database: str = Field(default="", ...)

    # ── MySQL ──
    mysql_host: str = Field(default="localhost", ...)
    mysql_database: str = Field(default="qa_system", ...)

    # ── LLM (DashScope / OpenAI 兼容) ──
    llm_api_key: str = Field(default="", validation_alias="DASHSCOPE_API_KEY")
    llm_base_url: str = Field(default="https://dashscope.aliyuncs.com/compatible-mode/v1", ...)
    llm_model: str = Field(default="qwen-plus", ...)
    llm_temperature: float = Field(default=0.1, ge=0.0, le=1.0)
    llm_timeout: int = Field(default=60, ge=1)

    # ── 检索参数(所有默认值可用于 .env 覆盖)──
    faq_top_k: int = Field(default=20, ge=1)
    doc_top_k: int = Field(default=20, ge=1)
    faq_direct_score_threshold: float = Field(default=0.72, ge=0.0, le=1.0)
    max_context_chars: int = Field(default=6000, ge=1)
    max_context_doc_chars: int = Field(default=1600, ge=1)
    short_query_max_chars: int = Field(default=20, ge=1)

    # ── 历史摘要 ──
    history_summary_enabled: bool = Field(default=True)
    history_summary_after_messages: int = Field(default=14, ge=1)

@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """进程级 Settings 单例。lru_cache 避免重复读取运行时配置。"""
    return Settings()

重点掌握

优先级 内容 原因
★★★ 必会 get_settings()@lru_cache(maxsize=1) 单例模式 整个项目的配置入口
★★★ 必会 validation_alias(如 DASHSCOPE_API_KEY)的用途 .env 变量名与代码字段名的映射
★★ 理解 检索参数组的默认值和可覆盖性 调整这些参数不需要改代码
★ 了解 model_configcase_sensitive=False Windows/Linux 环境变量兼容

六、config/logging_config.py:结构化日志

设计思路

项目统一使用 Python 标准 logging + 自定义格式化器,输出 JSON 格式的结构化日志。get_logger(name) 为每个模块提供带模块名的 logger 实例。

# qa_core/config/logging_config.py
import logging
import json
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            "time": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }, ensure_ascii=False)

def get_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

重点掌握

优先级 内容 原因
★★ 理解 JSON 格式化日志的设计目的 方便 log 分析工具解析
★ 了解 get_logger(name) 的 handler 防重复逻辑 避免多次调用时添加重复 handler

七、memory/base.py:MySQL 惰性连接基类

设计思路

ChatHistoryStoreFeedbackStoreKnowledgeBaseVersionStoreIndexManifest 都需要 MySQL 连接,但不是每个请求都立即需要。_MySqlStore 基类提供惰性初始化的引擎属性——第一次访问 engine 时才创建连接,创建后缓存复用。pool_pre_ping=True 在连接被 MySQL 服务端断开后自动探活重连。

# qa_core/memory/base.py
from sqlalchemy import create_engine, text

class _MySqlStore:
    """MySQL 存储的公共基类。

    子类必须在访问 engine 之前设置 self.settings(提供 mysql_sync_uri)。
    子类: ChatHistoryStore, FeedbackStore, KnowledgeBaseVersionStore, IndexManifest
    """

    def __init__(self) -> None:
        self._engine = None

    @property
    def engine(self):
        """惰性创建的 SQLAlchemy 同步引擎。

        pool_pre_ping=True: 每次从连接池取连接前先发送 ping,
        避免 "MySQL server has gone away" 错误。
        """
        if self._engine is None:
            self._engine = create_engine(
                self.settings.mysql_sync_uri,
                pool_pre_ping=True
            )
        return self._engine

    def _execute_ddl(self, sql: str) -> None:
        """执行 DDL 语句(如 CREATE TABLE IF NOT EXISTS)。
        在事务内执行,自动提交。
        """
        with self.engine.begin() as conn:
            conn.execute(text(sql))

重点掌握

优先级 内容 原因
★★★ 必会 惰性引擎的 pool_pre_ping=True 设计 生产环境中 "MySQL gone away" 是高频问题
★★ 理解 _execute_ddl() 封装的 DDL 执行模式 被会话、反馈、知识库版本和入库 manifest 的建表逻辑复用
★ 了解 为什么不用 ORM 而用原生 SQL 系统项目优先可读性,ORM 增加学习成本

本讲小结

  • common.py 提供了项目级的 UTC 时间规范JSON 读写容错
  • utils.pystable_hash() 是基于 SHA-256 的 确定性 ID 生成器
  • 核心元数据进入 MySQL 控制面,JSON 读写工具只服务报告类产物
  • schemas.pyPydantic 定义请求/响应模型,FastAPI 自动校验
  • settings.pypydantic-settings 加载 .env 配置,@lru_cache 单例
  • memory/base.py_MySqlStore 提供了 惰性 MySQL 连接 的公共能力,被会话、反馈、版本控制和入库 manifest 共同复用