# -*- coding: utf-8 -*-
"""Re-evaluate the currently adopted (``db.is_adopted``) posts of one query.

Posts are scored with a flash-lite + sonnet combination: ``INIT_MODEL`` gives
the initial score and posts in the ambiguous score band ``BAND`` are escalated
to ``ESC_MODEL`` (the escalation itself is performed by ``evaluate_posts``).
Afterwards only the score-related DB columns (overall_score / knowledge_type /
llm_evaluation) are replaced in place, and ``llm_evaluation`` is synced back
into the runs JSON.

Before anything changes, the old values are backed up to
``runs/<TABLE>/<QUERY_ID>.score_backup.<timestamp>.json`` so the run can be
rolled back.
"""
import asyncio
import copy
import json
import sys
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[3]
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv

load_dotenv()

MW = Path(__file__).resolve().parent
sys.path.insert(0, str(MW))

import db
from examples.process_pipeline.script.search_eval.search_and_evaluate import evaluate_posts
from examples.process_pipeline.script.llm_evaluate_sources import (
    _EVAL_PRODUCT_FIELDS,
    build_eval_llm_call,
)

QUERY_ID = "q0020"
TABLE = "search_process"
INIT_MODEL = "gemini-flash-lite"
ESC_MODEL = "sonnet"
BAND = (4.0, 6.0)


def _runs_dir():
    """Return the directory holding the per-query runs JSON and score backups."""
    return MW / "runs" / TABLE


def _parse_eval(raw):
    """Normalise an ``llm_evaluation`` value (JSON text, dict or NULL) to a dict."""
    return json.loads(raw) if isinstance(raw, str) else (raw or {})


def _json_default(obj):
    """Serialise DB column values that ``json`` cannot encode natively.

    NOTE(review): the driver may return DATETIME / DECIMAL columns as
    ``datetime`` / ``Decimal`` depending on the schema (not visible here).
    """
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def _load_db_rows():
    """Return every DB row of ``QUERY_ID`` with the columns this script needs."""
    conn = db._conn()
    try:
        with conn.cursor() as c:
            c.execute(
                f"SELECT case_id, overall_score, knowledge_type, publish_time, "
                f"llm_evaluation FROM {TABLE} WHERE query_id=%s",
                (QUERY_ID,),
            )
            return c.fetchall()
    finally:
        conn.close()


def _update_scores(case_id, overall, knowledge_type, evaluation):
    """Overwrite the score-related columns of one post of ``QUERY_ID``."""
    conn = db._conn()
    try:
        with conn.cursor() as c:
            c.execute(
                f"UPDATE {TABLE} SET overall_score=%s, knowledge_type=%s, llm_evaluation=%s, "
                f"updated_at=CURRENT_TIMESTAMP WHERE query_id=%s AND case_id=%s",
                (overall, db._j(knowledge_type or []), db._j(evaluation), QUERY_ID, case_id),
            )
        # Commit explicitly: unless db._conn() enables autocommit (not visible
        # here) the UPDATE would be discarded on close. An extra commit on an
        # autocommit connection should be harmless.
        conn.commit()
    finally:
        conn.close()


