llm_evaluate_sources.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. """
  2. 基于 LLM 的 source 知识质量评估
  3. 对 source.json 中已匹配的帖子逐条做 rubric 评估,严格参照
  4. test_script/evaluation/知识质量评估-rubric.{json,md} 的 **post 部分**:
  5. 先把帖子分到 procedure / step / tool(可多标签),再按"通用 + 命中类型"维度
  6. 各打 1-5,最后给出 report / discard 决策。
  7. 设计要点:
  8. - rubric 在运行时从文件加载并整段嵌入 prompt(rubric 改了评估自动跟随)。
  9. - prompt 各块从 eval_prompt_template.md 加载、按 str.format 填充(template 改了无需改代码)。
  10. - LLM 输出严格匹配 rubric 的 `post.output` 结构,写回到 source 的 `llm_evaluation` 字段。
  11. - decision=discard 的帖子从 source.json 移入 filtered_cases.json(filter_reason=llm_discard:...)。
  12. - 增量:已带 llm_evaluation 的 report 帖跳过;之前已 LLM 拒过的帖(被 extract 重新匹配回来)
  13. 短路直接再剔除,不重复花钱。
  14. - fail-open:单帖 LLM 调用失败 → 保留该帖并标记,不因瞬时错误丢内容。
  15. 入口:evaluate_sources_with_llm(source_file, llm_call, model, requirement, ...) -> stats dict
  16. 由 run_pipeline.py 的 source 阶段在规则预筛(extract_sources)之后调用。
  17. """
  18. import asyncio
  19. import json
  20. import logging
  21. import re
  22. import sys
  23. from pathlib import Path
  24. from typing import Any, Callable, Dict, List, Optional, Tuple
  25. # 直接当脚本跑时(python .../llm_evaluate_sources.py)需要项目根在 sys.path,
  26. # 才能 import examples.* ;被 run_pipeline 作为模块导入时路径已就绪,此处无副作用。
  27. _PROJECT_ROOT = Path(__file__).resolve().parents[3]
  28. if str(_PROJECT_ROOT) not in sys.path:
  29. sys.path.insert(0, str(_PROJECT_ROOT))
  30. from examples.process_pipeline.script.llm_helper import call_llm_with_retry
  31. logger = logging.getLogger(__name__)
  32. # prompt 模板 (mod.md 风格的 SYSTEM + USER 两块):主源放在 script/search_eval/ 下,
  33. # 跟 batch_3forms / server / eval_one_sample 同目录(IDE 编辑那份就是 runtime 实际读的版本)。
  34. # 若主源缺失则退到 test_script/search_eval/ 历史副本。首次加载后缓存。
  35. # 注:原 rubric.json / rubric.md 已固化进本 template,不再外部 load(_RUBRIC_* / load_post_rubric
  36. # / load_rubric_md 一并清掉)。
  37. _PROMPT_TEMPLATE_PATH = (
  38. Path(__file__).resolve().parent / "search_eval" / "eval_prompt_template.md"
  39. )
  40. if not _PROMPT_TEMPLATE_PATH.exists():
  41. _PROMPT_TEMPLATE_PATH = (
  42. Path(__file__).resolve().parent.parent / "test_script" / "search_eval" / "eval_prompt_template.md"
  43. )
  44. _PROMPT_TEMPLATE_CACHE: Optional[Dict[str, str]] = None
  45. # mod.md 风格的中文 schema 知识类型枚举值(取代了旧 英文 procedure/step/tool)
  46. _VALID_KNOWLEDGE_TYPES = {"工序", "能力", "工具"}
  47. _MAX_BODY_CHARS = 8000 # 控制单帖 prompt token:正文/字幕截断上限
  48. _MAX_COMMENTS = 20 # 评论最多带多少条(喂"评论反馈"维度)
  49. _MAX_COMMENT_CHARS = 200 # 单条评论截断上限
  50. # 上次评估的产物字段——dump source JSON 给 LLM 时必须剥掉,
  51. # 否则 LLM 会"先验 anchoring"到旧分数,新评估失真。
  52. _EVAL_PRODUCT_FIELDS = frozenset({
  53. "llm_evaluation", # 之前 LLM 给的整段评估结果(scores/decision/reason/...)
  54. "images_sent", # 评估时记录"实际发了几张图"——管线元数据,LLM 评内容无关
  55. })
  56. # ── 评估模型选择(可切换)────────────────────────────────────────────────────────
  57. # key -> (backend, model_id)。backend="qwen" 走 create_qwen_llm_call(阿里云原生),
  58. # 其余走 create_openrouter_llm_call。run_pipeline.py / search_and_evaluate.py 共用此工厂。
  59. EVAL_MODELS: Dict[str, Tuple[str, str]] = {
  60. "qwen": ("qwen", "qwen3.5-plus"),
  61. "sonnet": ("openrouter", "claude-sonnet-4-6"),
  62. "gemini-flash": ("openrouter", "google/gemini-3-flash-preview"),
  63. "gemini-flash-lite": ("openrouter", "google/gemini-3.1-flash-lite"), # 最快,适合大批量评估
  64. "gemini": ("openrouter", "google/gemini-3-flash-preview"), # 别名 → gemini-flash
  65. "gpt": ("openrouter", "gpt-5.4"),
  66. }
  67. DEFAULT_EVAL_MODEL = "gemini-flash-lite"
  68. def build_eval_llm_call(choice: str) -> Tuple[Callable, str]:
  69. """根据选择返回 (llm_call, model_id)。
  70. choice 可以是 EVAL_MODELS 的 key(qwen/sonnet/gemini/gpt),
  71. 也可以直接传模型 id(含 'qwen' 走 qwen 后端,否则走 OpenRouter)。
  72. """
  73. if choice in EVAL_MODELS:
  74. backend, model_id = EVAL_MODELS[choice]
  75. else:
  76. model_id = choice
  77. backend = "qwen" if "qwen" in choice.lower() else "openrouter"
  78. if backend == "qwen":
  79. from agent.llm import create_qwen_llm_call
  80. return create_qwen_llm_call(model=model_id), model_id
  81. from agent.llm.openrouter import create_openrouter_llm_call
  82. return create_openrouter_llm_call(model=model_id), model_id
  83. # ── prompt 模板加载 ─────────────────────────────────────────────────────────────
  84. def load_prompt_template() -> Dict[str, str]:
  85. """解析 eval_prompt_template.md 成 {BLOCK_NAME: template_str}。
  86. 分隔符 `=== BLOCK_NAME ===` 单独成行 → 下一块开始;第一个分隔符之前的文件头当注释跳过;
  87. **块内所有行字面保留**(包括 `#` 开头的 markdown 标题)——因为 template 内嵌了完整的
  88. rubric markdown 和 JSON schema,不能再剥 `#` 行(否则 H1/H2 标题被吃)。
  89. 模板正文里如有 `{var}` 占位符,由 _build_eval_messages 在该块上调 str.format 填充。
  90. 含字面 `{` / `}` 的块(如 USER_RUBRIC_JSON)不能走 .format(会爆 KeyError),代码里
  91. 对它们直接取字面。
  92. 首次加载后缓存在 _PROMPT_TEMPLATE_CACHE;改模板需重启 server。
  93. 解析失败直接抛——prompt 模板是评估链路核心,没它评不了,不做 fail-open。
  94. """
  95. global _PROMPT_TEMPLATE_CACHE
  96. if _PROMPT_TEMPLATE_CACHE is not None:
  97. return _PROMPT_TEMPLATE_CACHE
  98. text = _PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8")
  99. blocks: Dict[str, List[str]] = {}
  100. current: Optional[str] = None
  101. for line in text.splitlines():
  102. m = re.match(r"^===\s+([A-Z_]+)\s+===\s*$", line)
  103. if m:
  104. current = m.group(1)
  105. blocks[current] = []
  106. continue
  107. if current is None:
  108. continue # 第一个 === 之前是文件头说明,跳过
  109. blocks[current].append(line)
  110. # 各块 join 回字符串,strip 掉首尾空行(块之间衔接由调用方加 \n\n)
  111. _PROMPT_TEMPLATE_CACHE = {k: "\n".join(v).strip("\n") for k, v in blocks.items()}
  112. return _PROMPT_TEMPLATE_CACHE
  113. # ── 帖子内容格式化(喂给 LLM)──────────────────────────────────────────────────
  114. def _extract_author(post: Dict[str, Any]) -> str:
  115. return (
  116. post.get("author")
  117. or post.get("channel_account_name")
  118. or post.get("channel")
  119. or ""
  120. )
  121. def _extract_comments(source: Dict[str, Any]) -> List[str]:
  122. """从 source.comments 抽出评论文本,截断条数与长度。"""
  123. raw = source.get("comments") or []
  124. out: List[str] = []
  125. for c in raw[:_MAX_COMMENTS]:
  126. if isinstance(c, dict):
  127. text = c.get("content") or c.get("text") or c.get("comment") or ""
  128. else:
  129. text = str(c)
  130. text = (text or "").strip()
  131. if text:
  132. out.append(text[:_MAX_COMMENT_CHARS])
  133. return out
  134. def _format_post_for_eval(source: Dict[str, Any]) -> str:
  135. """把一条 source 序列化为 JSON 字符串供 LLM 评估。
  136. 现代 LLM 读结构化 JSON 比读"标签:值"自然语言更准——字段名直接告诉它语义,
  137. 不需要靠 prompt 工程把字段标签写漂亮。所以直接 dump source 整段。
  138. 保留两处截断防 token 爆:
  139. - post.body_text 截到 _MAX_BODY_CHARS(个别帖子正文几万字)
  140. - comments 数量截到 _MAX_COMMENTS
  141. 其余字段全量给 LLM,由它自行判断哪些有用(如 images URL / channel_account_id 等)。
  142. """
  143. # 浅拷贝避免修改调用方的 source
  144. s = dict(source)
  145. post = dict(s.get("post") or {})
  146. body = post.get("body_text") or post.get("desc") or ""
  147. if isinstance(body, str) and len(body) > _MAX_BODY_CHARS:
  148. post["body_text"] = body[:_MAX_BODY_CHARS] + f"\n…(正文已截断,原长 {len(body)} 字)"
  149. s["post"] = post
  150. comments = s.get("comments") or []
  151. if isinstance(comments, list) and len(comments) > _MAX_COMMENTS:
  152. s["comments"] = comments[:_MAX_COMMENTS] + [
  153. {"_note": f"(评论已截断,共 {len(comments)} 条,只发前 {_MAX_COMMENTS})"}
  154. ]
  155. # 剥掉两类不该进 prompt 的字段:
  156. # ① `_` 前缀内部字段(如 _quality_grade / _image_data_urls)——管线元数据,无信息量
  157. # ② _EVAL_PRODUCT_FIELDS(如 llm_evaluation / images_sent)——上次评估的产物,
  158. # 喂回新评估会让 LLM"先验 anchoring"到旧分数,新评估失真
  159. s = {k: v for k, v in s.items()
  160. if not str(k).startswith("_") and k not in _EVAL_PRODUCT_FIELDS}
  161. return json.dumps(s, ensure_ascii=False, indent=2)
  162. # ── prompt 构建(从模板各块组装) ─────────────────────────────────────────────
  163. def _build_eval_messages(
  164. requirement: str,
  165. post_block: str,
  166. image_urls: Optional[List[str]] = None,
  167. query: Optional[str] = None,
  168. ) -> List[Dict[str, Any]]:
  169. """从 eval_prompt_template.md (mod.md 风格) 拼出 system + user message。
  170. template 只有 SYSTEM + USER 两块——USER 块内含 检索词 / 知识类型分类 / 输出 schema /
  171. 待评估帖子 / 注意事项 的完整 markdown。两个占位符 {query} 和 {post_block} 用 .replace
  172. 替换 (不走 .format,因 schema JSON 含字面 `{`/`}` 会触发 .format KeyError)。
  173. requirement 仅作为 query 缺失时的降级 fallback。
  174. """
  175. tpl = load_prompt_template()
  176. system = tpl["SYSTEM"]
  177. # query 优先,requirement 降级,都没就给占位描述
  178. effective_query = query or requirement or "(未指定检索词)"
  179. user_text = tpl["USER"].replace("{query}", effective_query).replace("{post_block}", post_block)
  180. if image_urls:
  181. # 多模态:把帖子图片随文本一起发给模型(模板末尾已有『请结合配图判断』提示)
  182. user_content: List[Dict[str, Any]] = [{"type": "text", "text": user_text}]
  183. for u in image_urls:
  184. user_content.append({"type": "image_url", "image_url": {"url": u}})
  185. return [
  186. {"role": "system", "content": system},
  187. {"role": "user", "content": user_content},
  188. ]
  189. return [
  190. {"role": "system", "content": system},
  191. {"role": "user", "content": user_text},
  192. ]
  193. def _validate_eval(data: Dict[str, Any]) -> Optional[str]:
  194. """对 LLM 评估输出做最小结构校验(新版 mod.md schema)。返回错误描述或 None。
  195. 校验字段(宽松,避免重试爆炸):
  196. - 知识类型: list, 值 ∈ {工序, 能力, 工具}(步骤→能力 软修复)
  197. - 相关性: {和内容制作知识相关, 和 query 相关} 各自 {得分, 理由}
  198. - 质量: {固定维度, 动态维度} 至少一个;具体子字段不强校验
  199. 时效性不校验 —— 由 _calc_recency_score 注入, LLM 不该输出。
  200. 脱壳:有些模型会把结果多套一层 {"output": {...}}, 这里就地解包。
  201. """
  202. if not isinstance(data, dict):
  203. return "输出不是 JSON 对象"
  204. # 脱壳: 有些模型(gemini-flash-lite 等)会照着 schema 多套一层 {"output": {...}}
  205. if "知识类型" not in data and isinstance(data.get("output"), dict) \
  206. and "知识类型" in data["output"]:
  207. inner = data["output"]
  208. data.clear()
  209. data.update(inner)
  210. # 英文 key fallback: LLM 偶发回归英文 schema, 自动 rename 成中文
  211. _EN_TO_CN = {
  212. "knowledge_type": "知识类型",
  213. "relevance": "相关性",
  214. "quality": "质量",
  215. }
  216. for en, cn in _EN_TO_CN.items():
  217. if cn not in data and en in data:
  218. data[cn] = data.pop(en)
  219. # 知识类型: 允许为空/缺失/非法; 软修复:
  220. # ① 字符串 → list ② 英文/旧 步骤 → 中文新枚举 (procedure→工序 / step→能力 / 步骤→能力)
  221. # ③ 空 list 时反推: 看 质量.动态维度 里 LLM 填了哪些类型子块
  222. # ④ 非法值过滤掉
  223. _KT_REMAP = {
  224. "procedure": "工序", "step": "能力", "tool": "工具",
  225. "步骤": "能力", # 旧 schema 兼容
  226. }
  227. kt = data.get("知识类型")
  228. if isinstance(kt, str):
  229. kt = [kt]
  230. if isinstance(kt, list):
  231. kt = [_KT_REMAP.get(k, k) for k in kt]
  232. kt = [k for k in kt if k in _VALID_KNOWLEDGE_TYPES]
  233. else:
  234. kt = []
  235. if not kt:
  236. dyn = ((data.get("质量") or {}).get("动态维度") or {})
  237. inferred = [t for t in ("工序", "能力", "工具")
  238. if isinstance(dyn.get(t), dict) and dyn[t]]
  239. if inferred:
  240. kt = inferred
  241. data["知识类型"] = kt
  242. # 相关性: 两子键必须各为 {得分, 理由} 对象, 得分 0-10
  243. rel = data.get("相关性")
  244. if not isinstance(rel, dict):
  245. return "相关性 必须是对象"
  246. for sub_key in ("和内容制作知识相关", "和 query 相关"):
  247. sub = rel.get(sub_key)
  248. if not isinstance(sub, dict):
  249. return f"相关性.{sub_key!r} 必须是 {{得分, 理由}} 对象"
  250. try:
  251. v = float(sub.get("得分"))
  252. except (TypeError, ValueError):
  253. return f"相关性.{sub_key}.得分 缺失或不是数字 (需 0-10)"
  254. if not (0 <= v <= 10):
  255. return f"相关性.{sub_key}.得分 必须在 0-10, 得到 {sub.get('得分')!r}"
  256. # 质量: 必须是 dict, 至少包含 固定维度 或 动态维度
  257. q = data.get("质量")
  258. if not isinstance(q, dict):
  259. return "质量 必须是对象"
  260. if not isinstance(q.get("固定维度"), dict) and not isinstance(q.get("动态维度"), dict):
  261. return "质量 必须至少包含 固定维度 或 动态维度"
  262. # 子字段不强校验, 由 prompt 引导 + 前端兜底; 这层保 batch 不被重试拖死
  263. return None
  264. # ── 时效性: 代码计算, 不让 LLM 评 ────────────────────────────────────────────────
  265. def _calc_recency_score(publish_ts: str) -> int:
  266. """按发布日期 vs 今日的天数差打 0-10 阶梯分:
  267. ≤30d=10 / ≤90d=8 / ≤180d=6 / ≤270d=3 / ≤365d=1 / >365d=0
  268. 无法解析时间戳 → 0 (保守判过期)。"""
  269. from datetime import datetime
  270. try:
  271. d = datetime.strptime((publish_ts or "")[:10], "%Y-%m-%d")
  272. except (ValueError, TypeError):
  273. return 0
  274. days = (datetime.now() - d).days
  275. if days <= 30: return 10
  276. if days <= 90: return 8
  277. if days <= 180: return 6
  278. if days <= 270: return 3
  279. if days <= 365: return 1
  280. return 0
  281. def _inject_recency(data: Dict[str, Any], source: Dict[str, Any]) -> None:
  282. """LLM 输出后注入时效性到 质量.固定维度.时效性.{得分}, 无 理由 (代码算的不解释)。"""
  283. if not isinstance(data, dict):
  284. return
  285. score = _calc_recency_score((source.get("post") or {}).get("publish_timestamp", ""))
  286. q = data.setdefault("质量", {})
  287. if not isinstance(q, dict):
  288. return
  289. fixed = q.setdefault("固定维度", {})
  290. if not isinstance(fixed, dict):
  291. return
  292. fixed["时效性"] = {"得分": score}
  293. # ── 单帖评估 ────────────────────────────────────────────────────────────────────
  294. def _source_key(source: Dict[str, Any]) -> Tuple[Any, Any]:
  295. return (source.get("platform"), source.get("channel_content_id"))
  296. def _move_to_discard(
  297. source: Dict[str, Any],
  298. discarded: List[Dict[str, Any]],
  299. reason: Optional[str],
  300. ) -> None:
  301. """把一条 source 标上 llm_discard 原因并加入 discarded 列表(仅淘汰模式调用)。"""
  302. s_copy = dict(source)
  303. r = (reason or "").replace("\n", " ")[:120]
  304. s_copy["filter_reason"] = f"llm_discard:{r}" if r else "llm_discard"
  305. discarded.append(s_copy)
  306. async def _evaluate_one(
  307. source: Dict[str, Any],
  308. requirement: str,
  309. llm_call: Callable,
  310. model: str,
  311. sem: asyncio.Semaphore,
  312. image_urls: Optional[List[str]] = None,
  313. query: Optional[str] = None,
  314. ) -> Tuple[Optional[Dict[str, Any]], float]:
  315. """评估单条 source,返回 (llm_evaluation, cost)。失败返回 (None, cost)。
  316. image_urls 非空时走多模态评估(把帖子配图一并发给模型,需模型支持图片,如 gemini)。
  317. query 非空时把它作为检索锚点喂给模型(判相关性看『这帖是否回答了这个检索词』)。
  318. rubric 详解 / 输出 schema 已固化进 eval_prompt_template.md,本函数不再传 rubric 参数。
  319. """
  320. post_block = _format_post_for_eval(source)
  321. messages = _build_eval_messages(requirement, post_block, image_urls, query)
  322. async with sem:
  323. data, cost = await call_llm_with_retry(
  324. llm_call=llm_call,
  325. messages=messages,
  326. model=model,
  327. temperature=0.1,
  328. max_tokens=2000,
  329. validate_fn=_validate_eval,
  330. task_name=f"LLM-Eval[{source.get('case_id', '?')}]",
  331. )
  332. # 时效性走代码注入 (LLM 不评), 写到 质量.固定维度.时效性.{得分}
  333. _inject_recency(data, source)
  334. return data, cost
  335. # ── filtered_cases.json 追加 ────────────────────────────────────────────────────
  336. def _append_to_filtered(raw_cases_dir: Path, discarded: List[Dict[str, Any]]) -> None:
  337. """把 LLM 判为 discard 的帖子并入 filtered_cases.json(与 extract_sources 同结构,按原因分组)。"""
  338. if not discarded:
  339. return
  340. filtered_file = raw_cases_dir / "filtered_cases.json"
  341. existing: List[Dict[str, Any]] = []
  342. existing_ids: set = set()
  343. if filtered_file.exists():
  344. try:
  345. with open(filtered_file, "r", encoding="utf-8") as f:
  346. data = json.load(f)
  347. for group in data.get("by_reason", {}).values():
  348. for s in group.get("sources", []):
  349. existing.append(s)
  350. existing_ids.add((s.get("platform"), s.get("channel_content_id")))
  351. except Exception as e:
  352. logger.warning("读取已有 filtered_cases.json 失败: %s", e)
  353. for s in discarded:
  354. key = (s.get("platform"), s.get("channel_content_id"))
  355. if key not in existing_ids:
  356. existing.append(s)
  357. existing_ids.add(key)
  358. by_reason: Dict[str, List[Dict[str, Any]]] = {}
  359. for s in existing:
  360. category = (s.get("filter_reason") or "unknown").split(":", 1)[0]
  361. by_reason.setdefault(category, []).append(s)
  362. output = {
  363. "total": len(existing),
  364. "by_reason": {
  365. cat: {"count": len(items), "sources": items}
  366. for cat, items in by_reason.items()
  367. },
  368. }
  369. with open(filtered_file, "w", encoding="utf-8") as f:
  370. json.dump(output, f, ensure_ascii=False, indent=2)
  371. def _load_prior_llm_discards(raw_cases_dir: Path) -> set:
  372. """从 filtered_cases.json 收集之前已被 LLM 拒过的帖子 key,避免重复评估花钱。"""
  373. filtered_file = raw_cases_dir / "filtered_cases.json"
  374. keys: set = set()
  375. if not filtered_file.exists():
  376. return keys
  377. try:
  378. with open(filtered_file, "r", encoding="utf-8") as f:
  379. data = json.load(f)
  380. for category, group in data.get("by_reason", {}).items():
  381. if category != "llm_discard":
  382. continue
  383. for s in group.get("sources", []):
  384. keys.add((s.get("platform"), s.get("channel_content_id")))
  385. except Exception as e:
  386. logger.warning("读取 filtered_cases.json 失败: %s", e)
  387. return keys
  388. # ── 主入口 ────────────────────────────────────────────────────────────────────
  389. async def evaluate_sources_with_llm(
  390. source_file: Path,
  391. llm_call: Callable,
  392. model: str,
  393. requirement: str,
  394. max_concurrent: int = 3,
  395. apply_decision: bool = False,
  396. ) -> Dict[str, Any]:
  397. """
  398. 对 source.json 中所有 source 逐条做 LLM rubric 评估,把结果写进每条的 `llm_evaluation` 字段。
  399. apply_decision(淘汰开关,默认 False —— 当前只标注不淘汰):
  400. - False(标注模式):评估并标注所有帖子,**全部保留在 source.json**,
  401. 不据 decision 剔除、不动 filtered_cases.json。
  402. (rubric 阈值/权重尚待标定,先采集评分数据,淘汰留待标定后再开。)
  403. - True(淘汰模式):decision=discard 的帖子移入 filtered_cases.json 并从 source.json 删除,
  404. 历史已拒帖短路再剔除。将来阈值标定好后由调用方打开此开关即可启用门槛。
  405. Returns 统计 dict:
  406. evaluated —— 本次真正调用 LLM 的条数
  407. reported —— decision=report 的总数
  408. would_discard —— decision=discard 的总数(标注模式下仅统计、不实际剔除)
  409. discarded —— 实际从 source.json 移除的条数(标注模式恒为 0)
  410. skipped —— 跳过的条数(已评过复用 + 淘汰模式下历史拒帖短路)
  411. total_cost —— LLM 调用累计成本
  412. llm_discard_details —— [{case_id, platform, title, filter_reason}],淘汰模式下供研究反馈引用
  413. """
  414. source_file = Path(source_file)
  415. raw_cases_dir = source_file.parent
  416. with open(source_file, "r", encoding="utf-8") as f:
  417. data = json.load(f)
  418. sources: List[Dict[str, Any]] = data.get("sources", [])
  419. if not sources:
  420. return {
  421. "evaluated": 0, "reported": 0, "would_discard": 0, "discarded": 0,
  422. "skipped": 0, "total_cost": 0.0, "llm_discard_details": [],
  423. }
  424. # rubric 详解 / 输出 schema 已固化在 eval_prompt_template.md, 不再 load 外部 rubric 文件
  425. prior_discards = _load_prior_llm_discards(raw_cases_dir) if apply_decision else set()
  426. kept: List[Dict[str, Any]] = [] # 留在 source.json 的
  427. discarded: List[Dict[str, Any]] = [] # 移入 filtered 的(仅淘汰模式)
  428. to_eval: List[Dict[str, Any]] = [] # 需要真正跑 LLM 的
  429. skipped = 0
  430. for s in sources:
  431. existing_eval = s.get("llm_evaluation")
  432. # 1. 已评过(且非失败标记)→ 直接复用,不重复花钱
  433. if isinstance(existing_eval, dict) and not existing_eval.get("error"):
  434. if apply_decision and existing_eval.get("decision") == "discard":
  435. _move_to_discard(s, discarded, existing_eval.get("reason"))
  436. else:
  437. kept.append(s)
  438. skipped += 1
  439. continue
  440. # 2. 淘汰模式下:之前已被 LLM 拒过(被重新匹配回来)→ 短路再剔除
  441. if apply_decision and _source_key(s) in prior_discards:
  442. s_copy = dict(s)
  443. s_copy["filter_reason"] = "llm_discard:previously_rejected"
  444. discarded.append(s_copy)
  445. skipped += 1
  446. continue
  447. # 3. 需要评估
  448. to_eval.append(s)
  449. total_cost = 0.0
  450. evaluated = 0
  451. if to_eval:
  452. sem = asyncio.Semaphore(max_concurrent)
  453. results = await asyncio.gather(*[
  454. _evaluate_one(s, requirement, llm_call, model, sem)
  455. for s in to_eval
  456. ])
  457. for s, (llm_eval, cost) in zip(to_eval, results):
  458. total_cost += cost
  459. evaluated += 1
  460. if llm_eval is None:
  461. # fail-open:评估失败保留该帖,标记便于排查(error=True,下次会重评)
  462. s["llm_evaluation"] = {"decision": "report", "reason": "llm_eval_failed_kept", "error": True}
  463. kept.append(s)
  464. continue
  465. s["llm_evaluation"] = llm_eval
  466. is_discard = llm_eval.get("decision") == "discard"
  467. # 仅淘汰模式才真正移除;标注模式下 discard 帖也留在 source.json
  468. if apply_decision and is_discard:
  469. _move_to_discard(s, discarded, llm_eval.get("reason"))
  470. else:
  471. kept.append(s)
  472. # 统计基于最终 kept/discarded 的 llm_evaluation 决策(含本轮新评 + 历史 skip)
  473. reported = sum(
  474. 1 for s in kept
  475. if isinstance(s.get("llm_evaluation"), dict)
  476. and s["llm_evaluation"].get("decision") == "report"
  477. )
  478. would_discard = sum(
  479. 1 for s in (kept + discarded)
  480. if isinstance(s.get("llm_evaluation"), dict)
  481. and s["llm_evaluation"].get("decision") == "discard"
  482. )
  483. # 写回 source.json
  484. data["sources"] = kept
  485. data["total"] = len(kept)
  486. with open(source_file, "w", encoding="utf-8") as f:
  487. json.dump(data, f, ensure_ascii=False, indent=2)
  488. # 仅淘汰模式:并入 filtered_cases.json
  489. if apply_decision and discarded:
  490. _append_to_filtered(raw_cases_dir, discarded)
  491. # 给研究反馈用的摘要(仅淘汰模式有实际剔除)
  492. llm_discard_details: List[Dict[str, Any]] = []
  493. for s in discarded:
  494. post = s.get("post", {}) or {}
  495. title = post.get("title") or s.get("source_url", "")
  496. llm_discard_details.append({
  497. "case_id": s.get("case_id", ""),
  498. "platform": s.get("platform", ""),
  499. "title": title[:60] if title else "",
  500. "filter_reason": s.get("filter_reason", ""),
  501. })
  502. return {
  503. "evaluated": evaluated,
  504. "reported": reported,
  505. "would_discard": would_discard,
  506. "discarded": len(discarded),
  507. "skipped": skipped,
  508. "total_cost": round(total_cost, 4),
  509. "llm_discard_details": llm_discard_details,
  510. }
  511. # ── CLI ────────────────────────────────────────────────────────────────────────
  512. if __name__ == "__main__":
  513. import argparse
  514. from dotenv import load_dotenv
  515. load_dotenv()
  516. parser = argparse.ArgumentParser(description="对 source.json 做 LLM rubric 评估")
  517. parser.add_argument("source_file", type=Path, help="source.json 路径")
  518. parser.add_argument("--requirement", type=str, default="", help="采集需求 / 目标格子描述")
  519. parser.add_argument("--model", type=str, default=DEFAULT_EVAL_MODEL,
  520. help=f"评估模型,可选 {list(EVAL_MODELS)} 或直接传模型 id(默认 {DEFAULT_EVAL_MODEL})")
  521. parser.add_argument("--max-concurrent", type=int, default=3)
  522. parser.add_argument("--apply-decision", action="store_true",
  523. help="按 LLM decision 实际淘汰 discard 帖(默认只标注不淘汰,阈值标定后再开)")
  524. args = parser.parse_args()
  525. _llm_call, _model_id = build_eval_llm_call(args.model)
  526. print(f"[eval-model] {args.model} -> {_model_id}")
  527. stats = asyncio.run(evaluate_sources_with_llm(
  528. source_file=args.source_file,
  529. llm_call=_llm_call,
  530. model=_model_id,
  531. requirement=args.requirement,
  532. max_concurrent=args.max_concurrent,
  533. apply_decision=args.apply_decision,
  534. ))
  535. print(json.dumps(stats, ensure_ascii=False, indent=2))