# -*- coding: utf-8 -*-
"""One-off: re-evaluate a single post with the current eval_prompt_template.md
(the new prompt) and compare its scores with the old evaluation stored in the
database.

Usage:
    python eval_compare.py <query_id> <case_id> [--model MODEL] [--max-images N]
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path

# The project root has to be on sys.path before the `examples.*` imports below.
PROJECT_ROOT = Path(__file__).resolve().parents[2]  # .../Agent
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv

load_dotenv()

# Put this script's own directory on sys.path ahead of `import db` below.
HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))

import db
from examples.process_pipeline.script.search_eval.search_and_evaluate import _attach_image_refs
from examples.process_pipeline.script.llm_evaluate_sources import (
    _evaluate_one,
    build_eval_llm_call,
    DEFAULT_EVAL_MODEL,
)


def _row_to_source(row) -> dict:
    """Map a `search_process` row onto the source dict handed to the evaluator.

    Args:
        row: Mapping read with the keys ``case_id``, ``platform``,
            ``channel_content_id``, ``url``, ``title``, ``body``, ``images``,
            ``like_count`` and ``publish_time``.

    Returns:
        A dict with the identifying fields at the top level and the post
        content nested under ``"post"``. A falsy ``images`` value becomes
        an empty list.
    """
    return {
        "case_id": row["case_id"],
        "platform": row["platform"],
        "channel_content_id": row["channel_content_id"],
        "source_url": row["url"],
        "post": {
            "title": row["title"],
            "body_text": row["body"],
            "images": row["images"] or [],
            "like_count": row["like_count"],
            "publish_timestamp": row["publish_time"],
            "link": row["url"],
        },
    }


def flatten_scores(blob, prefix: str = "") -> dict:
    """Flatten a nested evaluation blob into ``{dotted_path: score}``.

    Only leaf nodes, i.e. dicts that carry a ``"得分"`` key, are collected;
    recursion stops at such a node. Non-dict values are ignored.

    Args:
        blob: The (possibly nested) evaluation dict. Anything that is not a
            dict yields an empty result.
        prefix: Dotted path accumulated so far, with a trailing ``"."``.

    Returns:
        Mapping from dotted path to the raw ``"得分"`` value of each leaf.
    """
    out = {}
    if not isinstance(blob, dict):
        return out
    if "得分" in blob:
        # Leaf node: drop the trailing separator from the accumulated path.
        out[prefix.rstrip(".")] = blob.get("得分")
        return out
    for k, v in blob.items():
        if isinstance(v, dict):
            out.update(flatten_scores(v, f"{prefix}{k}."))
    return out


def _format_score_row(key, old_scores, new_scores) -> str:
    """Render one line of the old-vs-new comparison table for *key*."""
    old, new = old_scores.get(key), new_scores.get(key)

    change = ""
    try:
        if old is not None and new is not None and float(old) != float(new):
            change = f" {float(old):g}→{float(new):g}"
    except (TypeError, ValueError):
        # A score that cannot be read as a number gets no change marker.
        pass

    if key in old_scores and key in new_scores:
        presence = ""
    elif key not in old_scores:
        presence = " (旧无)"
    else:
        presence = " (新无)"

    old_txt = str(old) if old is not None else "-"
    new_txt = str(new) if new is not None else "-"
    return f"{key:<46} {old_txt:>6} {new_txt:>6}{change}{presence}"


async def main() -> int:
    """Re-evaluate one post and print an old-vs-new score comparison.

    Reads ``query_id`` / ``case_id`` (plus ``--model`` and ``--max-images``)
    from the command line, fetches the row from ``search_process``, runs a
    fresh evaluation, prints the per-dimension table and the summary lines,
    and writes both blobs to ``runs/eval_compare_<case_id>.json`` next to
    this script.

    Returns:
        ``0`` on success; ``1`` when the row is missing or the new
        evaluation returned ``None``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("query_id")
    ap.add_argument("case_id")
    ap.add_argument("--model", default=DEFAULT_EVAL_MODEL)
    ap.add_argument("--max-images", type=int, default=4)
    args = ap.parse_args()

    row = db.fetch_post(args.query_id, args.case_id, table="search_process")
    if not row:
        print(f"❌ {args.query_id}/{args.case_id} 不在 search_process")
        return 1
    old_blob = row.get("llm_evaluation") or {}

    src = _row_to_source(row)
    # NOTE(review): the positional `8` and `"url"` are defined by
    # _attach_image_refs; their meaning is not visible from this file.
    await _attach_image_refs([src], args.max_images, 8, "url")
    n_img = len(src.get("_image_data_urls") or [])
    print(f"📄 {args.case_id} | {(row['title'] or '')[:40]} | 配图 {n_img} 张 | 模型 {args.model}")
    print(f"🔍 检索词: {row['query_text']}\n")

    eval_llm, model_id = build_eval_llm_call(args.model)
    sem = asyncio.Semaphore(1)
    new_blob, cost = await _evaluate_one(
        src,
        "",
        eval_llm,
        model_id,
        sem,
        image_urls=src.get("_image_data_urls"),
        query=row["query_text"],
    )
    if new_blob is None:
        print("❌ 新评估失败(重试耗尽)")
        return 1

    old_f = flatten_scores(old_blob)
    new_f = flatten_scores(new_blob)
    keys = sorted(set(old_f) | set(new_f))

    print(f"{'维度路径':<46} {'旧分':>6} {'新分':>6} 变化")
    print("─" * 72)
    for k in keys:
        print(_format_score_row(k, old_f, new_f))
    print("─" * 72)

    o_overall, n_overall = db.overall_score(old_blob), db.overall_score(new_blob)
    o_adopt = db.is_adopted(o_overall, old_blob, row["publish_time"])
    n_adopt = db.is_adopted(n_overall, new_blob, row["publish_time"])
    print(f"{'overall_score':<46} {str(o_overall):>6} {str(n_overall):>6}")
    print(f"{'知识类型':<46} {str(old_blob.get('知识类型')):>6} | {new_blob.get('知识类型')}")
    print(f"{'是否采纳':<46} {str(o_adopt):>6} {str(n_adopt):>6}")
    print(f"\n💲 本次重评成本 ${cost:.4f}")

    # Persist the full old/new blobs so the rationales can be read in detail.
    out = HERE / "runs" / f"eval_compare_{args.case_id}.json"
    # Create `runs/` on demand; otherwise a fresh checkout fails here, after
    # the evaluation has already been paid for.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(
        json.dumps({"old": old_blob, "new": new_blob}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"📝 完整新旧 blob(含理由): {out}")
    return 0


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))