| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612 |
- """
- 基于 LLM 的 source 知识质量评估
- 对 source.json 中已匹配的帖子逐条做 rubric 评估,严格参照
- test_script/evaluation/知识质量评估-rubric.{json,md} 的 **post 部分**:
- 先把帖子分到 procedure / step / tool(可多标签),再按"通用 + 命中类型"维度
- 各打 1-5,最后给出 report / discard 决策。
- 设计要点:
- - rubric 在运行时从文件加载并整段嵌入 prompt(rubric 改了评估自动跟随)。
- - prompt 各块从 eval_prompt_template.md 加载,{query} / {post_block} 占位符按字面替换填充(不走 str.format,因为模板内嵌的 JSON schema 含字面花括号;template 改了无需改代码)。
- - LLM 输出严格匹配 rubric 的 `post.output` 结构,写回到 source 的 `llm_evaluation` 字段。
- - 仅当 apply_decision=True(淘汰模式)时,decision=discard 的帖子才从 source.json 移入 filtered_cases.json(filter_reason=llm_discard:...);默认只标注、不淘汰。
- - 增量:已带 llm_evaluation 的 report 帖跳过;之前已 LLM 拒过的帖(被 extract 重新匹配回来)
- 短路直接再剔除,不重复花钱。
- - fail-open:单帖 LLM 调用失败 → 保留该帖并标记,不因瞬时错误丢内容。
- 入口:evaluate_sources_with_llm(source_file, llm_call, model, requirement, ...) -> stats dict
- 由 run_pipeline.py 的 source 阶段在规则预筛(extract_sources)之后调用。
- """
- import asyncio
- import json
- import logging
- import re
- import sys
- from pathlib import Path
- from typing import Any, Callable, Dict, List, Optional, Tuple
- # 直接当脚本跑时(python .../llm_evaluate_sources.py)需要项目根在 sys.path,
- # 才能 import examples.* ;被 run_pipeline 作为模块导入时路径已就绪,此处无副作用。
- _PROJECT_ROOT = Path(__file__).resolve().parents[3]
- if str(_PROJECT_ROOT) not in sys.path:
- sys.path.insert(0, str(_PROJECT_ROOT))
- from examples.process_pipeline.script.llm_helper import call_llm_with_retry
logger = logging.getLogger(__name__)

# Rubric location: this file lives in script/, the rubric in ../test_script/evaluation/.
_RUBRIC_DIR = Path(__file__).resolve().parent.parent / "test_script" / "evaluation"
# Output-structure contract (output schema).
_RUBRIC_PATH = _RUBRIC_DIR / "知识质量评估-rubric.json"
# Detailed criteria / boundaries / worked examples.
_RUBRIC_MD_PATH = _RUBRIC_DIR / "知识质量评估-rubric.md"

# Prompt template (SYSTEM + USER blocks): a sibling directory of the rubric,
# test_script/search_eval/. Parsed once and cached after the first load.
_PROMPT_TEMPLATE_PATH = _RUBRIC_DIR.parent / "search_eval" / "eval_prompt_template.md"
_PROMPT_TEMPLATE_CACHE: Optional[Dict[str, str]] = None

# Knowledge-type enum values of the Chinese output schema
# (they replace the older English procedure / step / tool labels).
_VALID_KNOWLEDGE_TYPES = {"工序", "步骤", "工具"}

# Prompt-size limits for a single post.
_MAX_BODY_CHARS = 8000  # truncation limit for the post body / subtitles
_MAX_COMMENTS = 20  # maximum number of comments forwarded to the model
_MAX_COMMENT_CHARS = 200  # truncation limit for one comment

# Products of a previous evaluation. They are stripped before the source JSON
# is shown to the model, otherwise the model anchors on the old scores.
_EVAL_PRODUCT_FIELDS = frozenset((
    "llm_evaluation",  # the whole previous evaluation result
    "images_sent",  # pipeline metadata: how many images were actually sent
))

