附录H:项目工具类开发详解
前置阅读:第 13 讲:应用入口与环境前置校验
本附录覆盖项目中所有跨模块复用的基础工具类,包括设计思路、实现细节和使用模式。
一、common.py:UTC 时间与 JSON 读写
设计思路
整个项目的时间戳统一使用 UTC 时间,避免时区混乱。所有需要持久化的时间(trace 记录、版本号、入库报告)都用 utc_now(),所有文件系统中的时间标识都用 utc_file_stamp()。
| # qa_core/common.py
import json
from datetime import datetime, timezone
from pathlib import Path
def utc_now() -> str:
"""返回 UTC ISO 时间字符串,用于版本管理、报告和 trace 的时间统一排序。
格式: '2025-01-15T10:30:00.123456+00:00'
"""
return datetime.now(timezone.utc).isoformat()
def utc_file_stamp() -> str:
"""返回适合放进文件名和版本号的 UTC 时间戳。
格式: '20250115_103000'
"""
return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
def path_updated_at(path: str | Path) -> str:
"""返回文件修改时间的 UTC ISO 字符串,用于报告列表的排序。"""
return datetime.fromtimestamp(Path(path).stat().st_mtime, timezone.utc).isoformat()
|
JSON 读写封装
项目统一使用 ensure_ascii=False, indent=2 格式写 JSON,保证中文字符可读、层级清晰。
| # qa_core/common.py
def read_json(path: str | Path, default: Any = None) -> Any:
"""读取 JSON 文件,读取失败时返回 default —— 避免文件损坏导致进程崩溃。"""
try:
return json.loads(Path(path).read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return default
def read_json_dict(path: str | Path, default: dict[str, Any] | None = None) -> dict[str, Any]:
"""读取对象型 JSON,非对象或读取失败时返回字典默认值。
用于旧 manifest 迁移、质量报告等对象型 JSON 文件。
"""
payload = read_json(path, default=None)
if isinstance(payload, dict):
return payload
return dict(default or {})
def write_json(path: str | Path, payload: Any) -> str:
"""按项目统一格式写入 JSON。自动创建父目录。返回绝对路径。"""
output_path = Path(path)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8"
)
return str(output_path)
|
列表报告工具
模板方法模式:通用的 JSON 文件扫描 + 按修改时间排序逻辑,被质量报告和状态页复用。
| def list_json_reports(root: Path, glob_pattern: str, *, limit: int = 20) -> list[dict[str, Any]]:
"""列出目录下最近的 JSON 报告。
每个条目包含 path / file_name / updated_at / payload 四个字段。
被 `quality/ingestion.py` 的列表页复用。
"""
if not root.exists():
return []
reports = []
for path in sorted(root.glob(glob_pattern),
key=lambda item: item.stat().st_mtime, reverse=True):
payload = read_json_dict(path)
reports.append({
"path": str(path), "file_name": path.name,
"updated_at": path_updated_at(path), "payload": payload,
})
if limit and len(reports) >= limit:
break
return reports
|
重点掌握
| 优先级 |
内容 |
原因 |
| ★★★ 必会 |
write_json() 的统一格式设计(ensure_ascii=False, indent=2) |
整个项目的持久化都依赖这个约定 |
| ★★★ 必会 |
read_json(path, default=...) 的容错模式 |
避免文件损坏导致进程崩溃 |
| ★★ 理解 |
utc_now() vs utc_file_stamp() 的使用场景区别 |
UTC 规范是系统间协作的基础 |
| ★ 了解 |
list_json_reports() 的模板方法模式 |
用于状态页报告列表 |
二、utils.py:稳定哈希与指纹
设计思路
入库和检索都需要稳定的 ID 生成——同一个文件重新入库时产生相同的 chunk_id,才能正确覆盖旧数据。stable_hash() 用 SHA-256 将任意数量的参数拼接后生成确定性的 hex 字符串。file_fingerprint() 只读文件元数据(路径 + 修改时间 + 大小),不读内容,大幅减少增量入库的 IO。
| # qa_core/utils.py
import hashlib
import os
from pathlib import Path
def stable_hash(*parts: object) -> str:
"""根据任意值创建稳定的 SHA-256 hex 字符串。
关键特性:相同输入永远产生相同输出(确定性)。
用于生成 chunk_id、faq_id、parent_id 等 Milvus 主键。
例: stable_hash("hr_v2", "/data/入职流程.md", "chunk_3")
→ 'a1b2c3d4e5f6...' (64 位 hex)
"""
raw = "||".join("" if part is None else str(part) for part in parts)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def file_fingerprint(path: str | Path) -> str:
"""根据文件路径、修改时间和大小生成指纹,用于增量入库的变化判断。
为什么不用内容哈希 (SHA-256 of content)?
- 大文件(几十 MB 的 PDF)读内容做哈希太慢
- 本地机器 CPU 有限,每轮入库读全量文件成本太高
- 路径 + mtime + size 的组合在实际使用中已经足够可靠
"""
p = Path(path)
stat = p.stat()
return stable_hash(str(p.resolve()), stat.st_mtime_ns, stat.st_size)
def normalize_source_from_path(path: str | Path) -> str:
"""从 '<source>_data' 目录名提取 source 标识。
例: 'scenarios/enterprise_knowledge/data/hr_data' → 'hr'
'data/legal_data' → 'legal'
"""
name = os.path.basename(str(path)).replace("_data", "")
return name or "default"
|
重点掌握
| 优先级 |
内容 |
原因 |
| ★★★ 必会 |
stable_hash() 的确定性设计 |
chunk_id/faq_id 的稳定性依赖于此 |
| ★★★ 必会 |
file_fingerprint() 为什么用 mtime+size 而不是内容哈希 |
IO 成本与可靠性之间的工程权衡 |
| ★★ 理解 |
normalize_source_from_path() 的命名约定 |
入库脚本的 source 推断依赖目录命名规范 |
三、元数据 Store:MySQL 控制面
设计思路
当前项目的关键元数据已经进入 MySQL:知识库版本状态保存在 kb_versions / kb_active_versions,文档增量入库状态保存在 kb_document_manifests。这样版本激活、回滚、入库统计和增量跳过都可以被 API、脚本、测试和容器环境共享,不再依赖某个本地目录里的 JSON 文件。
质量报告仍然会使用 common.py 中的 JSON 读写工具生成可检查产物,但版本控制面和入库 manifest 只写 MySQL,避免同一份状态在文件和数据库之间分叉。
| # qa_core/governance/kb_versions.py
class KnowledgeBaseVersionStore(_MySqlStore):
"""知识库版本状态机的 MySQL 存储实现。"""
def ensure_tables(self) -> None:
"""创建 kb_versions / kb_active_versions。"""
def activate_version(self, kb_version: str) -> KnowledgeBaseVersion:
"""事务内切换 active 指针,不修改 Milvus 数据。"""
|
| # qa_core/indexing/manifest.py
class IndexManifest(_MySqlStore):
"""文档增量入库清单的 MySQL 存储实现。"""
def update(self, source, file_path, fingerprint, chunk_ids, *, scenario_id, kb_version):
"""记录文件指纹、kb_version、chunk_ids 和模型版本。"""
def iter_records(self, *, scenario_id=None, source=None, kb_version=None):
"""按 scenario_id/source/kb_version 查询已入库文件。"""
|
重点掌握
| 优先级 |
内容 |
原因 |
| ★★★ 必会 |
版本控制面和入库 manifest 进入 MySQL |
保证 API、脚本、测试、Docker 部署看到同一份状态 |
| ★★ 理解 |
JSON 工具只用于报告类产物 |
把运行时状态和可读报告分开,降低维护成本 |
四、schemas.py:Pydantic 数据模型
设计思路
所有 API 层的请求/响应都用 Pydantic BaseModel 定义,FastAPI 自动做参数校验和序列化。内部流转的数据结构用 Python dataclass(性能更好,不需要校验)。
| # qa_core/schemas.py
from pydantic import BaseModel, Field
class RetrievalDebugRequest(BaseModel):
"""HTTP 检索诊断请求体。在线问答不复用该模型。"""
query: str = Field(..., min_length=1) # 必填,最短 1 字符
source_filter: str | None = None # 前端选择的业务分类
session_id: str | None = None # 会话 ID,未传则自动生成
scenario_id: str | None = None # 场景 ID,未传用默认场景
tenant_id: str | None = None # 租户 ID,未传默认 "default"
dataset_id: str | None = None # 数据集 ID,未传默认 "default"
visibility: str | None = None # 可见级别,未传默认 "public"
user_role: str | None = None # 单一角色(前端简化入口)
user_roles: list[str] = Field(default_factory=list) # 多角色(正式入口)
kb_version: str | None = None # 知识库版本,未传用 active 版本
class FeedbackRequest(BaseModel):
"""用户反馈载荷。rating 约束为 useful/not_useful,不允许自定义值。"""
session_id: str | None = None
question: str = Field(..., min_length=1)
answer: str = Field(..., min_length=1)
rating: str = Field(..., pattern="^(useful|not_useful)$") # 仅允许这两个值
comment: str | None = None
sources: list[dict] = Field(default_factory=list)
class RetrievalDebugResponse(BaseModel):
"""检索调试响应,不包含最终答案,faq 和 doc 来源分开返回。"""
query: str
rewritten_query: str
source_filter: str | None = None
scenario_id: str | None = None
tenant_id: str | None = None
dataset_id: str | None = None
visibility: str | None = None
data_scope: dict | None = None
kb_version: str | None = None
intent: dict
retrieval_plan: dict
faq_sources: list[dict] = Field(default_factory=list)
doc_sources: list[dict] = Field(default_factory=list)
|
内部流转的 dataclass
| # qa_core/schemas.py (节选)
from dataclasses import dataclass, field
@dataclass
class RetrievalHit:
"""一次检索命中的封装:document + score。"""
document: Any # LangChain Document 对象
score: float
@dataclass
class RetrievalResult:
"""一次完整检索的结果:命中列表 + 元信息。"""
hits: list[RetrievalHit] = field(default_factory=list)
query: str = ""
source_type: str = "" # "faq" 或 "doc"
elapsed_ms: float = 0.0
@property
def top_score(self) -> float | None:
"""最高分,无命中时为 None。"""
return self.hits[0].score if self.hits else None
|
重点掌握
| 优先级 |
内容 |
原因 |
| ★★★ 必会 |
RetrievalDebugRequest 的所有字段含义 |
是 /api/retrieval/debug 的诊断请求结构 |
| ★★★ 必会 |
RetrievalDebugResponse 的字段含义 |
诊断检索质量时要看意图、计划、FAQ/Doc 命中 |
| ★★ 理解 |
FeedbackRequest.rating 的 pattern 约束 |
防止前端传自定义值污染数据 |
| ★ 了解 |
RetrievalHit / RetrievalResult 的 dataclass 设计 |
内部流转的数据结构 |
五、config/settings.py:配置管理
设计思路
使用 pydantic-settings 从进程环境变量和本机 .env 加载全部配置。Docker Compose 部署时,.env.compose 先由 Compose 注入到 API 容器的环境变量里,再由 Settings 读取。每个配置项有明确的默认值,注释标注了用途。model_config 设置 env_file=".env",用于本机 API 调试;容器模式不要把 .env.compose 复制成 .env。
| # qa_core/config/settings.py (关键字段)
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)
# ── 场景 ──
active_scenario_id: str = Field(default="enterprise_knowledge", ...)
scenario_config_dir: str = Field(default="scenarios", ...)
# ── Milvus ──
milvus_uri: str = Field(default="http://localhost:19530", ...)
milvus_database: str = Field(default="", ...)
# ── MySQL ──
mysql_host: str = Field(default="localhost", ...)
mysql_database: str = Field(default="qa_system", ...)
# ── LLM (DashScope / OpenAI 兼容) ──
llm_api_key: str = Field(default="", validation_alias="DASHSCOPE_API_KEY")
llm_base_url: str = Field(default="https://dashscope.aliyuncs.com/compatible-mode/v1", ...)
llm_model: str = Field(default="qwen-plus", ...)
llm_temperature: float = Field(default=0.1, ge=0.0, le=1.0)
llm_timeout: int = Field(default=60, ge=1)
# ── 检索参数(所有默认值可用于 .env 覆盖)──
faq_top_k: int = Field(default=20, ge=1)
doc_top_k: int = Field(default=20, ge=1)
faq_direct_score_threshold: float = Field(default=0.72, ge=0.0, le=1.0)
max_context_chars: int = Field(default=6000, ge=1)
max_context_doc_chars: int = Field(default=1600, ge=1)
short_query_max_chars: int = Field(default=20, ge=1)
# ── 历史摘要 ──
history_summary_enabled: bool = Field(default=True)
history_summary_after_messages: int = Field(default=14, ge=1)
@lru_cache(maxsize=1)
def get_settings() -> Settings:
"""进程级 Settings 单例。lru_cache 避免重复读取运行时配置。"""
return Settings()
|
重点掌握
| 优先级 |
内容 |
原因 |
| ★★★ 必会 |
get_settings() 的 @lru_cache(maxsize=1) 单例模式 |
整个项目的配置入口 |
| ★★★ 必会 |
validation_alias(如 DASHSCOPE_API_KEY)的用途 |
.env 变量名与代码字段名的映射 |
| ★★ 理解 |
检索参数组的默认值和可覆盖性 |
调整这些参数不需要改代码 |
| ★ 了解 |
model_config 的 case_sensitive=False |
Windows/Linux 环境变量兼容 |
六、config/logging_config.py:结构化日志
设计思路
项目统一使用 Python 标准 logging + 自定义格式化器,输出 JSON 格式的结构化日志。get_logger(name) 为每个模块提供带模块名的 logger 实例。
| # qa_core/config/logging_config.py
import logging
import json
from datetime import datetime, timezone
class JsonFormatter(logging.Formatter):
def format(self, record):
return json.dumps({
"time": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}, ensure_ascii=False)
def get_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
|
重点掌握
| 优先级 |
内容 |
原因 |
| ★★ 理解 |
JSON 格式化日志的设计目的 |
方便 log 分析工具解析 |
| ★ 了解 |
get_logger(name) 的 handler 防重复逻辑 |
避免多次调用时添加重复 handler |
七、memory/base.py:MySQL 惰性连接基类
设计思路
ChatHistoryStore、FeedbackStore、KnowledgeBaseVersionStore 和 IndexManifest 都需要 MySQL 连接,但不是每个请求都立即需要。_MySqlStore 基类提供惰性初始化的引擎属性——第一次访问 engine 时才创建连接,创建后缓存复用。pool_pre_ping=True 在连接被 MySQL 服务端断开后自动探活重连。
| # qa_core/memory/base.py
from sqlalchemy import create_engine, text
class _MySqlStore:
"""MySQL 存储的公共基类。
子类必须在访问 engine 之前设置 self.settings(提供 mysql_sync_uri)。
子类: ChatHistoryStore, FeedbackStore, KnowledgeBaseVersionStore, IndexManifest
"""
def __init__(self) -> None:
self._engine = None
@property
def engine(self):
"""惰性创建的 SQLAlchemy 同步引擎。
pool_pre_ping=True: 每次从连接池取连接前先发送 ping,
避免 "MySQL server has gone away" 错误。
"""
if self._engine is None:
self._engine = create_engine(
self.settings.mysql_sync_uri,
pool_pre_ping=True
)
return self._engine
def _execute_ddl(self, sql: str) -> None:
"""执行 DDL 语句(如 CREATE TABLE IF NOT EXISTS)。
在事务内执行,自动提交。
"""
with self.engine.begin() as conn:
conn.execute(text(sql))
|
重点掌握
| 优先级 |
内容 |
原因 |
| ★★★ 必会 |
惰性引擎的 pool_pre_ping=True 设计 |
生产环境中 "MySQL gone away" 是高频问题 |
| ★★ 理解 |
_execute_ddl() 封装的 DDL 执行模式 |
被会话、反馈、知识库版本和入库 manifest 的建表逻辑复用 |
| ★ 了解 |
为什么不用 ORM 而用原生 SQL |
系统项目优先可读性,ORM 增加学习成本 |
本讲小结
common.py 提供了项目级的 UTC 时间规范 和 JSON 读写容错
utils.py 的 stable_hash() 是基于 SHA-256 的 确定性 ID 生成器
- 核心元数据进入 MySQL 控制面,JSON 读写工具只服务报告类产物
schemas.py 用 Pydantic 定义请求/响应模型,FastAPI 自动校验
settings.py 用 pydantic-settings 加载 .env 配置,@lru_cache 单例
memory/base.py 的 _MySqlStore 提供了 惰性 MySQL 连接 的公共能力,被会话、反馈、版本控制和入库 manifest 共同复用