| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
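# -*- coding: utf-8 -*-
"""Batch re-evaluate the posts under QUERY_ID that are currently adopted (is_adopted).

Uses the flash-lite + sonnet combination (scores inside a fuzzy band are escalated), then
overwrites the score-related DB fields (overall_score / knowledge_type / llm_evaluation).
The old values are first backed up to
runs/search_process/<QUERY_ID>.score_backup.<ts>.json so the change can be rolled back.
"""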
- import asyncio, copy, json, sys
- from datetime import datetime
- from pathlib import Path
- PROJECT_ROOT = Path(__file__).resolve().parents[3]
- sys.path.insert(0, str(PROJECT_ROOT))
- from dotenv import load_dotenv
- load_dotenv()
- MW = Path(__file__).resolve().parent
- sys.path.insert(0, str(MW))
- import db
- from examples.process_pipeline.script.search_eval.search_and_evaluate import evaluate_posts
- from examples.process_pipeline.script.llm_evaluate_sources import (
- _EVAL_PRODUCT_FIELDS, build_eval_llm_call,
- )
# Query whose rows are loaded from the DB and whose runs JSON file is read and rewritten.
QUERY_ID = "q0020"
# DB table name; also used as the sub-directory name under runs/.
TABLE = "search_process"
# Model name handed to build_eval_llm_call for the initial evaluation pass.
INIT_MODEL = "gemini-flash-lite"
# Model name handed to build_eval_llm_call for the escalation (review) pass.
ESC_MODEL = "sonnet"
# Passed to evaluate_posts as escalate_band.
# NOTE(review): presumably the (low, high) score range that triggers escalation — confirm
# against evaluate_posts.
BAND = (4.0, 6.0)
def _load_db_rows():
    """Fetch the score-related columns of every row stored for QUERY_ID.

    Returns:
        Whatever ``cursor.fetchall()`` yields for the query. The connection
        obtained from ``db._conn()`` is always closed before returning.
    """
    sql = (
        "SELECT case_id, overall_score, knowledge_type, publish_time, "
        f"llm_evaluation FROM {TABLE} WHERE query_id=%s"
    )
    connection = db._conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, (QUERY_ID,))
            fetched = cursor.fetchall()
    finally:
        connection.close()
    return fetched
def _update_scores(case_id, overall, knowledge_type, evaluation):
    """Overwrite the score fields of one row identified by (QUERY_ID, case_id).

    Args:
        case_id: Identifier of the post row to update.
        overall: New value for ``overall_score``.
        knowledge_type: New knowledge-type value; a falsy value is stored as
            an empty list. Serialized with ``db._j``.
        evaluation: New ``llm_evaluation`` payload. Serialized with ``db._j``.

    The connection is always closed, even if the UPDATE fails.
    """
    conn = db._conn()
    try:
        with conn.cursor() as c:
            c.execute(
                f"UPDATE {TABLE} SET overall_score=%s, knowledge_type=%s, llm_evaluation=%s, "
                f"updated_at=CURRENT_TIMESTAMP WHERE query_id=%s AND case_id=%s",
                (overall, db._j(knowledge_type or []), db._j(evaluation), QUERY_ID, case_id))
        # Commit explicitly so the write persists when the connection is not in
        # autocommit mode; this is harmless if autocommit is already enabled.
        conn.commit()
    finally:
        conn.close()
async def main():
    """Re-evaluate the currently adopted posts of QUERY_ID and write the new scores back.

    Steps, in order:
      1. Load all DB rows for QUERY_ID and keep those ``db.is_adopted`` accepts.
      2. Back up their current score fields to a timestamped JSON file.
      3. Read the full posts from the runs JSON, strip previous evaluation
         products, and re-evaluate them with the initial + escalation models.
      4. For every successful evaluation, update the DB row and the in-memory
         runs JSON entry; failed evaluations are skipped.
      5. Rewrite the runs JSON and print a per-post report plus a summary.
    """
    rows = _load_db_rows()

    def _ev(r):
        # llm_evaluation may come back as a JSON string or an already-decoded
        # value; normalize to a dict-like object (empty dict when falsy).
        e = r["llm_evaluation"]
        return json.loads(e) if isinstance(e, str) else (e or {})

    adopted = [r for r in rows if db.is_adopted(r["overall_score"], _ev(r), r["publish_time"])]
    adopted_ids = {r["case_id"] for r in adopted}
    # Report the query actually being processed rather than a hard-coded id.
    print(f"{QUERY_ID} 共 {len(rows)} 帖,当前命中 {len(adopted)} 帖 → 重评这些\n")

    # Back up the old score fields before anything is overwritten.
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup = [{"case_id": r["case_id"], "overall_score": r["overall_score"],
               "knowledge_type": r["knowledge_type"], "publish_time": r["publish_time"],
               "llm_evaluation": _ev(r)} for r in adopted]
    bpath = MW / "runs" / TABLE / f"{QUERY_ID}.score_backup.{ts}.json"
    bpath.write_text(json.dumps(backup, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"💾 旧得分已备份 → {bpath.name}\n")

    # Load the full posts from the runs JSON as input for re-evaluation.
    runs_path = MW / "runs" / TABLE / f"{QUERY_ID}.json"
    data = json.loads(runs_path.read_text(encoding="utf-8"))
    query = data.get("query", "")
    by_id = {s["case_id"]: s for s in data.get("results", [])}
    missing = [cid for cid in adopted_ids if cid not in by_id]
    if missing:
        print(f"⚠️ runs json 缺 {len(missing)} 条,将跳过: {missing}")

    targets = []
    for cid in adopted_ids:
        if cid not in by_id:
            continue
        # Work on a copy so stripping fields does not touch the runs JSON data.
        s = copy.deepcopy(by_id[cid])
        for k in _EVAL_PRODUCT_FIELDS:
            s.pop(k, None)
        s.pop("_image_data_urls", None)
        targets.append(s)

    eval_llm, eval_model = build_eval_llm_call(INIT_MODEL)
    esc_llm, esc_model = build_eval_llm_call(ESC_MODEL)
    print(f"🧠 组合评估:{eval_model} 初评 → {esc_model} 复核(带 [{BAND[0]:g},{BAND[1]:g}])\n")
    sources, cost = await evaluate_posts(
        targets, "", eval_llm, eval_model, max_concurrent=4,
        include_images=True, max_images=4, image_mode="url", query=query,
        escalate_llm=esc_llm, escalate_model=esc_model, escalate_band=BAND)

    # Lookup table for the old scores.
    old_by_id = {r["case_id"]: r for r in adopted}
    report = []
    for s in sources:
        cid = s["case_id"]
        ev = s["llm_evaluation"]
        if not isinstance(ev, dict) or ev.get("_error"):
            # Leave the existing DB values untouched when the evaluation failed.
            print(f" ⚠️ 评估失败,跳过更新: {cid}")
            continue
        kt = ev.get("知识类型") or []
        ov = db.overall_score(ev)
        post = s.get("post") or {}
        # Prefer the post's own timestamp; fall back to the value stored in the DB.
        pub = post.get("publish_timestamp") or old_by_id[cid]["publish_time"]
        new_adopt = db.is_adopted(ov, ev, pub)
        _update_scores(cid, ov, kt, ev)  # targeted DB overwrite
        by_id[cid]["llm_evaluation"] = ev  # keep the runs JSON data in sync
        report.append({
            "case_id": cid, "escalated": bool(s.get("_escalated")),
            "old_overall": old_by_id[cid]["overall_score"], "new_overall": ov,
            "repro": db._fixed_dim_score(ev, "可复现性"),
            "intent": db._fixed_dim_score(ev, "意图可控性"),
            "new_adopted": new_adopt,
            # `or ""` also covers a title key that is present but None.
            "title": (post.get("title") or "")[:22],
        })

    # Persist the synced runs JSON.
    runs_path.write_text(
        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")

    # Report.
    print("\n" + "=" * 92)
    print(f"{'case_id':26} {'升级':4} {'旧综':>5} {'新综':>5} {'复现':>4} {'意图':>4} {'命中':>5} 标题")
    still = 0
    for r in sorted(report, key=lambda x: x["new_overall"]):
        still += int(r["new_adopted"])
        print(f"{r['case_id'][:26]:26} {'★' if r['escalated'] else ' ':^4} "
              f"{(r['old_overall'] or 0):5.2f} {(r['new_overall'] or 0):5.2f} "
              f"{str(r['repro']):>4} {str(r['intent']):>4} "
              f"{'是' if r['new_adopted'] else '否':>4} {r['title']}")
    esc_n = sum(r["escalated"] for r in report)
    print("=" * 92)
    print(f"重评 {len(report)} 帖 · 升级 sonnet {esc_n} 帖 · 命中 {len(adopted)}→{still} · "
          f"总成本 ${cost:.4f}")
    print(f"DB 已更新,旧值备份在 {bpath.name}")
# Script entry point: run the async main() to completion when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
|