# ── Evaluation model selection ────────────────────────────────────────────────
# key -> (backend, model_id). backend "qwen" is served by create_qwen_llm_call,
# everything else by create_openrouter_llm_call (see build_eval_llm_call).
EVAL_MODELS: Dict[str, Tuple[str, str]] = {
    "qwen": ("qwen", "qwen3.5-plus"),
    "sonnet": ("openrouter", "claude-sonnet-4-6"),
    "gemini-flash": ("openrouter", "google/gemini-3-flash-preview"),
    # Fastest option, suited to large evaluation batches.
    "gemini-flash-lite": ("openrouter", "google/gemini-3.1-flash-lite"),
    # Alias of gemini-flash.
    "gemini": ("openrouter", "google/gemini-3-flash-preview"),
    "gpt": ("openrouter", "gpt-5.4"),
}
DEFAULT_EVAL_MODEL = "qwen"
def build_eval_llm_call(choice: str) -> Tuple[Callable, str]:
    """Resolve a model choice into ``(llm_call, model_id)``.

    ``choice`` is either a key of ``EVAL_MODELS`` or a raw model id. A raw id
    is routed to the qwen backend when it contains "qwen" (case-insensitive)
    and to the OpenRouter backend otherwise.
    """
    preset = EVAL_MODELS.get(choice)
    if preset is not None:
        backend, model_id = preset
    else:
        model_id = choice
        backend = "qwen" if "qwen" in choice.lower() else "openrouter"

    # The backend factory is imported lazily, only for the backend selected.
    if backend != "qwen":
        from agent.llm.openrouter import create_openrouter_llm_call

        return create_openrouter_llm_call(model=model_id), model_id

    from agent.llm import create_qwen_llm_call

    return create_qwen_llm_call(model=model_id), model_id
- # ── rubric / prompt 模板加载 ────────────────────────────────────────────────────
def load_post_rubric() -> Dict[str, Any]:
    """Return the ``post`` section of the rubric JSON file.

    Raises:
        ValueError: if the file has no ``post`` entry or it is not an object.
    """
    with _RUBRIC_PATH.open("r", encoding="utf-8") as fh:
        post_section = json.load(fh).get("post")
    if isinstance(post_section, dict):
        return post_section
    raise ValueError(f"rubric 文件缺少 post 部分: {_RUBRIC_PATH}")
def load_rubric_md() -> str:
    """Return the text of the rubric's markdown companion file.

    The read is best-effort: any failure is logged as a warning and an empty
    string is returned, so callers can fall back to the JSON rubric alone.
    """
    try:
        md_text = _RUBRIC_MD_PATH.read_text(encoding="utf-8")
    except Exception as exc:
        logger.warning("加载 rubric md 失败(退化为仅用 JSON): %s", exc)
        return ""
    return md_text
def load_prompt_template() -> Dict[str, str]:
    """Parse eval_prompt_template.md into ``{BLOCK_NAME: template_text}``.

    A line of the form ``=== BLOCK_NAME ===`` (upper-case letters and
    underscores only) opens a new block. Everything before the first such
    line is treated as a file header and ignored. Lines inside a block are
    kept verbatim, including lines starting with ``#``; only leading and
    trailing newlines of each block are stripped. A repeated block name
    replaces the earlier block.

    The parsed mapping is cached in ``_PROMPT_TEMPLATE_CACHE``, so the file is
    read once per process; template edits need a restart to take effect.
    Read errors propagate to the caller (there is no fallback here).
    """
    global _PROMPT_TEMPLATE_CACHE
    if _PROMPT_TEMPLATE_CACHE is None:
        separator = re.compile(r"^===\s+([A-Z_]+)\s+===\s*$")
        sections: Dict[str, List[str]] = {}
        active: Optional[List[str]] = None  # line buffer of the block being filled
        for raw_line in _PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8").splitlines():
            header = separator.match(raw_line)
            if header is not None:
                active = sections[header.group(1)] = []
            elif active is not None:
                active.append(raw_line)
            # else: still inside the file header, skip the line
        _PROMPT_TEMPLATE_CACHE = {
            name: "\n".join(lines).strip("\n") for name, lines in sections.items()
        }
    return _PROMPT_TEMPLATE_CACHE
