llm_evaluate_sources.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. """
  2. 基于 LLM 的 source 知识质量评估
  3. 对 source.json 中已匹配的帖子逐条做 rubric 评估,严格参照
  4. test_script/evaluation/知识质量评估-rubric.{json,md} 的 **post 部分**:
  5. 先把帖子分到 procedure / step / tool(可多标签),再按"通用 + 命中类型"维度
  6. 各打 1-5,最后给出 report / discard 决策。
  7. 设计要点:
  8. - rubric 在运行时从文件加载并整段嵌入 prompt(rubric 改了评估自动跟随)。
  9. - prompt 各块从 eval_prompt_template.md 加载、按 str.format 填充(template 改了无需改代码)。
  10. - LLM 输出严格匹配 rubric 的 `post.output` 结构,写回到 source 的 `llm_evaluation` 字段。
  11. - decision=discard 的帖子从 source.json 移入 filtered_cases.json(filter_reason=llm_discard:...)。
  12. - 增量:已带 llm_evaluation 的 report 帖跳过;之前已 LLM 拒过的帖(被 extract 重新匹配回来)
  13. 短路直接再剔除,不重复花钱。
  14. - fail-open:单帖 LLM 调用失败 → 保留该帖并标记,不因瞬时错误丢内容。
  15. 入口:evaluate_sources_with_llm(source_file, llm_call, model, requirement, ...) -> stats dict
  16. 由 run_pipeline.py 的 source 阶段在规则预筛(extract_sources)之后调用。
  17. """
  18. import asyncio
  19. import json
  20. import logging
  21. import re
  22. import sys
  23. from pathlib import Path
  24. from typing import Any, Callable, Dict, List, Optional, Tuple
  25. # 直接当脚本跑时(python .../llm_evaluate_sources.py)需要项目根在 sys.path,
  26. # 才能 import examples.* ;被 run_pipeline 作为模块导入时路径已就绪,此处无副作用。
  27. _PROJECT_ROOT = Path(__file__).resolve().parents[3]
  28. if str(_PROJECT_ROOT) not in sys.path:
  29. sys.path.insert(0, str(_PROJECT_ROOT))
  30. from examples.process_pipeline.script.llm_helper import call_llm_with_retry
  31. logger = logging.getLogger(__name__)
  32. # rubric 路径:本文件在 script/,rubric 在 ../test_script/evaluation/
  33. _RUBRIC_DIR = Path(__file__).resolve().parent.parent / "test_script" / "evaluation"
  34. _RUBRIC_PATH = _RUBRIC_DIR / "知识质量评估-rubric.json" # 输出结构契约(output schema)
  35. _RUBRIC_MD_PATH = _RUBRIC_DIR / "知识质量评估-rubric.md" # 判据详解 / 边界 / 套用示例
  36. # prompt 模板 (mod.md 风格的 SYSTEM + USER 两块):放在 test_script/search_eval/ 下,跟评估
  37. # 相关脚本(eval_one_sample.py / batch_3forms.py / server.py)同目录。首次加载后缓存。
  38. _PROMPT_TEMPLATE_PATH = (
  39. Path(__file__).resolve().parent.parent / "test_script" / "search_eval" / "eval_prompt_template.md"
  40. )
  41. _PROMPT_TEMPLATE_CACHE: Optional[Dict[str, str]] = None
  42. # mod.md 风格的中文 schema 知识类型枚举值(取代了旧 英文 procedure/step/tool)
  43. _VALID_KNOWLEDGE_TYPES = {"工序", "步骤", "工具"}
  44. _MAX_BODY_CHARS = 8000 # 控制单帖 prompt token:正文/字幕截断上限
  45. _MAX_COMMENTS = 20 # 评论最多带多少条(喂"评论反馈"维度)
  46. _MAX_COMMENT_CHARS = 200 # 单条评论截断上限
  47. # 上次评估的产物字段——dump source JSON 给 LLM 时必须剥掉,
  48. # 否则 LLM 会"先验 anchoring"到旧分数,新评估失真。
  49. _EVAL_PRODUCT_FIELDS = frozenset({
  50. "llm_evaluation", # 之前 LLM 给的整段评估结果(scores/decision/reason/...)
  51. "images_sent", # 评估时记录"实际发了几张图"——管线元数据,LLM 评内容无关
  52. })
  53. # ── 评估模型选择(可切换)────────────────────────────────────────────────────────
  54. # key -> (backend, model_id)。backend="qwen" 走 create_qwen_llm_call(阿里云原生),
  55. # 其余走 create_openrouter_llm_call。run_pipeline.py / search_and_evaluate.py 共用此工厂。
  56. EVAL_MODELS: Dict[str, Tuple[str, str]] = {
  57. "qwen": ("qwen", "qwen3.5-plus"),
  58. "sonnet": ("openrouter", "claude-sonnet-4-6"),
  59. "gemini-flash": ("openrouter", "google/gemini-3-flash-preview"),
  60. "gemini-flash-lite": ("openrouter", "google/gemini-3.1-flash-lite"), # 最快,适合大批量评估
  61. "gemini": ("openrouter", "google/gemini-3-flash-preview"), # 别名 → gemini-flash
  62. "gpt": ("openrouter", "gpt-5.4"),
  63. }
  64. DEFAULT_EVAL_MODEL = "qwen"
  65. def build_eval_llm_call(choice: str) -> Tuple[Callable, str]:
  66. """根据选择返回 (llm_call, model_id)。
  67. choice 可以是 EVAL_MODELS 的 key(qwen/sonnet/gemini/gpt),
  68. 也可以直接传模型 id(含 'qwen' 走 qwen 后端,否则走 OpenRouter)。
  69. """
  70. if choice in EVAL_MODELS:
  71. backend, model_id = EVAL_MODELS[choice]
  72. else:
  73. model_id = choice
  74. backend = "qwen" if "qwen" in choice.lower() else "openrouter"
  75. if backend == "qwen":
  76. from agent.llm import create_qwen_llm_call
  77. return create_qwen_llm_call(model=model_id), model_id
  78. from agent.llm.openrouter import create_openrouter_llm_call
  79. return create_openrouter_llm_call(model=model_id), model_id
  80. # ── rubric / prompt 模板加载 ────────────────────────────────────────────────────
  81. def load_post_rubric() -> Dict[str, Any]:
  82. """加载 rubric JSON 的 `post` 部分(输出结构契约,评估帖子只用这一块)。"""
  83. with open(_RUBRIC_PATH, "r", encoding="utf-8") as f:
  84. rubric = json.load(f)
  85. post = rubric.get("post")
  86. if not isinstance(post, dict):
  87. raise ValueError(f"rubric 文件缺少 post 部分: {_RUBRIC_PATH}")
  88. return post
  89. def load_rubric_md() -> str:
  90. """加载 rubric 的 .md 详解(判据定义 / 维度边界),作为判断口径背景嵌入 prompt。
  91. JSON 里每个维度只有一句话,.md 才讲清了 JSON 压缩掉的判定口径(锚点 / 边界 / gate 说明)。
  92. 加载失败则返回空串,退化为仅用 JSON(不阻塞评估)。
  93. """
  94. try:
  95. return _RUBRIC_MD_PATH.read_text(encoding="utf-8")
  96. except Exception as e:
  97. logger.warning("加载 rubric md 失败(退化为仅用 JSON): %s", e)
  98. return ""
  99. def load_prompt_template() -> Dict[str, str]:
  100. """解析 eval_prompt_template.md 成 {BLOCK_NAME: template_str}。
  101. 分隔符 `=== BLOCK_NAME ===` 单独成行 → 下一块开始;第一个分隔符之前的文件头当注释跳过;
  102. **块内所有行字面保留**(包括 `#` 开头的 markdown 标题)——因为 template 内嵌了完整的
  103. rubric markdown 和 JSON schema,不能再剥 `#` 行(否则 H1/H2 标题被吃)。
  104. 模板正文里如有 `{var}` 占位符,由 _build_eval_messages 在该块上调 str.format 填充。
  105. 含字面 `{` / `}` 的块(如 USER_RUBRIC_JSON)不能走 .format(会爆 KeyError),代码里
  106. 对它们直接取字面。
  107. 首次加载后缓存在 _PROMPT_TEMPLATE_CACHE;改模板需重启 server。
  108. 解析失败直接抛——prompt 模板是评估链路核心,没它评不了,不做 fail-open。
  109. """
  110. global _PROMPT_TEMPLATE_CACHE
  111. if _PROMPT_TEMPLATE_CACHE is not None:
  112. return _PROMPT_TEMPLATE_CACHE
  113. text = _PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8")
  114. blocks: Dict[str, List[str]] = {}
  115. current: Optional[str] = None
  116. for line in text.splitlines():
  117. m = re.match(r"^===\s+([A-Z_]+)\s+===\s*$", line)
  118. if m:
  119. current = m.group(1)
  120. blocks[current] = []
  121. continue
  122. if current is None:
  123. continue # 第一个 === 之前是文件头说明,跳过
  124. blocks[current].append(line)
  125. # 各块 join 回字符串,strip 掉首尾空行(块之间衔接由调用方加 \n\n)
  126. _PROMPT_TEMPLATE_CACHE = {k: "\n".join(v).strip("\n") for k, v in blocks.items()}
  127. return _PROMPT_TEMPLATE_CACHE
  128. # ── 帖子内容格式化(喂给 LLM)──────────────────────────────────────────────────
  129. def _extract_author(post: Dict[str, Any]) -> str:
  130. return (
  131. post.get("author")
  132. or post.get("channel_account_name")
  133. or post.get("channel")
  134. or ""
  135. )
  136. def _extract_comments(source: Dict[str, Any]) -> List[str]:
  137. """从 source.comments 抽出评论文本,截断条数与长度。"""
  138. raw = source.get("comments") or []
  139. out: List[str] = []
  140. for c in raw[:_MAX_COMMENTS]:
  141. if isinstance(c, dict):
  142. text = c.get("content") or c.get("text") or c.get("comment") or ""
  143. else:
  144. text = str(c)
  145. text = (text or "").strip()
  146. if text:
  147. out.append(text[:_MAX_COMMENT_CHARS])
  148. return out
  149. def _format_post_for_eval(source: Dict[str, Any]) -> str:
  150. """把一条 source 序列化为 JSON 字符串供 LLM 评估。
  151. 现代 LLM 读结构化 JSON 比读"标签:值"自然语言更准——字段名直接告诉它语义,
  152. 不需要靠 prompt 工程把字段标签写漂亮。所以直接 dump source 整段。
  153. 保留两处截断防 token 爆:
  154. - post.body_text 截到 _MAX_BODY_CHARS(个别帖子正文几万字)
  155. - comments 数量截到 _MAX_COMMENTS
  156. 其余字段全量给 LLM,由它自行判断哪些有用(如 images URL / channel_account_id 等)。
  157. """
  158. # 浅拷贝避免修改调用方的 source
  159. s = dict(source)
  160. post = dict(s.get("post") or {})
  161. body = post.get("body_text") or post.get("desc") or ""
  162. if isinstance(body, str) and len(body) > _MAX_BODY_CHARS:
  163. post["body_text"] = body[:_MAX_BODY_CHARS] + f"\n…(正文已截断,原长 {len(body)} 字)"
  164. s["post"] = post
  165. comments = s.get("comments") or []
  166. if isinstance(comments, list) and len(comments) > _MAX_COMMENTS:
  167. s["comments"] = comments[:_MAX_COMMENTS] + [
  168. {"_note": f"(评论已截断,共 {len(comments)} 条,只发前 {_MAX_COMMENTS})"}
  169. ]
  170. # 剥掉两类不该进 prompt 的字段:
  171. # ① `_` 前缀内部字段(如 _quality_grade / _image_data_urls)——管线元数据,无信息量
  172. # ② _EVAL_PRODUCT_FIELDS(如 llm_evaluation / images_sent)——上次评估的产物,
  173. # 喂回新评估会让 LLM"先验 anchoring"到旧分数,新评估失真
  174. s = {k: v for k, v in s.items()
  175. if not str(k).startswith("_") and k not in _EVAL_PRODUCT_FIELDS}
  176. return json.dumps(s, ensure_ascii=False, indent=2)
  177. # ── prompt 构建(从模板各块组装) ─────────────────────────────────────────────
  178. def _build_eval_messages(
  179. requirement: str,
  180. post_block: str,
  181. image_urls: Optional[List[str]] = None,
  182. query: Optional[str] = None,
  183. ) -> List[Dict[str, Any]]:
  184. """从 eval_prompt_template.md (mod.md 风格) 拼出 system + user message。
  185. template 只有 SYSTEM + USER 两块——USER 块内含 检索词 / 知识类型分类 / 输出 schema /
  186. 待评估帖子 / 注意事项 的完整 markdown。两个占位符 {query} 和 {post_block} 用 .replace
  187. 替换 (不走 .format,因 schema JSON 含字面 `{`/`}` 会触发 .format KeyError)。
  188. requirement 仅作为 query 缺失时的降级 fallback。
  189. """
  190. tpl = load_prompt_template()
  191. system = tpl["SYSTEM"]
  192. # query 优先,requirement 降级,都没就给占位描述
  193. effective_query = query or requirement or "(未指定检索词)"
  194. user_text = tpl["USER"].replace("{query}", effective_query).replace("{post_block}", post_block)
  195. if image_urls:
  196. # 多模态:把帖子图片随文本一起发给模型(模板末尾已有『请结合配图判断』提示)
  197. user_content: List[Dict[str, Any]] = [{"type": "text", "text": user_text}]
  198. for u in image_urls:
  199. user_content.append({"type": "image_url", "image_url": {"url": u}})
  200. return [
  201. {"role": "system", "content": system},
  202. {"role": "user", "content": user_content},
  203. ]
  204. return [
  205. {"role": "system", "content": system},
  206. {"role": "user", "content": user_text},
  207. ]
  208. def _validate_eval(data: Dict[str, Any]) -> Optional[str]:
  209. """对 LLM 评估输出做最小结构校验(mod.md 风格的中文 schema)。返回错误描述或 None。
  210. 校验字段:
  211. - 知识类型: 非空数组,值必须 ∈ {工序, 步骤, 工具}
  212. - 制作相关性: {得分, 理由} 对象,得分 ∈ [1, 3]
  213. - 评分: 对象
  214. - 判定理由: 非空字符串
  215. 脱壳:有些模型会把结果多套一层 {"output": {...}}。这里就地解包。
  216. """
  217. if not isinstance(data, dict):
  218. return "输出不是 JSON 对象"
  219. # 脱壳: 有些模型(gemini-flash-lite 等)会照着 schema 多套一层 {"output": {...}}
  220. if "知识类型" not in data and isinstance(data.get("output"), dict) \
  221. and "知识类型" in data["output"]:
  222. inner = data["output"]
  223. data.clear()
  224. data.update(inner)
  225. # 英文 key fallback: LLM 偶发回归英文 schema(知识类型→knowledge_type 等),
  226. # 自动 rename 成中文 key 救场,避免无意义重试。其他字段同理。
  227. _EN_TO_CN = {
  228. "knowledge_type": "知识类型",
  229. "production_relevance": "制作相关性",
  230. "scores": "评分",
  231. "reason": "判定理由",
  232. "decision": "判定理由", # 旧 schema 里 decision/reason 合并到判定理由
  233. }
  234. for en, cn in _EN_TO_CN.items():
  235. if cn not in data and en in data:
  236. data[cn] = data.pop(en)
  237. # 知识类型: 允许为空/缺失/非法值——LLM 长 prompt 下偶发漏标,这里不强校验避免高重试率。
  238. # 仍做软修复:
  239. # ① 单字符串 → list (如 "工序" → ["工序"])
  240. # ② 英文枚举 → 中文 (procedure → 工序)
  241. # ③ 空 list 时,看"评分"里填了哪些类型子块 → 反推 知识类型 (填分=LLM 隐式认领类型)
  242. # ④ 含非法值时过滤掉非法元素,保留合法的(不报错)
  243. _EN_KT_TO_CN = {"procedure": "工序", "step": "步骤", "tool": "工具"}
  244. kt = data.get("知识类型")
  245. if isinstance(kt, str):
  246. kt = [kt]
  247. if isinstance(kt, list):
  248. kt = [_EN_KT_TO_CN.get(k, k) for k in kt]
  249. kt = [k for k in kt if k in _VALID_KNOWLEDGE_TYPES] # 过滤非法
  250. else:
  251. kt = []
  252. # 反推: 空 list 时看"评分"里哪些类型子块有内容(填分=LLM 认领了这个类型)
  253. if not kt:
  254. scores = data.get("评分") or {}
  255. inferred = [t for t in ("工序", "步骤", "工具")
  256. if isinstance(scores.get(t), dict) and scores[t]]
  257. if inferred:
  258. kt = inferred
  259. data["知识类型"] = kt # 写回(可能仍是空 list,容忍)
  260. # 制作相关性: {得分, 理由} 对象, 得分 ∈ [1,3]
  261. pr = data.get("制作相关性")
  262. if not isinstance(pr, dict):
  263. return "制作相关性 必须是 {得分, 理由} 对象"
  264. try:
  265. pr_val = float(pr.get("得分"))
  266. except (TypeError, ValueError):
  267. return "制作相关性.得分 缺失或不是数字 (需 1-3 整数)"
  268. if not (1 <= pr_val <= 3):
  269. return f"制作相关性.得分 必须在 1-3, 得到 {pr.get('得分')!r}"
  270. if not pr.get("理由"):
  271. return "制作相关性.理由 不能为空"
  272. # 评分: 必须是 dict (内部子结构由 prompt 引导, 不严格校验, 避免重试爆炸)
  273. if not isinstance(data.get("评分"), dict):
  274. return "评分 必须是对象"
  275. # 判定理由: 顶层综合判断, 不能为空
  276. if not data.get("判定理由"):
  277. return "判定理由 不能为空"
  278. return None
  279. # ── 单帖评估 ────────────────────────────────────────────────────────────────────
  280. def _source_key(source: Dict[str, Any]) -> Tuple[Any, Any]:
  281. return (source.get("platform"), source.get("channel_content_id"))
  282. def _move_to_discard(
  283. source: Dict[str, Any],
  284. discarded: List[Dict[str, Any]],
  285. reason: Optional[str],
  286. ) -> None:
  287. """把一条 source 标上 llm_discard 原因并加入 discarded 列表(仅淘汰模式调用)。"""
  288. s_copy = dict(source)
  289. r = (reason or "").replace("\n", " ")[:120]
  290. s_copy["filter_reason"] = f"llm_discard:{r}" if r else "llm_discard"
  291. discarded.append(s_copy)
  292. async def _evaluate_one(
  293. source: Dict[str, Any],
  294. requirement: str,
  295. llm_call: Callable,
  296. model: str,
  297. sem: asyncio.Semaphore,
  298. image_urls: Optional[List[str]] = None,
  299. query: Optional[str] = None,
  300. ) -> Tuple[Optional[Dict[str, Any]], float]:
  301. """评估单条 source,返回 (llm_evaluation, cost)。失败返回 (None, cost)。
  302. image_urls 非空时走多模态评估(把帖子配图一并发给模型,需模型支持图片,如 gemini)。
  303. query 非空时把它作为检索锚点喂给模型(判相关性看『这帖是否回答了这个检索词』)。
  304. rubric 详解 / 输出 schema 已固化进 eval_prompt_template.md,本函数不再传 rubric 参数。
  305. """
  306. post_block = _format_post_for_eval(source)
  307. messages = _build_eval_messages(requirement, post_block, image_urls, query)
  308. async with sem:
  309. data, cost = await call_llm_with_retry(
  310. llm_call=llm_call,
  311. messages=messages,
  312. model=model,
  313. temperature=0.1,
  314. max_tokens=2000,
  315. validate_fn=_validate_eval,
  316. task_name=f"LLM-Eval[{source.get('case_id', '?')}]",
  317. )
  318. return data, cost
  319. # ── filtered_cases.json 追加 ────────────────────────────────────────────────────
  320. def _append_to_filtered(raw_cases_dir: Path, discarded: List[Dict[str, Any]]) -> None:
  321. """把 LLM 判为 discard 的帖子并入 filtered_cases.json(与 extract_sources 同结构,按原因分组)。"""
  322. if not discarded:
  323. return
  324. filtered_file = raw_cases_dir / "filtered_cases.json"
  325. existing: List[Dict[str, Any]] = []
  326. existing_ids: set = set()
  327. if filtered_file.exists():
  328. try:
  329. with open(filtered_file, "r", encoding="utf-8") as f:
  330. data = json.load(f)
  331. for group in data.get("by_reason", {}).values():
  332. for s in group.get("sources", []):
  333. existing.append(s)
  334. existing_ids.add((s.get("platform"), s.get("channel_content_id")))
  335. except Exception as e:
  336. logger.warning("读取已有 filtered_cases.json 失败: %s", e)
  337. for s in discarded:
  338. key = (s.get("platform"), s.get("channel_content_id"))
  339. if key not in existing_ids:
  340. existing.append(s)
  341. existing_ids.add(key)
  342. by_reason: Dict[str, List[Dict[str, Any]]] = {}
  343. for s in existing:
  344. category = (s.get("filter_reason") or "unknown").split(":", 1)[0]
  345. by_reason.setdefault(category, []).append(s)
  346. output = {
  347. "total": len(existing),
  348. "by_reason": {
  349. cat: {"count": len(items), "sources": items}
  350. for cat, items in by_reason.items()
  351. },
  352. }
  353. with open(filtered_file, "w", encoding="utf-8") as f:
  354. json.dump(output, f, ensure_ascii=False, indent=2)
  355. def _load_prior_llm_discards(raw_cases_dir: Path) -> set:
  356. """从 filtered_cases.json 收集之前已被 LLM 拒过的帖子 key,避免重复评估花钱。"""
  357. filtered_file = raw_cases_dir / "filtered_cases.json"
  358. keys: set = set()
  359. if not filtered_file.exists():
  360. return keys
  361. try:
  362. with open(filtered_file, "r", encoding="utf-8") as f:
  363. data = json.load(f)
  364. for category, group in data.get("by_reason", {}).items():
  365. if category != "llm_discard":
  366. continue
  367. for s in group.get("sources", []):
  368. keys.add((s.get("platform"), s.get("channel_content_id")))
  369. except Exception as e:
  370. logger.warning("读取 filtered_cases.json 失败: %s", e)
  371. return keys
  372. # ── 主入口 ────────────────────────────────────────────────────────────────────
  373. async def evaluate_sources_with_llm(
  374. source_file: Path,
  375. llm_call: Callable,
  376. model: str,
  377. requirement: str,
  378. max_concurrent: int = 3,
  379. apply_decision: bool = False,
  380. ) -> Dict[str, Any]:
  381. """
  382. 对 source.json 中所有 source 逐条做 LLM rubric 评估,把结果写进每条的 `llm_evaluation` 字段。
  383. apply_decision(淘汰开关,默认 False —— 当前只标注不淘汰):
  384. - False(标注模式):评估并标注所有帖子,**全部保留在 source.json**,
  385. 不据 decision 剔除、不动 filtered_cases.json。
  386. (rubric 阈值/权重尚待标定,先采集评分数据,淘汰留待标定后再开。)
  387. - True(淘汰模式):decision=discard 的帖子移入 filtered_cases.json 并从 source.json 删除,
  388. 历史已拒帖短路再剔除。将来阈值标定好后由调用方打开此开关即可启用门槛。
  389. Returns 统计 dict:
  390. evaluated —— 本次真正调用 LLM 的条数
  391. reported —— decision=report 的总数
  392. would_discard —— decision=discard 的总数(标注模式下仅统计、不实际剔除)
  393. discarded —— 实际从 source.json 移除的条数(标注模式恒为 0)
  394. skipped —— 跳过的条数(已评过复用 + 淘汰模式下历史拒帖短路)
  395. total_cost —— LLM 调用累计成本
  396. llm_discard_details —— [{case_id, platform, title, filter_reason}],淘汰模式下供研究反馈引用
  397. """
  398. source_file = Path(source_file)
  399. raw_cases_dir = source_file.parent
  400. with open(source_file, "r", encoding="utf-8") as f:
  401. data = json.load(f)
  402. sources: List[Dict[str, Any]] = data.get("sources", [])
  403. if not sources:
  404. return {
  405. "evaluated": 0, "reported": 0, "would_discard": 0, "discarded": 0,
  406. "skipped": 0, "total_cost": 0.0, "llm_discard_details": [],
  407. }
  408. # rubric 详解 / 输出 schema 已固化在 eval_prompt_template.md, 不再 load 外部 rubric 文件
  409. prior_discards = _load_prior_llm_discards(raw_cases_dir) if apply_decision else set()
  410. kept: List[Dict[str, Any]] = [] # 留在 source.json 的
  411. discarded: List[Dict[str, Any]] = [] # 移入 filtered 的(仅淘汰模式)
  412. to_eval: List[Dict[str, Any]] = [] # 需要真正跑 LLM 的
  413. skipped = 0
  414. for s in sources:
  415. existing_eval = s.get("llm_evaluation")
  416. # 1. 已评过(且非失败标记)→ 直接复用,不重复花钱
  417. if isinstance(existing_eval, dict) and not existing_eval.get("error"):
  418. if apply_decision and existing_eval.get("decision") == "discard":
  419. _move_to_discard(s, discarded, existing_eval.get("reason"))
  420. else:
  421. kept.append(s)
  422. skipped += 1
  423. continue
  424. # 2. 淘汰模式下:之前已被 LLM 拒过(被重新匹配回来)→ 短路再剔除
  425. if apply_decision and _source_key(s) in prior_discards:
  426. s_copy = dict(s)
  427. s_copy["filter_reason"] = "llm_discard:previously_rejected"
  428. discarded.append(s_copy)
  429. skipped += 1
  430. continue
  431. # 3. 需要评估
  432. to_eval.append(s)
  433. total_cost = 0.0
  434. evaluated = 0
  435. if to_eval:
  436. sem = asyncio.Semaphore(max_concurrent)
  437. results = await asyncio.gather(*[
  438. _evaluate_one(s, requirement, llm_call, model, sem)
  439. for s in to_eval
  440. ])
  441. for s, (llm_eval, cost) in zip(to_eval, results):
  442. total_cost += cost
  443. evaluated += 1
  444. if llm_eval is None:
  445. # fail-open:评估失败保留该帖,标记便于排查(error=True,下次会重评)
  446. s["llm_evaluation"] = {"decision": "report", "reason": "llm_eval_failed_kept", "error": True}
  447. kept.append(s)
  448. continue
  449. s["llm_evaluation"] = llm_eval
  450. is_discard = llm_eval.get("decision") == "discard"
  451. # 仅淘汰模式才真正移除;标注模式下 discard 帖也留在 source.json
  452. if apply_decision and is_discard:
  453. _move_to_discard(s, discarded, llm_eval.get("reason"))
  454. else:
  455. kept.append(s)
  456. # 统计基于最终 kept/discarded 的 llm_evaluation 决策(含本轮新评 + 历史 skip)
  457. reported = sum(
  458. 1 for s in kept
  459. if isinstance(s.get("llm_evaluation"), dict)
  460. and s["llm_evaluation"].get("decision") == "report"
  461. )
  462. would_discard = sum(
  463. 1 for s in (kept + discarded)
  464. if isinstance(s.get("llm_evaluation"), dict)
  465. and s["llm_evaluation"].get("decision") == "discard"
  466. )
  467. # 写回 source.json
  468. data["sources"] = kept
  469. data["total"] = len(kept)
  470. with open(source_file, "w", encoding="utf-8") as f:
  471. json.dump(data, f, ensure_ascii=False, indent=2)
  472. # 仅淘汰模式:并入 filtered_cases.json
  473. if apply_decision and discarded:
  474. _append_to_filtered(raw_cases_dir, discarded)
  475. # 给研究反馈用的摘要(仅淘汰模式有实际剔除)
  476. llm_discard_details: List[Dict[str, Any]] = []
  477. for s in discarded:
  478. post = s.get("post", {}) or {}
  479. title = post.get("title") or s.get("source_url", "")
  480. llm_discard_details.append({
  481. "case_id": s.get("case_id", ""),
  482. "platform": s.get("platform", ""),
  483. "title": title[:60] if title else "",
  484. "filter_reason": s.get("filter_reason", ""),
  485. })
  486. return {
  487. "evaluated": evaluated,
  488. "reported": reported,
  489. "would_discard": would_discard,
  490. "discarded": len(discarded),
  491. "skipped": skipped,
  492. "total_cost": round(total_cost, 4),
  493. "llm_discard_details": llm_discard_details,
  494. }
  495. # ── CLI ────────────────────────────────────────────────────────────────────────
  496. if __name__ == "__main__":
  497. import argparse
  498. from dotenv import load_dotenv
  499. load_dotenv()
  500. parser = argparse.ArgumentParser(description="对 source.json 做 LLM rubric 评估")
  501. parser.add_argument("source_file", type=Path, help="source.json 路径")
  502. parser.add_argument("--requirement", type=str, default="", help="采集需求 / 目标格子描述")
  503. parser.add_argument("--model", type=str, default=DEFAULT_EVAL_MODEL,
  504. help=f"评估模型,可选 {list(EVAL_MODELS)} 或直接传模型 id(默认 {DEFAULT_EVAL_MODEL})")
  505. parser.add_argument("--max-concurrent", type=int, default=3)
  506. parser.add_argument("--apply-decision", action="store_true",
  507. help="按 LLM decision 实际淘汰 discard 帖(默认只标注不淘汰,阈值标定后再开)")
  508. args = parser.parse_args()
  509. _llm_call, _model_id = build_eval_llm_call(args.model)
  510. print(f"[eval-model] {args.model} -> {_model_id}")
  511. stats = asyncio.run(evaluate_sources_with_llm(
  512. source_file=args.source_file,
  513. llm_call=_llm_call,
  514. model=_model_id,
  515. requirement=args.requirement,
  516. max_concurrent=args.max_concurrent,
  517. apply_decision=args.apply_decision,
  518. ))
  519. print(json.dumps(stats, ensure_ascii=False, indent=2))