def _backup_scores(adopted):
    """Write the old score fields of *adopted* rows to a timestamped JSON file.

    Returns the path of the backup file.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup = [
        {
            "case_id": r["case_id"],
            "overall_score": r["overall_score"],
            "knowledge_type": r["knowledge_type"],
            "publish_time": r["publish_time"],
            "llm_evaluation": _parse_eval(r["llm_evaluation"]),
        }
        for r in adopted
    ]
    bdir = _runs_dir()
    bdir.mkdir(parents=True, exist_ok=True)
    bpath = bdir / f"{QUERY_ID}.score_backup.{ts}.json"
    bpath.write_text(
        json.dumps(backup, ensure_ascii=False, indent=2, default=_json_default),
        encoding="utf-8",
    )
    return bpath


def _build_targets(adopted_ids, by_id):
    """Return deep copies of the runs-JSON posts to re-evaluate, stripped of old eval output.

    Ids absent from *by_id* are skipped (the caller reports them).
    """
    targets = []
    for cid in adopted_ids:
        src = by_id.get(cid)
        if src is None:
            continue
        s = copy.deepcopy(src)  # never mutate the runs-JSON entry itself
        for k in _EVAL_PRODUCT_FIELDS:
            s.pop(k, None)
        # Drop any cached image payload. Presumably evaluate_posts re-derives
        # images itself (image_mode="url"); TODO confirm.
        s.pop("_image_data_urls", None)
        targets.append(s)
    return targets


def _print_report(report, n_adopted_before, cost, bpath):
    """Print the per-post comparison table and the run summary."""
    print("\n" + "=" * 92)
    print(f"{'case_id':26} {'升级':4} {'旧综':>5} {'新综':>5} {'复现':>4} {'意图':>4} {'命中':>5} 标题")
    still = 0
    # overall_score may be None; sort those as 0 to match the printed value.
    for r in sorted(report, key=lambda x: x["new_overall"] or 0):
        still += int(r["new_adopted"])
        print(f"{r['case_id'][:26]:26} {'★' if r['escalated'] else ' ':^4} "
              f"{(r['old_overall'] or 0):5.2f} {(r['new_overall'] or 0):5.2f} "
              f"{str(r['repro']):>4} {str(r['intent']):>4} "
              f"{'是' if r['new_adopted'] else '否':>4} {r['title']}")
    esc_n = sum(r["escalated"] for r in report)
    # Posts not re-evaluated (missing from the runs JSON or evaluation failed)
    # keep their old DB values, so they are still adopted.
    kept = n_adopted_before - len(report)
    print("=" * 92)
    print(f"重评 {len(report)} 帖 · 升级 {ESC_MODEL} {esc_n} 帖 · 命中 {n_adopted_before}→{still + kept} · "
          f"总成本 ${cost:.4f}")
    if kept:
        print(f"(其中 {kept} 帖未重评,保持原命中状态)")
    print(f"DB 已更新,旧值备份在 {bpath.name}")


async def main():
    """Back up, re-evaluate and overwrite the scores of all adopted posts of ``QUERY_ID``."""
    rows = _load_db_rows()
    adopted = [
        r for r in rows
        if db.is_adopted(r["overall_score"], _parse_eval(r["llm_evaluation"]), r["publish_time"])
    ]
    # Ordered de-duplication (not a set) so the evaluation order is reproducible.
    adopted_ids = list(dict.fromkeys(r["case_id"] for r in adopted))
    print(f"{QUERY_ID} 共 {len(rows)} 帖,当前命中 {len(adopted)} 帖 → 重评这些\n")

    # Back up the old score fields before touching anything.
    bpath = _backup_scores(adopted)
    print(f"💾 旧得分已备份 → {bpath.name}\n")

    # Full posts (including images) come from the runs JSON and are the re-eval input.
    runs_path = _runs_dir() / f"{QUERY_ID}.json"
    data = json.loads(runs_path.read_text(encoding="utf-8"))
    query = data.get("query", "")
    by_id = {s["case_id"]: s for s in data.get("results", [])}
    missing = [cid for cid in adopted_ids if cid not in by_id]
    if missing:
        print(f"⚠️ runs json 缺 {len(missing)} 条,将跳过: {missing}")
    targets = _build_targets(adopted_ids, by_id)
    if not targets:
        print("没有可重评的帖子,退出。")
        return

    eval_llm, eval_model = build_eval_llm_call(INIT_MODEL)
    esc_llm, esc_model = build_eval_llm_call(ESC_MODEL)
    print(f"🧠 组合评估:{eval_model} 初评 → {esc_model} 复核(带 [{BAND[0]:g},{BAND[1]:g}])\n")
    sources, cost = await evaluate_posts(
        targets, "", eval_llm, eval_model, max_concurrent=4, include_images=True,
        max_images=4, image_mode="url", query=query,
        escalate_llm=esc_llm, escalate_model=esc_model, escalate_band=BAND)

    old_by_id = {r["case_id"]: r for r in adopted}
    report = []
    for s in sources:
        cid = s["case_id"]
        ev = s.get("llm_evaluation")
        if not isinstance(ev, dict) or ev.get("_error"):
            print(f"  ⚠️ 评估失败,跳过更新: {cid}")
            continue
        post = s.get("post") or {}
        kt = ev.get("知识类型") or []
        ov = db.overall_score(ev)
        pub = post.get("publish_timestamp") or old_by_id[cid]["publish_time"]
        new_adopt = db.is_adopted(ov, ev, pub)
        _update_scores(cid, ov, kt, ev)  # targeted DB overwrite
        # Only llm_evaluation is synced back; other fields in the runs-JSON
        # entry are left untouched.
        by_id[cid]["llm_evaluation"] = ev
        report.append({
            "case_id": cid,
            "escalated": bool(s.get("_escalated")),
            "old_overall": old_by_id[cid]["overall_score"],
            "new_overall": ov,
            "repro": db._fixed_dim_score(ev, "可复现性"),
            "intent": db._fixed_dim_score(ev, "意图可控性"),
            "new_adopted": new_adopt,
            "title": (post.get("title") or "")[:22],  # title may be None
        })

    # Persist the synced runs JSON.
    runs_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")

    _print_report(report, len(adopted), cost, bpath)


if __name__ == "__main__":
    asyncio.run(main())