- # ── 帖子内容格式化(喂给 LLM)──────────────────────────────────────────────────
def _extract_author(post: Dict[str, Any]) -> str:
    """Return the first truthy value among the author-like fields of ``post``.

    Fields are tried in the order ``author``, ``channel_account_name``,
    ``channel``; an empty string is returned when none is set.
    """
    for field in ("author", "channel_account_name", "channel"):
        value = post.get(field)
        if value:
            return value
    return ""
def _extract_comments(source: Dict[str, Any]) -> List[str]:
    """Return the comment texts of ``source``, limited in count and length.

    Only the first ``_MAX_COMMENTS`` entries of ``source["comments"]`` are
    inspected. For a dict entry the text is taken from ``content``, ``text``
    or ``comment`` (first truthy one); any other entry is used as the text
    itself. Texts are stripped, empty ones are dropped, and each is cut to
    ``_MAX_COMMENT_CHARS`` characters.

    Malformed data is tolerated instead of raising: a ``comments`` value that
    is not a list/tuple yields ``[]``, ``None`` entries are skipped, and
    non-string texts are converted with ``str``.
    """
    raw = source.get("comments")
    if not isinstance(raw, (list, tuple)):
        return []

    texts: List[str] = []
    for entry in raw[:_MAX_COMMENTS]:
        if isinstance(entry, dict):
            value = entry.get("content") or entry.get("text") or entry.get("comment")
        else:
            value = entry
        if value is None:
            continue
        # The payload is scraped data: numbers and the like are possible.
        cleaned = str(value).strip()
        if cleaned:
            texts.append(cleaned[:_MAX_COMMENT_CHARS])
    return texts
def _format_post_for_eval(source: Dict[str, Any]) -> str:
    """Serialize one source into the JSON string shown to the model.

    The source is dumped as structured JSON, with these adjustments made on
    shallow copies (the caller's ``source`` and its ``post`` dict are not
    modified):

    - ``post.body_text`` and ``post.desc`` are each cut to ``_MAX_BODY_CHARS``
      characters, in the field that actually holds the long text, with a note
      giving the original length. Truncating in place keeps an over-long
      ``desc`` from reaching the prompt at full length.
    - ``comments`` is cut to the first ``_MAX_COMMENTS`` entries plus a note
      entry stating the original count.
    - Top-level keys starting with ``_`` (internal pipeline fields) and keys
      in ``_EVAL_PRODUCT_FIELDS`` (products of a previous evaluation, which
      would anchor the model on old scores) are removed.

    A ``post`` key is always present in the output, as ``{}`` when the source
    has none.
    """
    payload = dict(source)

    post = dict(payload.get("post") or {})
    for field in ("body_text", "desc"):
        body = post.get(field)
        if isinstance(body, str) and len(body) > _MAX_BODY_CHARS:
            post[field] = body[:_MAX_BODY_CHARS] + f"\n…(正文已截断,原长 {len(body)} 字)"
    payload["post"] = post

    comments = payload.get("comments") or []
    if isinstance(comments, list) and len(comments) > _MAX_COMMENTS:
        payload["comments"] = comments[:_MAX_COMMENTS] + [
            {"_note": f"(评论已截断,共 {len(comments)} 条,只发前 {_MAX_COMMENTS})"}
        ]

    payload = {
        key: value
        for key, value in payload.items()
        if not str(key).startswith("_") and key not in _EVAL_PRODUCT_FIELDS
    }
    return json.dumps(payload, ensure_ascii=False, indent=2)
