# -*- coding: utf-8 -*-
"""Viewer server for search-evaluation cases.

Reuses the layout of 图文排版搜索评估.html (cards + detail dialog + rubric
score bars). Data is scanned live from runs/*/form_*.json, so a newly added
q folder under runs/ shows up on the next browser refresh.
Paging: query -> three forms (A/B/C) -> three channels, three rows top to bottom.

Usage: python server.py [port]    (default 8770; open http://0.0.0.0:8770)
"""
import json
import re
import glob
import sys
import pathlib
import subprocess
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

try:
    # The Windows console defaults to cp1252, where printing Chinese text
    # crashes; switch stdout to UTF-8. Deliberately best-effort: stdout may be
    # replaced by an object without reconfigure().
    sys.stdout.reconfigure(encoding="utf-8")
except Exception:
    pass

DEFAULT_PORT = 8770


def _parse_port(argv, default=DEFAULT_PORT):
    """Return the port given as ``argv[1]``, or *default* if absent/non-integer.

    Falling back instead of raising keeps the module importable from processes
    whose command line is unrelated to this script (e.g. a test runner).
    """
    if len(argv) > 1:
        try:
            return int(argv[1])
        except ValueError:
            pass
    return default


def _load_json(path):
    """Parse the UTF-8 JSON file at *path*; the file handle is always closed."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


HERE = pathlib.Path(__file__).parent
PORT = _parse_port(sys.argv)

# Platform key -> display name.
PLAT = {"xhs": "小红书", "gzh": "公众号", "zhihu": "知乎", "x": "X", "bili": "B站",
        "douyin": "抖音", "sph": "视频号", "youtube": "YouTube", "github": "GitHub",
        "toutiao": "头条", "weibo": "微博"}
# Knowledge-type key -> display name.
KT = {"procedure": "工序", "step": "步骤", "tool": "工具"}

# Action leaves / type names are taken from the taxonomy so that original_q can
# be parsed back into its source dimensions (action x type are orthogonal).
EVALDIR = HERE.parent.parent / "test_script" / "evaluation"
if not EVALDIR.exists():
    EVALDIR = HERE.parent / "evaluation"
try:
    _jm = _load_json(EVALDIR / "judged_matrix.json")
    ACT_L1 = {a["name"]: a["l1"] for a in _jm["actions"]}
    ACTION_SET = set(ACT_L1)
    TYPE_SET = {t["name"] for t in _jm["types"]}
    ACTIONS_TAX = [{"name": a["name"], "l1": a["l1"], "l2": a.get("l2", "")}
                   for a in _jm["actions"]]
    TYPES_TAX = [{"name": t["name"], "l1": t["l1"]} for t in _jm["types"]]
    # Taxonomy order follows judged_matrix (strict version); the matrix values
    # come from type_action_scores (lenient version). Both are independent
    # gemini judgings of the same 27x50 cells: the former has only 53 cells
    # reaching tier3, the latter 156 cells reaching score3.
    _tas = _load_json(EVALDIR / "type_action_scores.json")["scores"]
    # One row per action, one column per type; a missing cell is {}.
    _MATRIX = []
    for a in _jm["actions"]:
        row = []
        for t in _jm["types"]:
            rec = _tas.get(t["name"], {}).get(a["name"])
            row.append({"tier": rec["score"], "r": rec.get("reason", "")} if rec else {})
        _MATRIX.append(row)
except Exception:
    # Best-effort: without readable taxonomy files the viewer still works, just
    # with no dimension parsing and an empty matrix.
    ACT_L1, ACTION_SET, TYPE_SET, ACTIONS_TAX, TYPES_TAX, _MATRIX = {}, set(), set(), [], [], []

# Leading query tokens recognised as a modality constraint.
MODSET = {"文", "图", "视频", "音频"}
# Leading query tokens recognised as a tool-type constraint -> display value.
TOOLQUAL = {"AI": "AI 模型", "软件": "桌面 APP", "电脑端": "桌面 APP", "在线": "云端 Web",
            "网页版": "云端 Web", "代码": "API·CLI", "命令行": "API·CLI", "插件": "插件扩展"}

# Chinese metric names of the new evaluation schema -> English score keys.
_CN_TO_EN = {
    "相关性": "relevance", "成品质量": "result_quality", "可信度": "credibility",
    "具体用例": "concrete_use_case", "完整性": "completeness",
    "步骤结构": "step_structure", "步骤可复现": "step_reproducibility",
    "步骤可复现性": "step_reproducibility",
    "能力定义": "capability_definition", "实现深度": "implementation_depth",
    "边界失败": "boundary_failure_eval", "通用性": "generality",
    "能力覆盖": "capability_coverage", "有效对比": "effective_comparison",
    "参数具体": "param_specificity",
    "实操示例": "worked_example", "实操用例": "worked_example", "示例完整": "worked_example",
    "版本限制": "version_limits", "版本说明": "version_limits", "限制说明": "version_limits",
}

# Accepted spellings (Chinese or English) of a knowledge type -> canonical key.
_KT_ALIASES = {"工序": "procedure", "procedure": "procedure",
               "步骤": "step", "step": "step",
               "工具": "tool", "tool": "tool"}


def parse_dims(oq):
    """Parse a combined query back into its dimensions.

    Example input: '文 元素生成 提示词 教程'. The query is split on whitespace;
    the first token found in ACTION_SET is the action, the first found in
    TYPE_SET is the type, and the very first token may name a constraint
    (a modality from MODSET or a tool type from TOOLQUAL).

    Returns a dict with keys ``action``, ``type``, ``action_l1`` and
    ``constraint`` (``None`` or ``{"kind": ..., "value": ...}``).
    """
    toks = (oq or "").split()
    action = next((t for t in toks if t in ACTION_SET), None)
    type_ = next((t for t in toks if t in TYPE_SET), None)
    cons = None
    if toks:
        t0 = toks[0]
        if t0 in MODSET:
            cons = {"kind": "模态", "value": t0}
        elif t0 in TOOLQUAL:
            cons = {"kind": "工具类型", "value": TOOLQUAL[t0]}
    return {"action": action, "type": type_,
            "action_l1": ACT_L1.get(action, ""), "constraint": cons}


def flat_scores(sc):
    """Flatten a (possibly one-level nested) score mapping to ``{name: int}``.

    Dict values contribute their inner items; other values are used directly.
    Anything that ``int()`` cannot convert is skipped.
    """
    flat = {}
    for key, value in (sc or {}).items():
        items = value.items() if isinstance(value, dict) else ((key, value),)
        for name, raw in items:
            try:
                flat[name] = int(raw)
            except Exception:
                # Deliberately lenient: unparsable scores are simply dropped.
                pass
    return flat


def _recency_hard(date_str):
    """Compute the hard recency score from a publish timestamp.

    Uses the first 10 characters (YYYY-MM-DD): within 180 days -> 3, within
    730 days -> 2, older -> 1. Returns ``None`` when the date cannot be parsed.
    This replaces the recency dimension formerly scored by the LLM: computing
    it in the script is more stable and costs no LLM tokens, since the publish
    time is already available when the post is crawled.
    """
    try:
        d = datetime.strptime((date_str or "")[:10], "%Y-%m-%d")
    except (ValueError, TypeError):
        return None
    days = (datetime.now() - d).days
    if days <= 180:
        return 3
    if days <= 730:
        return 2
    return 1


def _parse_new_scores(evaluation):
    """Extract ``(scores, reasons)`` from the nested new-schema "评分" block.

    Expected shape: "评分": {"通用": {"相关性": {"得分": 5, "理由": "..."}}}.
    A metric may also be a bare number. Metric names are mapped to English
    keys via _CN_TO_EN (unknown names are kept as-is).
    """
    scores, reasons = {}, {}
    for metrics in _as_dict(evaluation.get("评分")).values():
        if not isinstance(metrics, dict):
            continue
        for metric, val in metrics.items():
            en_key = _CN_TO_EN.get(metric, metric)
            if isinstance(val, dict):
                if "得分" in val:
                    try:
                        scores[en_key] = int(val["得分"])
                    except Exception:
                        pass
                if "理由" in val:
                    reasons[en_key] = val["理由"]
            elif isinstance(val, (int, float)):
                try:
                    scores[en_key] = int(val)
                except (ValueError, OverflowError):
                    # NaN / infinity cannot be converted; skip like other bad scores.
                    pass
    return scores, reasons


def adapt(r):
    """Convert one raw result record into the flat dict the front end renders.

    Handles both the new Chinese-keyed evaluation schema and the older English
    one. The keep/discard decision is derived from numeric filters only (never
    from text matching), checked in this order:
    production relevance < 2, hard recency < 2, overall average < 3.
    """
    p = _as_dict(r.get("post"))
    e = _as_dict(r.get("llm_evaluation"))

    # 1. Detect the new Chinese schema.
    is_new_schema = "评分" in e or "知识类型" in e or "制作相关性" in e

    # 2. Knowledge types, normalised to the English keys for the new schema.
    if is_new_schema:
        kt_raw = e.get("知识类型") or []
        if not isinstance(kt_raw, (list, tuple)):
            kt_raw = []
        kt = [_KT_ALIASES[k] for k in kt_raw if isinstance(k, str) and k in _KT_ALIASES]
    else:
        kt = e.get("knowledge_type") or []

    # 3. Scores (and per-metric reasons, new schema only).
    if is_new_schema:
        fs, score_reasons = _parse_new_scores(e)
    else:
        fs, score_reasons = flat_scores(e.get("scores", {})), {}

    # Overall = mean of all parsed scores.
    overall = round(sum(fs.values()) / len(fs), 1) if fs else 0
    anomaly = bool(e.get("error")) or not fs
    grade = p.get("_quality_grade", "")
    fb = r.get("found_by_queries", [])

    # 4. Production relevance.
    if is_new_schema:
        pr_block = e.get("制作相关性") or {}
        pr_raw = pr_block.get("得分") if isinstance(pr_block, dict) else pr_block
    else:
        pr_raw = e.get("production_relevance")
    try:
        production_relevance = int(float(pr_raw)) if pr_raw is not None else None
    except (TypeError, ValueError, OverflowError):
        production_relevance = None

    timestamp = p.get("publish_timestamp", "") or ""
    recency_hard = _recency_hard(timestamp)

    # 5. Decision and reason.
    reason = e.get("判定理由") or e.get("reason") or ""
    if production_relevance is not None and production_relevance < 2:
        # Score 1 is discarded; None keeps old records without this metric.
        is_discard = True
    elif recency_hard is not None and recency_hard < 2:
        # Score 1 = published more than two years ago.
        is_discard = True
    else:
        is_discard = overall < 3
    decision = "discard" if is_discard else "report"

    return {
        "platform": PLAT.get(r.get("platform"), r.get("platform")),
        "platformKey": r.get("platform"),
        "title": p.get("title", "") or "(无标题)",
        "date": timestamp[:10] if isinstance(timestamp, str) else "",
        "url": r.get("source_url", ""),
        "engagement": f'{p.get("like_count", 0)} 赞',
        "knowledge_type": kt,
        "decision": decision,
        "tools": [KT.get(k, k) for k in kt] + ([f"质量 {grade}"] if grade else []),
        "found_by": fb,
        "images": (p.get("images") or [])[:6],
        "text": p.get("body_text", "") or "",
        "scores": fs,
        "overall": overall,
        "reason": reason,
        "score_reasons": score_reasons,
        "grade": grade,
        "qscore": p.get("_quality_score", 0),
        "anomaly": anomaly,
        "production_relevance": production_relevance,
        "recency_hard": recency_hard,
    }


def scan_runs():
    """Scan runs/*/form_*.json and build the payload served at /api/data.

    Returns ``{"queries": [...], "actions": ..., "types": ..., "matrix": ...}``.
    Each query entry groups the forms of one run folder (sorted by form) and
    carries its parsed dimensions, hit count and summed totals. Files that
    cannot be read or are not a JSON object are skipped.
    """
    runs = {}
    for f in sorted(glob.glob(str(HERE / "runs" / "*" / "form_*.json"))):
        try:
            d = _load_json(f)
        except Exception:
            continue
        if not isinstance(d, dict):
            continue
        run = pathlib.Path(f).parent.name
        raw_results = d.get("results") or []
        if not isinstance(raw_results, list):
            raw_results = []
        results = [adapt(r) for r in raw_results if isinstance(r, dict)]
        valid = [r for r in results if not r.get("anomaly")]
        report_val = sum(1 for r in valid if r.get("decision") == "report")
        discard_val = sum(1 for r in valid if r.get("decision") == "discard")
        runs.setdefault(run, []).append({
            "form": d.get("form"),
            "query": d.get("query"),
            "original_q": d.get("original_q", ""),
            "requirement": d.get("requirement", ""),
            "platforms": d.get("platforms", []),
            "total": d.get("total"),
            "report": report_val,
            "discard": discard_val,
            "results": results,
        })
    for v in runs.values():
        v.sort(key=lambda x: x.get("form") or "")

    def _qnum(name):
        # "q156" -> 156: sort numerically so that "q156" does not sort before
        # "q99" as it would with plain string comparison.
        m = re.search(r"\d+", name)
        return (int(m.group()) if m else 0, name)

    out = []
    for k, v in sorted(runs.items(), key=lambda kv: _qnum(kv[0])):
        oq = v[0].get("original_q") or v[0].get("query") or ""
        # Knowledge hits = posts that are accepted ("report") and not
        # anomalous, across all forms, de-duplicated by url.
        seen = set()
        for form in v:
            for r in form.get("results", []):
                if r.get("decision") == "report" and not r.get("anomaly"):
                    seen.add(r.get("url"))
        out.append({"key": k, "forms": v, "dims": parse_dims(oq), "original_q": oq,
                    "hits": len(seen),
                    "tot": sum((form.get("total") or 0) for form in v)})
    return {"queries": out, "actions": ACTIONS_TAX, "types": TYPES_TAX, "matrix": _MATRIX}


class H(BaseHTTPRequestHandler):
    """HTTP handler: serves index.html, /api/data (GET) and /api/reeval (POST)."""

    def _send(self, code, body, ctype):
        """Send a complete response; *body* may be ``str`` (UTF-8 encoded) or bytes."""
        b = body.encode("utf-8") if isinstance(body, str) else body
        self.send_response(code)
        self.send_header("Content-Type", ctype + "; charset=utf-8")
        self.send_header("Content-Length", str(len(b)))
        self.end_headers()
        self.wfile.write(b)

    def _send_json(self, code, obj):
        """Send *obj* as a JSON response without ASCII-escaping."""
        self._send(code, json.dumps(obj, ensure_ascii=False), "application/json")

    def do_GET(self):
        """Serve the page at / and /index.html, and the scan result at /api/data*."""
        if self.path in ("/", "/index.html"):
            try:
                page = (HERE / "index.html").read_text(encoding="utf-8")
            except Exception as e:
                self._send(500, f"Error reading index.html: {e}", "text/plain")
            else:
                self._send(200, page, "text/html")
        elif self.path.startswith("/api/data"):
            try:
                body = json.dumps(scan_runs(), ensure_ascii=False)
            except Exception as e:
                # Answer with a 500 instead of dropping the connection.
                self._send_json(500, {"error": f"scan failed: {e}"})
            else:
                self._send(200, body, "application/json")
        else:
            self._send(404, "not found", "text/plain")

    def do_POST(self):
        """Handle POST /api/reeval: start a background re-evaluation of one q.

        Launches batch_3forms.py for the requested q and returns immediately
        without waiting for the result. Re-evaluation is an LLM call taking
        tens of seconds to minutes, so the browser starts it via fetch and
        asks the user to refresh later.

        NOTE(review): the server binds to 0.0.0.0 and this endpoint is
        unauthenticated, so any host that can reach the port can trigger
        re-evaluations -- confirm that is acceptable for the deployment.
        """
        if self.path != "/api/reeval":
            self._send(404, json.dumps({"error": "not found"}), "application/json")
            return
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            self._send(400, json.dumps({"error": "bad Content-Length"}), "application/json")
            return
        try:
            raw = self.rfile.read(length).decode("utf-8") if length > 0 else "{}"
            payload = json.loads(raw)
        except Exception as e:
            self._send(400, json.dumps({"error": f"bad json: {e}"}), "application/json")
            return

        q_raw = payload.get("q") if isinstance(payload, dict) else None
        q = q_raw.strip() if isinstance(q_raw, str) else ""
        # Only the qNN form is accepted, which rules out path injection.
        if not re.match(r"^q\d+$", q):
            shown = q if isinstance(q_raw, str) else q_raw
            self._send_json(400, {"error": f"bad q (expect 'qNN'): {shown!r}"})
            return
        q_dir = HERE / "runs" / q
        if not q_dir.is_dir():
            self._send_json(404, {"error": f"runs/{q} not found"})
            return

        # Run batch_3forms.py in the background with stdout/stderr merged into
        # q_dir/_reeval.log (tail it to watch progress).
        log_path = q_dir / "_reeval.log"
        cmd = [sys.executable, "-u", str(HERE / "batch_3forms.py"),
               "--reeval", "--reeval-q", q, "--output-dir", str(HERE / "runs")]
        flags = subprocess.CREATE_NEW_PROCESS_GROUP if sys.platform == "win32" else 0
        try:
            # The child inherits its own copy of the log handle, so ours is
            # closed as soon as the process has been spawned.
            with open(log_path, "w", encoding="utf-8", buffering=1) as log_fh:
                proc = subprocess.Popen(cmd, stdout=log_fh, stderr=subprocess.STDOUT,
                                        cwd=str(HERE), creationflags=flags)
        except Exception as e:
            self._send_json(500, {"error": f"failed to start: {e}"})
            return
        self._send_json(200, {"status": "started", "pid": proc.pid, "q": q,
                              "log": str(log_path.relative_to(HERE))})

    def log_message(self, *a):
        """Silence the default per-request logging."""
        pass


if __name__ == "__main__":
    if len(sys.argv) > 1 and not re.fullmatch(r"\s*[+-]?\d+\s*", sys.argv[1]):
        print(f"ignoring non-integer port {sys.argv[1]!r}, using {PORT}", file=sys.stderr)
    n = len(scan_runs()["queries"])
    print(f"搜索评估查看 server:http://0.0.0.0:{PORT} (runs/ 下 {n} 个 query,实时扫描)")
    ThreadingHTTPServer(("0.0.0.0", PORT), H).serve_forever()