llm_evaluate_sources.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. """
  2. 基于 LLM 的 source 知识质量评估
  3. 对 source.json 中已匹配的帖子逐条做 rubric 评估,严格参照
  4. test_script/evaluation/知识质量评估-rubric.{json,md} 的 **post 部分**:
  5. 先把帖子分到 procedure / step / tool(可多标签),再按"通用 + 命中类型"维度
  6. 各打 1-5,最后给出 report / discard 决策。
  7. 设计要点:
  8. - rubric 在运行时从文件加载并整段嵌入 prompt(rubric 改了评估自动跟随)。
  9. - prompt 各块从 eval_prompt_template.md 加载、按 str.format 填充(template 改了无需改代码)。
  10. - LLM 输出严格匹配 rubric 的 `post.output` 结构,写回到 source 的 `llm_evaluation` 字段。
  11. - decision=discard 的帖子从 source.json 移入 filtered_cases.json(filter_reason=llm_discard:...)。
  12. - 增量:已带 llm_evaluation 的 report 帖跳过;之前已 LLM 拒过的帖(被 extract 重新匹配回来)
  13. 短路直接再剔除,不重复花钱。
  14. - fail-open:单帖 LLM 调用失败 → 保留该帖并标记,不因瞬时错误丢内容。
  15. 入口:evaluate_sources_with_llm(source_file, llm_call, model, requirement, ...) -> stats dict
  16. 由 run_pipeline.py 的 source 阶段在规则预筛(extract_sources)之后调用。
  17. """
  18. import asyncio
  19. import json
  20. import logging
  21. import re
  22. import sys
  23. from pathlib import Path
  24. from typing import Any, Callable, Dict, List, Optional, Tuple
  25. # 直接当脚本跑时(python .../llm_evaluate_sources.py)需要项目根在 sys.path,
  26. # 才能 import examples.* ;被 run_pipeline 作为模块导入时路径已就绪,此处无副作用。
  27. _PROJECT_ROOT = Path(__file__).resolve().parents[3]
  28. if str(_PROJECT_ROOT) not in sys.path:
  29. sys.path.insert(0, str(_PROJECT_ROOT))
  30. from examples.process_pipeline.script.llm_helper import call_llm_with_retry
  31. logger = logging.getLogger(__name__)
  32. # prompt 模板 (mod.md 风格的 SYSTEM + USER 两块):主源放在 script/search_eval/ 下,
  33. # 跟 batch_3forms / server / eval_one_sample 同目录(IDE 编辑那份就是 runtime 实际读的版本)。
  34. # 若主源缺失则退到 test_script/search_eval/ 历史副本。首次加载后缓存。
  35. # 注:原 rubric.json / rubric.md 已固化进本 template,不再外部 load(_RUBRIC_* / load_post_rubric
  36. # / load_rubric_md 一并清掉)。
  37. _PROMPT_TEMPLATE_PATH = (
  38. Path(__file__).resolve().parent / "search_eval" / "eval_prompt_template.md"
  39. )
  40. if not _PROMPT_TEMPLATE_PATH.exists():
  41. _PROMPT_TEMPLATE_PATH = (
  42. Path(__file__).resolve().parent.parent / "test_script" / "search_eval" / "eval_prompt_template.md"
  43. )
  44. _PROMPT_TEMPLATE_CACHE: Optional[Dict[str, str]] = None
  45. # mod.md 风格的中文 schema 知识类型枚举值(取代了旧 英文 procedure/step/tool)
  46. _VALID_KNOWLEDGE_TYPES = {"工序", "能力", "工具"}
  47. _MAX_BODY_CHARS = 8000 # 控制单帖 prompt token:正文/字幕截断上限
  48. # 上次评估的产物字段——dump source JSON 给 LLM 时必须剥掉,
  49. # 否则 LLM 会"先验 anchoring"到旧分数,新评估失真。
  50. _EVAL_PRODUCT_FIELDS = frozenset({
  51. "llm_evaluation", # 之前 LLM 给的整段评估结果(scores/decision/reason/...)
  52. "images_sent", # 评估时记录"实际发了几张图"——管线元数据,LLM 评内容无关
  53. })
  54. # ── 评估模型选择(可切换)────────────────────────────────────────────────────────
  55. # key -> (backend, model_id)。backend="qwen" 走 create_qwen_llm_call(阿里云原生),
  56. # 其余走 create_openrouter_llm_call。run_pipeline.py / search_and_evaluate.py 共用此工厂。
  57. EVAL_MODELS: Dict[str, Tuple[str, str]] = {
  58. "qwen": ("qwen", "qwen3.5-plus"),
  59. "sonnet": ("openrouter", "claude-sonnet-4-6"),
  60. "gemini-flash": ("openrouter", "google/gemini-3-flash-preview"),
  61. "gemini-3.5-flash": ("openrouter", "google/gemini-3.5-flash"), # flash 正式版 (3.5)
  62. "gemini-flash-lite": ("openrouter", "google/gemini-3.1-flash-lite"), # 最快,适合大批量评估
  63. "gemini": ("openrouter", "google/gemini-3-flash-preview"), # 别名 → gemini-flash
  64. "gpt": ("openrouter", "gpt-5.4"),
  65. }
  66. DEFAULT_EVAL_MODEL = "gemini-flash-lite"
  67. def build_eval_llm_call(choice: str) -> Tuple[Callable, str]:
  68. """根据选择返回 (llm_call, model_id)。
  69. choice 可以是 EVAL_MODELS 的 key(qwen/sonnet/gemini/gpt),
  70. 也可以直接传模型 id(含 'qwen' 走 qwen 后端,否则走 OpenRouter)。
  71. """
  72. if choice in EVAL_MODELS:
  73. backend, model_id = EVAL_MODELS[choice]
  74. else:
  75. model_id = choice
  76. backend = "qwen" if "qwen" in choice.lower() else "openrouter"
  77. if backend == "qwen":
  78. from agent.llm import create_qwen_llm_call
  79. return create_qwen_llm_call(model=model_id), model_id
  80. from agent.llm.openrouter import create_openrouter_llm_call
  81. return create_openrouter_llm_call(model=model_id), model_id
  82. # ── prompt 模板加载 ─────────────────────────────────────────────────────────────
  83. def load_prompt_template() -> Dict[str, str]:
  84. """解析 eval_prompt_template.md 成 {BLOCK_NAME: template_str}。
  85. 分隔符 `=== BLOCK_NAME ===` 单独成行 → 下一块开始;第一个分隔符之前的文件头当注释跳过;
  86. **块内所有行字面保留**(包括 `#` 开头的 markdown 标题)——因为 template 内嵌了完整的
  87. rubric markdown 和 JSON schema,不能再剥 `#` 行(否则 H1/H2 标题被吃)。
  88. 模板正文里如有 `{var}` 占位符,由 _build_eval_messages 在该块上调 str.format 填充。
  89. 含字面 `{` / `}` 的块(如 USER_RUBRIC_JSON)不能走 .format(会爆 KeyError),代码里
  90. 对它们直接取字面。
  91. 首次加载后缓存在 _PROMPT_TEMPLATE_CACHE;改模板需重启 server。
  92. 解析失败直接抛——prompt 模板是评估链路核心,没它评不了,不做 fail-open。
  93. """
  94. global _PROMPT_TEMPLATE_CACHE
  95. if _PROMPT_TEMPLATE_CACHE is not None:
  96. return _PROMPT_TEMPLATE_CACHE
  97. text = _PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8")
  98. blocks: Dict[str, List[str]] = {}
  99. current: Optional[str] = None
  100. for line in text.splitlines():
  101. m = re.match(r"^===\s+([A-Z_]+)\s+===\s*$", line)
  102. if m:
  103. current = m.group(1)
  104. blocks[current] = []
  105. continue
  106. if current is None:
  107. continue # 第一个 === 之前是文件头说明,跳过
  108. blocks[current].append(line)
  109. # 各块 join 回字符串,strip 掉首尾空行(块之间衔接由调用方加 \n\n)
  110. _PROMPT_TEMPLATE_CACHE = {k: "\n".join(v).strip("\n") for k, v in blocks.items()}
  111. return _PROMPT_TEMPLATE_CACHE
  112. # ── 帖子内容格式化(喂给 LLM)──────────────────────────────────────────────────
  113. def _extract_author(post: Dict[str, Any]) -> str:
  114. return (
  115. post.get("author")
  116. or post.get("channel_account_name")
  117. or post.get("channel")
  118. or ""
  119. )
  120. def _format_post_for_eval(source: Dict[str, Any]) -> str:
  121. """把一条 source 序列化为 JSON 字符串供 LLM 评估。
  122. 现代 LLM 读结构化 JSON 比读"标签:值"自然语言更准——字段名直接告诉它语义,
  123. 不需要靠 prompt 工程把字段标签写漂亮。所以直接 dump source 整段。
  124. 正文做一处截断防 token 爆:
  125. - post.body_text 截到 _MAX_BODY_CHARS(个别帖子正文几万字)
  126. 其余字段全量给 LLM,由它自行判断哪些有用(如 images URL / channel_account_id 等)。
  127. """
  128. # 浅拷贝避免修改调用方的 source
  129. s = dict(source)
  130. post = dict(s.get("post") or {})
  131. body = post.get("body_text") or post.get("desc") or ""
  132. if isinstance(body, str) and len(body) > _MAX_BODY_CHARS:
  133. post["body_text"] = body[:_MAX_BODY_CHARS] + f"\n…(正文已截断,原长 {len(body)} 字)"
  134. s["post"] = post
  135. # 剥掉两类不该进 prompt 的字段:
  136. # ① `_` 前缀内部字段(如 _quality_grade / _image_data_urls)——管线元数据,无信息量
  137. # ② _EVAL_PRODUCT_FIELDS(如 llm_evaluation / images_sent)——上次评估的产物,
  138. # 喂回新评估会让 LLM"先验 anchoring"到旧分数,新评估失真
  139. s = {k: v for k, v in s.items()
  140. if not str(k).startswith("_") and k not in _EVAL_PRODUCT_FIELDS}
  141. return json.dumps(s, ensure_ascii=False, indent=2)
  142. # ── prompt 构建(从模板各块组装) ─────────────────────────────────────────────
  143. def _build_eval_messages(
  144. requirement: str,
  145. post_block: str,
  146. image_urls: Optional[List[str]] = None,
  147. query: Optional[str] = None,
  148. ) -> List[Dict[str, Any]]:
  149. """从 eval_prompt_template.md (mod.md 风格) 拼出 system + user message。
  150. template 只有 SYSTEM + USER 两块——USER 块内含 检索词 / 知识类型分类 / 输出 schema /
  151. 待评估帖子 / 注意事项 的完整 markdown。两个占位符 {query} 和 {post_block} 用 .replace
  152. 替换 (不走 .format,因 schema JSON 含字面 `{`/`}` 会触发 .format KeyError)。
  153. requirement 仅作为 query 缺失时的降级 fallback。
  154. """
  155. tpl = load_prompt_template()
  156. system = tpl["SYSTEM"]
  157. # query 优先,requirement 降级,都没就给占位描述
  158. effective_query = query or requirement or "(未指定检索词)"
  159. user_text = tpl["USER"].replace("{query}", effective_query).replace("{post_block}", post_block)
  160. if image_urls:
  161. # 多模态:把帖子图片随文本一起发给模型(模板末尾已有『请结合配图判断』提示)
  162. user_content: List[Dict[str, Any]] = [{"type": "text", "text": user_text}]
  163. for u in image_urls:
  164. user_content.append({"type": "image_url", "image_url": {"url": u}})
  165. return [
  166. {"role": "system", "content": system},
  167. {"role": "user", "content": user_content},
  168. ]
  169. return [
  170. {"role": "system", "content": system},
  171. {"role": "user", "content": user_text},
  172. ]
  173. def _validate_eval(data: Dict[str, Any]) -> Optional[str]:
  174. """对 LLM 评估输出做最小结构校验(新版 mod.md schema)。返回错误描述或 None。
  175. 校验字段(宽松,避免重试爆炸):
  176. - 知识类型: list, 值 ∈ {工序, 能力, 工具}(步骤→能力 软修复)
  177. - 相关性: {和内容制作知识相关, 和 query 相关} 各自 {得分, 理由}
  178. - 质量: {固定维度, 动态维度} 至少一个;具体子字段不强校验
  179. 时效性不校验 —— 由 _calc_recency_score 注入, LLM 不该输出。
  180. 脱壳:有些模型会把结果多套一层 {"output": {...}}, 这里就地解包。
  181. """
  182. if not isinstance(data, dict):
  183. return "输出不是 JSON 对象"
  184. # 脱壳: 有些模型(gemini-flash-lite 等)会照着 schema 多套一层 {"output": {...}}
  185. if "知识类型" not in data and isinstance(data.get("output"), dict) \
  186. and "知识类型" in data["output"]:
  187. inner = data["output"]
  188. data.clear()
  189. data.update(inner)
  190. # 英文 key fallback: LLM 偶发回归英文 schema, 自动 rename 成中文
  191. _EN_TO_CN = {
  192. "knowledge_type": "知识类型",
  193. "relevance": "相关性",
  194. "quality": "质量",
  195. }
  196. for en, cn in _EN_TO_CN.items():
  197. if cn not in data and en in data:
  198. data[cn] = data.pop(en)
  199. # 知识类型: 允许为空/缺失/非法; 软修复:
  200. # ① 字符串 → list ② 英文/旧 步骤 → 中文新枚举 (procedure→工序 / step→能力 / 步骤→能力)
  201. # ③ 空 list 时反推: 看 质量.动态维度 里 LLM 填了哪些类型子块
  202. # ④ 非法值过滤掉
  203. _KT_REMAP = {
  204. "procedure": "工序", "step": "能力", "tool": "工具",
  205. "步骤": "能力", # 旧 schema 兼容
  206. }
  207. kt = data.get("知识类型")
  208. if isinstance(kt, str):
  209. kt = [kt]
  210. if isinstance(kt, list):
  211. kt = [_KT_REMAP.get(k, k) for k in kt]
  212. kt = [k for k in kt if k in _VALID_KNOWLEDGE_TYPES]
  213. else:
  214. kt = []
  215. if not kt:
  216. dyn = ((data.get("质量") or {}).get("动态维度") or {})
  217. inferred = [t for t in ("工序", "能力", "工具")
  218. if isinstance(dyn.get(t), dict) and dyn[t]]
  219. if inferred:
  220. kt = inferred
  221. data["知识类型"] = kt
  222. # 相关性: 两子键必须各为 {得分, 理由} 对象, 得分 0-10
  223. rel = data.get("相关性")
  224. if not isinstance(rel, dict):
  225. return "相关性 必须是对象"
  226. for sub_key in ("和内容制作知识相关", "和 query 相关"):
  227. sub = rel.get(sub_key)
  228. if not isinstance(sub, dict):
  229. return f"相关性.{sub_key!r} 必须是 {{得分, 理由}} 对象"
  230. try:
  231. v = float(sub.get("得分"))
  232. except (TypeError, ValueError):
  233. return f"相关性.{sub_key}.得分 缺失或不是数字 (需 0-10)"
  234. if not (0 <= v <= 10):
  235. return f"相关性.{sub_key}.得分 必须在 0-10, 得到 {sub.get('得分')!r}"
  236. # 质量: 必须是 dict, 至少包含 固定维度 或 动态维度
  237. q = data.get("质量")
  238. if not isinstance(q, dict):
  239. return "质量 必须是对象"
  240. if not isinstance(q.get("固定维度"), dict) and not isinstance(q.get("动态维度"), dict):
  241. return "质量 必须至少包含 固定维度 或 动态维度"
  242. # ── 采纳门槛依据维度必填(其余子字段仍不强校验)──────────────────────────────
  243. # 背景:部分模型(实测 gemini-3.5-flash)会静默漏掉判罚维度。校验放行后 _repro_score
  244. # 取不到值→门槛失效→低质帖被误采纳。故对门槛依据的两维强校验,缺失/非数字即触发重试:
  245. # · 意图可控性(固定维度)
  246. # · 实现完整性(动态维度·命中的 工序/能力·字段完整性) —— 承载可复现性封顶规则
  247. fixed = q.get("固定维度") if isinstance(q.get("固定维度"), dict) else {}
  248. dyn = q.get("动态维度") if isinstance(q.get("动态维度"), dict) else {}
  249. def _need_score(node, path):
  250. if isinstance(node, dict):
  251. node = node.get("得分")
  252. try:
  253. v = float(node)
  254. except (TypeError, ValueError):
  255. return f"{path}.得分 缺失或非数字(必填,采纳门槛依据)"
  256. return None if 0 <= v <= 10 else f"{path}.得分 必须在 0-10, 得到 {node!r}"
  257. err = _need_score(fixed.get("意图可控性"), "质量.固定维度.意图可控性")
  258. if err:
  259. return err
  260. for t in data["知识类型"]:
  261. if t not in ("工序", "能力"):
  262. continue
  263. impl = ((dyn.get(t) or {}).get("字段完整性") or {}).get("实现完整性")
  264. err = _need_score(impl, f"质量.动态维度.{t}.字段完整性.实现完整性")
  265. if err:
  266. return err
  267. return None
  268. # ── 时效性: 代码计算, 不让 LLM 评 ────────────────────────────────────────────────
  269. def _calc_recency_score(publish_ts: str) -> int:
  270. """按发布日期 vs 今日的天数差打 0-10 阶梯分:
  271. ≤30d=10 / ≤90d=8 / ≤180d=6 / ≤270d=3 / ≤365d=1 / >365d=0
  272. 无法解析时间戳 → 0 (保守判过期)。"""
  273. from datetime import datetime
  274. try:
  275. d = datetime.strptime((publish_ts or "")[:10], "%Y-%m-%d")
  276. except (ValueError, TypeError):
  277. return 0
  278. days = (datetime.now() - d).days
  279. if days <= 30: return 10
  280. if days <= 90: return 8
  281. if days <= 180: return 6
  282. if days <= 270: return 3
  283. if days <= 365: return 1
  284. return 0
  285. def _inject_recency(data: Dict[str, Any], source: Dict[str, Any]) -> None:
  286. """LLM 输出后注入时效性到 质量.固定维度.时效性.{得分}, 无 理由 (代码算的不解释)。"""
  287. if not isinstance(data, dict):
  288. return
  289. score = _calc_recency_score((source.get("post") or {}).get("publish_timestamp", ""))
  290. q = data.setdefault("质量", {})
  291. if not isinstance(q, dict):
  292. return
  293. fixed = q.setdefault("固定维度", {})
  294. if not isinstance(fixed, dict):
  295. return
  296. fixed["时效性"] = {"得分": score}
  297. # ── 单帖评估 ────────────────────────────────────────────────────────────────────
  298. def _source_key(source: Dict[str, Any]) -> Tuple[Any, Any]:
  299. return (source.get("platform"), source.get("channel_content_id"))
  300. def _move_to_discard(
  301. source: Dict[str, Any],
  302. discarded: List[Dict[str, Any]],
  303. reason: Optional[str],
  304. ) -> None:
  305. """把一条 source 标上 llm_discard 原因并加入 discarded 列表(仅淘汰模式调用)。"""
  306. s_copy = dict(source)
  307. r = (reason or "").replace("\n", " ")[:120]
  308. s_copy["filter_reason"] = f"llm_discard:{r}" if r else "llm_discard"
  309. discarded.append(s_copy)
  310. async def _evaluate_one(
  311. source: Dict[str, Any],
  312. requirement: str,
  313. llm_call: Callable,
  314. model: str,
  315. sem: asyncio.Semaphore,
  316. image_urls: Optional[List[str]] = None,
  317. query: Optional[str] = None,
  318. ) -> Tuple[Optional[Dict[str, Any]], float]:
  319. """评估单条 source,返回 (llm_evaluation, cost)。失败返回 (None, cost)。
  320. image_urls 非空时走多模态评估(把帖子配图一并发给模型,需模型支持图片,如 gemini)。
  321. query 非空时把它作为检索锚点喂给模型(判相关性看『这帖是否回答了这个检索词』)。
  322. rubric 详解 / 输出 schema 已固化进 eval_prompt_template.md,本函数不再传 rubric 参数。
  323. """
  324. post_block = _format_post_for_eval(source)
  325. messages = _build_eval_messages(requirement, post_block, image_urls, query)
  326. async with sem:
  327. data, cost = await call_llm_with_retry(
  328. llm_call=llm_call,
  329. messages=messages,
  330. model=model,
  331. temperature=0.1,
  332. max_tokens=2000,
  333. validate_fn=_validate_eval,
  334. task_name=f"LLM-Eval[{source.get('case_id', '?')}]",
  335. )
  336. # 时效性走代码注入 (LLM 不评), 写到 质量.固定维度.时效性.{得分}
  337. _inject_recency(data, source)
  338. return data, cost
  339. # ── filtered_cases.json 追加 ────────────────────────────────────────────────────
  340. def _append_to_filtered(raw_cases_dir: Path, discarded: List[Dict[str, Any]]) -> None:
  341. """把 LLM 判为 discard 的帖子并入 filtered_cases.json(与 extract_sources 同结构,按原因分组)。"""
  342. if not discarded:
  343. return
  344. filtered_file = raw_cases_dir / "filtered_cases.json"
  345. existing: List[Dict[str, Any]] = []
  346. existing_ids: set = set()
  347. if filtered_file.exists():
  348. try:
  349. with open(filtered_file, "r", encoding="utf-8") as f:
  350. data = json.load(f)
  351. for group in data.get("by_reason", {}).values():
  352. for s in group.get("sources", []):
  353. existing.append(s)
  354. existing_ids.add((s.get("platform"), s.get("channel_content_id")))
  355. except Exception as e:
  356. logger.warning("读取已有 filtered_cases.json 失败: %s", e)
  357. for s in discarded:
  358. key = (s.get("platform"), s.get("channel_content_id"))
  359. if key not in existing_ids:
  360. existing.append(s)
  361. existing_ids.add(key)
  362. by_reason: Dict[str, List[Dict[str, Any]]] = {}
  363. for s in existing:
  364. category = (s.get("filter_reason") or "unknown").split(":", 1)[0]
  365. by_reason.setdefault(category, []).append(s)
  366. output = {
  367. "total": len(existing),
  368. "by_reason": {
  369. cat: {"count": len(items), "sources": items}
  370. for cat, items in by_reason.items()
  371. },
  372. }
  373. with open(filtered_file, "w", encoding="utf-8") as f:
  374. json.dump(output, f, ensure_ascii=False, indent=2)
  375. def _load_prior_llm_discards(raw_cases_dir: Path) -> set:
  376. """从 filtered_cases.json 收集之前已被 LLM 拒过的帖子 key,避免重复评估花钱。"""
  377. filtered_file = raw_cases_dir / "filtered_cases.json"
  378. keys: set = set()
  379. if not filtered_file.exists():
  380. return keys
  381. try:
  382. with open(filtered_file, "r", encoding="utf-8") as f:
  383. data = json.load(f)
  384. for category, group in data.get("by_reason", {}).items():
  385. if category != "llm_discard":
  386. continue
  387. for s in group.get("sources", []):
  388. keys.add((s.get("platform"), s.get("channel_content_id")))
  389. except Exception as e:
  390. logger.warning("读取 filtered_cases.json 失败: %s", e)
  391. return keys
  392. # ── 主入口 ────────────────────────────────────────────────────────────────────
  393. async def evaluate_sources_with_llm(
  394. source_file: Path,
  395. llm_call: Callable,
  396. model: str,
  397. requirement: str,
  398. max_concurrent: int = 3,
  399. apply_decision: bool = False,
  400. ) -> Dict[str, Any]:
  401. """
  402. 对 source.json 中所有 source 逐条做 LLM rubric 评估,把结果写进每条的 `llm_evaluation` 字段。
  403. apply_decision(淘汰开关,默认 False —— 当前只标注不淘汰):
  404. - False(标注模式):评估并标注所有帖子,**全部保留在 source.json**,
  405. 不据 decision 剔除、不动 filtered_cases.json。
  406. (rubric 阈值/权重尚待标定,先采集评分数据,淘汰留待标定后再开。)
  407. - True(淘汰模式):decision=discard 的帖子移入 filtered_cases.json 并从 source.json 删除,
  408. 历史已拒帖短路再剔除。将来阈值标定好后由调用方打开此开关即可启用门槛。
  409. Returns 统计 dict:
  410. evaluated —— 本次真正调用 LLM 的条数
  411. reported —— decision=report 的总数
  412. would_discard —— decision=discard 的总数(标注模式下仅统计、不实际剔除)
  413. discarded —— 实际从 source.json 移除的条数(标注模式恒为 0)
  414. skipped —— 跳过的条数(已评过复用 + 淘汰模式下历史拒帖短路)
  415. total_cost —— LLM 调用累计成本
  416. llm_discard_details —— [{case_id, platform, title, filter_reason}],淘汰模式下供研究反馈引用
  417. """
  418. source_file = Path(source_file)
  419. raw_cases_dir = source_file.parent
  420. with open(source_file, "r", encoding="utf-8") as f:
  421. data = json.load(f)
  422. sources: List[Dict[str, Any]] = data.get("sources", [])
  423. if not sources:
  424. return {
  425. "evaluated": 0, "reported": 0, "would_discard": 0, "discarded": 0,
  426. "skipped": 0, "total_cost": 0.0, "llm_discard_details": [],
  427. }
  428. # rubric 详解 / 输出 schema 已固化在 eval_prompt_template.md, 不再 load 外部 rubric 文件
  429. prior_discards = _load_prior_llm_discards(raw_cases_dir) if apply_decision else set()
  430. kept: List[Dict[str, Any]] = [] # 留在 source.json 的
  431. discarded: List[Dict[str, Any]] = [] # 移入 filtered 的(仅淘汰模式)
  432. to_eval: List[Dict[str, Any]] = [] # 需要真正跑 LLM 的
  433. skipped = 0
  434. for s in sources:
  435. existing_eval = s.get("llm_evaluation")
  436. # 1. 已评过(且非失败标记)→ 直接复用,不重复花钱
  437. if isinstance(existing_eval, dict) and not existing_eval.get("error"):
  438. if apply_decision and existing_eval.get("decision") == "discard":
  439. _move_to_discard(s, discarded, existing_eval.get("reason"))
  440. else:
  441. kept.append(s)
  442. skipped += 1
  443. continue
  444. # 2. 淘汰模式下:之前已被 LLM 拒过(被重新匹配回来)→ 短路再剔除
  445. if apply_decision and _source_key(s) in prior_discards:
  446. s_copy = dict(s)
  447. s_copy["filter_reason"] = "llm_discard:previously_rejected"
  448. discarded.append(s_copy)
  449. skipped += 1
  450. continue
  451. # 3. 需要评估
  452. to_eval.append(s)
  453. total_cost = 0.0
  454. evaluated = 0
  455. if to_eval:
  456. sem = asyncio.Semaphore(max_concurrent)
  457. results = await asyncio.gather(*[
  458. _evaluate_one(s, requirement, llm_call, model, sem)
  459. for s in to_eval
  460. ])
  461. for s, (llm_eval, cost) in zip(to_eval, results):
  462. total_cost += cost
  463. evaluated += 1
  464. if llm_eval is None:
  465. # fail-open:评估失败保留该帖,标记便于排查(error=True,下次会重评)
  466. s["llm_evaluation"] = {"decision": "report", "reason": "llm_eval_failed_kept", "error": True}
  467. kept.append(s)
  468. continue
  469. s["llm_evaluation"] = llm_eval
  470. is_discard = llm_eval.get("decision") == "discard"
  471. # 仅淘汰模式才真正移除;标注模式下 discard 帖也留在 source.json
  472. if apply_decision and is_discard:
  473. _move_to_discard(s, discarded, llm_eval.get("reason"))
  474. else:
  475. kept.append(s)
  476. # 统计基于最终 kept/discarded 的 llm_evaluation 决策(含本轮新评 + 历史 skip)
  477. reported = sum(
  478. 1 for s in kept
  479. if isinstance(s.get("llm_evaluation"), dict)
  480. and s["llm_evaluation"].get("decision") == "report"
  481. )
  482. would_discard = sum(
  483. 1 for s in (kept + discarded)
  484. if isinstance(s.get("llm_evaluation"), dict)
  485. and s["llm_evaluation"].get("decision") == "discard"
  486. )
  487. # 写回 source.json
  488. data["sources"] = kept
  489. data["total"] = len(kept)
  490. with open(source_file, "w", encoding="utf-8") as f:
  491. json.dump(data, f, ensure_ascii=False, indent=2)
  492. # 仅淘汰模式:并入 filtered_cases.json
  493. if apply_decision and discarded:
  494. _append_to_filtered(raw_cases_dir, discarded)
  495. # 给研究反馈用的摘要(仅淘汰模式有实际剔除)
  496. llm_discard_details: List[Dict[str, Any]] = []
  497. for s in discarded:
  498. post = s.get("post", {}) or {}
  499. title = post.get("title") or s.get("source_url", "")
  500. llm_discard_details.append({
  501. "case_id": s.get("case_id", ""),
  502. "platform": s.get("platform", ""),
  503. "title": title[:60] if title else "",
  504. "filter_reason": s.get("filter_reason", ""),
  505. })
  506. return {
  507. "evaluated": evaluated,
  508. "reported": reported,
  509. "would_discard": would_discard,
  510. "discarded": len(discarded),
  511. "skipped": skipped,
  512. "total_cost": round(total_cost, 4),
  513. "llm_discard_details": llm_discard_details,
  514. }
  515. # ── CLI ────────────────────────────────────────────────────────────────────────
  516. if __name__ == "__main__":
  517. import argparse
  518. from dotenv import load_dotenv
  519. load_dotenv()
  520. parser = argparse.ArgumentParser(description="对 source.json 做 LLM rubric 评估")
  521. parser.add_argument("source_file", type=Path, help="source.json 路径")
  522. parser.add_argument("--requirement", type=str, default="", help="采集需求 / 目标格子描述")
  523. parser.add_argument("--model", type=str, default=DEFAULT_EVAL_MODEL,
  524. help=f"评估模型,可选 {list(EVAL_MODELS)} 或直接传模型 id(默认 {DEFAULT_EVAL_MODEL})")
  525. parser.add_argument("--max-concurrent", type=int, default=3)
  526. parser.add_argument("--apply-decision", action="store_true",
  527. help="按 LLM decision 实际淘汰 discard 帖(默认只标注不淘汰,阈值标定后再开)")
  528. args = parser.parse_args()
  529. _llm_call, _model_id = build_eval_llm_call(args.model)
  530. print(f"[eval-model] {args.model} -> {_model_id}")
  531. stats = asyncio.run(evaluate_sources_with_llm(
  532. source_file=args.source_file,
  533. llm_call=_llm_call,
  534. model=_model_id,
  535. requirement=args.requirement,
  536. max_concurrent=args.max_concurrent,
  537. apply_decision=args.apply_decision,
  538. ))
  539. print(json.dumps(stats, ensure_ascii=False, indent=2))