- # ── prompt 构建(从模板各块组装) ─────────────────────────────────────────────
def _build_eval_messages(
    requirement: str,
    post_block: str,
    image_urls: Optional[List[str]] = None,
    query: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Assemble the system + user chat messages from the prompt template.

    The template's ``SYSTEM`` block is used verbatim. In the ``USER`` block
    the literal placeholders ``{query}`` and ``{post_block}`` are substituted.
    ``str.format`` is not used because the block embeds a JSON schema whose
    literal braces would break it.

    Args:
        requirement: fallback search anchor, used only when ``query`` is empty.
        post_block: serialized post to evaluate.
        image_urls: when non-empty, the user message becomes a multimodal
            content list (one text part followed by one ``image_url`` part
            per URL).
        query: search query the post is judged against.

    Returns:
        ``[system_message, user_message]``.

    Raises:
        KeyError: if the template has no ``SYSTEM`` or ``USER`` block.
    """
    tpl = load_prompt_template()
    system = tpl["SYSTEM"]

    # Prefer query, fall back to requirement, then to a placeholder description.
    effective_query = query or requirement or "(未指定检索词)"

    # Both placeholders are substituted in one pass, so inserted text is never
    # rescanned. A query that happens to contain the literal "{post_block}"
    # must not get the post JSON spliced into it.
    substitutions = {"{query}": effective_query, "{post_block}": post_block}
    user_text = re.sub(
        r"\{query\}|\{post_block\}",
        lambda m: substitutions[m.group(0)],
        tpl["USER"],
    )

    user_content: Any = user_text
    if image_urls:
        # Multimodal request: the images travel together with the text part.
        user_content = [{"type": "text", "text": user_text}]
        user_content.extend(
            {"type": "image_url", "image_url": {"url": url}} for url in image_urls
        )

    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_content},
    ]
def _validate_eval(data: Dict[str, Any]) -> Optional[str]:
    """Minimally validate (and repair in place) one LLM evaluation output.

    Returns ``None`` when the output is acceptable, otherwise a description
    of the first problem found. ``data`` is mutated:

    - A result wrapped as ``{"output": {...}}`` is unwrapped in place.
    - English keys (``knowledge_type``, ``production_relevance``, ``scores``,
      ``reason``, ``decision``) are renamed to their Chinese counterparts when
      the Chinese key is absent.
    - ``知识类型`` is normalized to a list of valid values and written back:
      a bare string becomes a one-element list, English enum values are
      translated, invalid elements are dropped. When the list ends up empty,
      it is inferred from the non-empty type sub-objects present in ``评分``.
      An empty result is tolerated, so a missed label does not force a retry.

    Hard checks, in this order:

    - ``制作相关性`` is an object whose ``得分`` is a number in [1, 3] and
      whose ``理由`` is non-empty;
    - ``评分`` is an object (its inner structure is not checked);
    - ``判定理由`` is non-empty.

    Malformed values (non-string elements in ``知识类型``, a non-object
    ``评分``) never raise; they are dropped or reported through the return
    value.
    """
    if not isinstance(data, dict):
        return "输出不是 JSON 对象"

    # Unwrap: some models follow the schema too literally and add an extra
    # {"output": {...}} layer around the result.
    wrapped = data.get("output")
    if "知识类型" not in data and isinstance(wrapped, dict) and "知识类型" in wrapped:
        data.clear()
        data.update(wrapped)

    # English-key fallback: rename to the Chinese keys instead of retrying.
    en_to_cn = {
        "knowledge_type": "知识类型",
        "production_relevance": "制作相关性",
        "scores": "评分",
        "reason": "判定理由",
        "decision": "判定理由",  # older schema: decision/reason fold into 判定理由
    }
    for en, cn in en_to_cn.items():
        if cn not in data and en in data:
            data[cn] = data.pop(en)

    # 知识类型: soft repair only, never a validation error.
    en_kt_to_cn = {"procedure": "工序", "step": "步骤", "tool": "工具"}
    kt = data.get("知识类型")
    if isinstance(kt, str):
        kt = [kt]
    if isinstance(kt, list):
        # Non-string elements (nested lists/dicts, numbers) cannot be valid
        # labels and may be unhashable, so they are dropped before any lookup.
        translated = [en_kt_to_cn.get(k, k) for k in kt if isinstance(k, str)]
        kt = [k for k in translated if k in _VALID_KNOWLEDGE_TYPES]
    else:
        kt = []

    # Inference: a filled-in per-type score block means the model implicitly
    # claimed that type. Only attempted when 评分 really is an object.
    if not kt:
        scores = data.get("评分")
        if isinstance(scores, dict):
            kt = [
                t for t in ("工序", "步骤", "工具")
                if isinstance(scores.get(t), dict) and scores[t]
            ]
    data["知识类型"] = kt  # written back; may still be an empty list

    # 制作相关性: {得分, 理由} object with 得分 in [1, 3].
    pr = data.get("制作相关性")
    if not isinstance(pr, dict):
        return "制作相关性 必须是 {得分, 理由} 对象"
    try:
        pr_val = float(pr.get("得分"))
    except (TypeError, ValueError):
        return "制作相关性.得分 缺失或不是数字 (需 1-3 整数)"
    if not (1 <= pr_val <= 3):
        return f"制作相关性.得分 必须在 1-3, 得到 {pr.get('得分')!r}"
    if not pr.get("理由"):
        return "制作相关性.理由 不能为空"

    # 评分: must be an object; its sub-structure is guided by the prompt and
    # deliberately not checked, to keep the retry rate down.
    if not isinstance(data.get("评分"), dict):
        return "评分 必须是对象"

    # 判定理由: the top-level overall judgement must be present.
    if not data.get("判定理由"):
        return "判定理由 不能为空"
    return None
- # ── 单帖评估 ────────────────────────────────────────────────────────────────────
def _source_key(source: Dict[str, Any]) -> Tuple[Any, Any]:
    """Return the ``(platform, channel_content_id)`` pair identifying a source."""
    platform, content_id = (
        source.get(field) for field in ("platform", "channel_content_id")
    )
    return platform, content_id
def _move_to_discard(
    source: Dict[str, Any],
    discarded: List[Dict[str, Any]],
    reason: Optional[str],
) -> None:
    """Append a copy of ``source``, tagged with an ``llm_discard`` reason, to ``discarded``.

    The copy is shallow and ``source`` itself is left untouched. The reason
    has newlines replaced by spaces and is cut to 120 characters; the tag is
    ``llm_discard:<reason>``, or just ``llm_discard`` when the reason is empty.
    The reason originates from model output, so a non-string value is
    converted with ``str`` instead of raising.
    """
    if reason is None:
        text = ""
    elif isinstance(reason, str):
        text = reason
    else:
        text = str(reason)
    short = text.replace("\n", " ")[:120]

    tagged = dict(source)
    tagged["filter_reason"] = f"llm_discard:{short}" if short else "llm_discard"
    discarded.append(tagged)
async def _evaluate_one(
    source: Dict[str, Any],
    requirement: str,
    llm_call: Callable,
    model: str,
    sem: asyncio.Semaphore,
    image_urls: Optional[List[str]] = None,
    query: Optional[str] = None,
) -> Tuple[Optional[Dict[str, Any]], float]:
    """Evaluate one source and return ``(llm_evaluation, cost)``.

    The prompt is built before the semaphore is taken; only the model call
    itself runs under ``sem``. The call goes through ``call_llm_with_retry``
    with temperature 0.1, a 2000-token limit and ``_validate_eval`` as the
    output validator.

    Args:
        image_urls: when non-empty, the images are sent along with the text
            (the model must support image input).
        query: search query used as the relevance anchor; ``requirement`` is
            the fallback when it is empty.

    Returns:
        The pair produced by ``call_llm_with_retry``. Presumably the first
        element is ``None`` when every attempt failed — confirm against that
        helper.
    """
    messages = _build_eval_messages(
        requirement, _format_post_for_eval(source), image_urls, query
    )
    task_name = f"LLM-Eval[{source.get('case_id', '?')}]"
    async with sem:
        result = await call_llm_with_retry(
            llm_call=llm_call,
            messages=messages,
            model=model,
            temperature=0.1,
            max_tokens=2000,
            validate_fn=_validate_eval,
            task_name=task_name,
        )
    data, cost = result
    return data, cost
- # ── filtered_cases.json 追加 ────────────────────────────────────────────────────
def _append_to_filtered(raw_cases_dir: Path, discarded: List[Dict[str, Any]]) -> None:
    """Merge ``discarded`` into ``raw_cases_dir / "filtered_cases.json"``.

    Nothing happens when ``discarded`` is empty. Otherwise the entries already
    in the file are loaded, new entries are added unless their
    ``(platform, channel_content_id)`` key is already present, and the file is
    rewritten as ``{"total", "by_reason": {category: {"count", "sources"}}}``.
    The category is the part of ``filter_reason`` before the first ``:``
    (``unknown`` when there is no reason).

    A failure while reading the existing file is logged and the rewrite still
    proceeds with whatever was loaded up to that point.
    """
    if not discarded:
        return

    filtered_file = raw_cases_dir / "filtered_cases.json"
    merged: List[Dict[str, Any]] = []
    seen: set = set()

    if filtered_file.exists():
        try:
            with open(filtered_file, "r", encoding="utf-8") as fh:
                previous = json.load(fh)
            for group in previous.get("by_reason", {}).values():
                for entry in group.get("sources", []):
                    merged.append(entry)
                    seen.add((entry.get("platform"), entry.get("channel_content_id")))
        except Exception as exc:
            logger.warning("读取已有 filtered_cases.json 失败: %s", exc)

    for entry in discarded:
        key = (entry.get("platform"), entry.get("channel_content_id"))
        if key in seen:
            continue
        seen.add(key)
        merged.append(entry)

    grouped: Dict[str, List[Dict[str, Any]]] = {}
    for entry in merged:
        category = (entry.get("filter_reason") or "unknown").partition(":")[0]
        grouped.setdefault(category, []).append(entry)

    by_reason = {}
    for category, items in grouped.items():
        by_reason[category] = {"count": len(items), "sources": items}

    with open(filtered_file, "w", encoding="utf-8") as fh:
        json.dump(
            {"total": len(merged), "by_reason": by_reason},
            fh,
            ensure_ascii=False,
            indent=2,
        )
def _load_prior_llm_discards(raw_cases_dir: Path) -> set:
    """Collect the keys of posts previously rejected by the LLM.

    Reads ``raw_cases_dir / "filtered_cases.json"`` and returns the
    ``(platform, channel_content_id)`` pairs listed under the ``llm_discard``
    category. A missing file yields an empty set; a read or parse failure is
    logged and whatever was collected so far is returned.
    """
    rejected: set = set()
    filtered_file = raw_cases_dir / "filtered_cases.json"
    if filtered_file.exists():
        try:
            with open(filtered_file, "r", encoding="utf-8") as fh:
                payload = json.load(fh)
            for category, group in payload.get("by_reason", {}).items():
                if category == "llm_discard":
                    rejected.update(
                        (entry.get("platform"), entry.get("channel_content_id"))
                        for entry in group.get("sources", [])
                    )
        except Exception as exc:
            logger.warning("读取 filtered_cases.json 失败: %s", exc)
    return rejected
- # ── 主入口 ────────────────────────────────────────────────────────────────────
async def evaluate_sources_with_llm(
    source_file: Path,
    llm_call: Callable,
    model: str,
    requirement: str,
    max_concurrent: int = 3,
    apply_decision: bool = False,
) -> Dict[str, Any]:
    """Run the LLM rubric evaluation over every source in ``source.json``.

    The result of each evaluation is stored in the source's ``llm_evaluation``
    field and the file is rewritten.

    Args:
        source_file: path of source.json; filtered_cases.json is expected in
            the same directory.
        llm_call: model call function handed to the retry helper.
        model: model id.
        requirement: collection requirement, used as the search anchor.
        max_concurrent: maximum number of concurrent model calls; values
            below 1 are treated as 1.
        apply_decision: elimination switch.
            - False (annotate mode, default): every post is evaluated,
              annotated and kept in source.json; filtered_cases.json is not
              touched.
            - True (elimination mode): posts whose evaluation has
              ``decision == "discard"`` are moved to filtered_cases.json and
              removed from source.json; posts the LLM rejected earlier are
              removed again without a new model call.

    Returns:
        A statistics dict:
            evaluated: posts processed in this run (not reused)
            reported: kept posts with ``decision == "report"``
            would_discard: posts with ``decision == "discard"`` (only counted
                in annotate mode, not removed)
            discarded: posts actually removed from source.json
            skipped: posts not re-evaluated (reused evaluation, or earlier
                rejection in elimination mode)
            total_cost: accumulated model cost, rounded to 4 decimals
            llm_discard_details: ``[{case_id, platform, title, filter_reason}]``
                for the removed posts

    Fail-open: when a post's evaluation fails, whether it returns no result or
    raises, the post is kept and marked with ``error=True`` so that the next
    run evaluates it again.

    NOTE(review): decisions are read from the ``decision`` / ``reason`` keys,
    while ``_validate_eval`` checks Chinese keys and renames an English
    ``decision`` to ``判定理由`` when the latter is absent. Confirm that the
    prompt template's schema really emits ``decision``; otherwise elimination
    mode never discards anything.
    """
    source_file = Path(source_file)
    raw_cases_dir = source_file.parent

    with open(source_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    sources: List[Dict[str, Any]] = data.get("sources", [])
    if not sources:
        return {
            "evaluated": 0, "reported": 0, "would_discard": 0, "discarded": 0,
            "skipped": 0, "total_cost": 0.0, "llm_discard_details": [],
        }

    prior_discards = _load_prior_llm_discards(raw_cases_dir) if apply_decision else set()

    kept: List[Dict[str, Any]] = []       # stays in source.json
    discarded: List[Dict[str, Any]] = []  # moves to filtered_cases.json (elimination mode only)
    to_eval: List[Dict[str, Any]] = []    # needs a model call
    skipped = 0

    for s in sources:
        existing_eval = s.get("llm_evaluation")

        # 1. Already evaluated (and not a failure marker): reuse, no new cost.
        if isinstance(existing_eval, dict) and not existing_eval.get("error"):
            if apply_decision and existing_eval.get("decision") == "discard":
                _move_to_discard(s, discarded, existing_eval.get("reason"))
            else:
                kept.append(s)
            skipped += 1
            continue

        # 2. Elimination mode: rejected earlier and matched back in, so it is
        #    removed again without another model call.
        if apply_decision and _source_key(s) in prior_discards:
            s_copy = dict(s)
            s_copy["filter_reason"] = "llm_discard:previously_rejected"
            discarded.append(s_copy)
            skipped += 1
            continue

        # 3. Needs evaluation.
        to_eval.append(s)

    total_cost = 0.0
    evaluated = 0
    if to_eval:
        # A semaphore of 0 would block every task forever; clamp to at least 1.
        sem = asyncio.Semaphore(max(1, max_concurrent))
        # return_exceptions=True keeps one raising task from aborting the
        # whole batch (and the write-back of all the other results).
        results = await asyncio.gather(
            *[_evaluate_one(s, requirement, llm_call, model, sem) for s in to_eval],
            return_exceptions=True,
        )
        for s, outcome in zip(to_eval, results):
            evaluated += 1
            if isinstance(outcome, BaseException):
                if not isinstance(outcome, Exception):
                    raise outcome  # cancellation / interpreter exit: not ours to swallow
                logger.warning(
                    "LLM 评估抛出异常, fail-open 保留该帖 [%s]: %s",
                    s.get("case_id", "?"), outcome,
                )
                llm_eval, cost = None, 0.0
            else:
                llm_eval, cost = outcome
            total_cost += cost

            if llm_eval is None:
                # Fail-open: keep the post; error=True makes the next run retry it.
                s["llm_evaluation"] = {
                    "decision": "report", "reason": "llm_eval_failed_kept", "error": True,
                }
                kept.append(s)
                continue

            s["llm_evaluation"] = llm_eval
            # Only elimination mode removes; in annotate mode discards stay too.
            if apply_decision and llm_eval.get("decision") == "discard":
                _move_to_discard(s, discarded, llm_eval.get("reason"))
            else:
                kept.append(s)

    # Statistics are based on the final decisions (new evaluations + reused ones).
    reported = sum(
        1 for s in kept
        if isinstance(s.get("llm_evaluation"), dict)
        and s["llm_evaluation"].get("decision") == "report"
    )
    would_discard = sum(
        1 for s in (kept + discarded)
        if isinstance(s.get("llm_evaluation"), dict)
        and s["llm_evaluation"].get("decision") == "discard"
    )

    # Elimination mode: record the removed posts first. Should the source.json
    # write below fail, the posts then exist in both files instead of in
    # neither (the merge into filtered_cases.json deduplicates on the next run).
    if apply_decision and discarded:
        _append_to_filtered(raw_cases_dir, discarded)

    # Write source.json back.
    data["sources"] = kept
    data["total"] = len(kept)
    with open(source_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    # Summary of the removed posts (non-empty only in elimination mode).
    llm_discard_details: List[Dict[str, Any]] = []
    for s in discarded:
        post = s.get("post", {}) or {}
        title = post.get("title") or s.get("source_url", "")
        llm_discard_details.append({
            "case_id": s.get("case_id", ""),
            "platform": s.get("platform", ""),
            "title": title[:60] if title else "",
            "filter_reason": s.get("filter_reason", ""),
        })

    return {
        "evaluated": evaluated,
        "reported": reported,
        "would_discard": would_discard,
        "discarded": len(discarded),
        "skipped": skipped,
        "total_cost": round(total_cost, 4),
        "llm_discard_details": llm_discard_details,
    }
- # ── CLI ────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Command-line entry point: evaluate one source.json and print the stats.
    import argparse

    from dotenv import load_dotenv

    load_dotenv()

    cli = argparse.ArgumentParser(description="对 source.json 做 LLM rubric 评估")
    cli.add_argument("source_file", type=Path, help="source.json 路径")
    cli.add_argument("--requirement", type=str, default="", help="采集需求 / 目标格子描述")
    cli.add_argument(
        "--model",
        type=str,
        default=DEFAULT_EVAL_MODEL,
        help=f"评估模型,可选 {list(EVAL_MODELS)} 或直接传模型 id(默认 {DEFAULT_EVAL_MODEL})",
    )
    cli.add_argument("--max-concurrent", type=int, default=3)
    cli.add_argument(
        "--apply-decision",
        action="store_true",
        help="按 LLM decision 实际淘汰 discard 帖(默认只标注不淘汰,阈值标定后再开)",
    )
    args = cli.parse_args()

    _llm_call, _model_id = build_eval_llm_call(args.model)
    print(f"[eval-model] {args.model} -> {_model_id}")

    run = evaluate_sources_with_llm(
        source_file=args.source_file,
        llm_call=_llm_call,
        model=_model_id,
        requirement=args.requirement,
        max_concurrent=args.max_concurrent,
        apply_decision=args.apply_decision,
    )
    stats = asyncio.run(run)
    print(json.dumps(stats, ensure_ascii=False, indent=2))
|