# -*- coding: utf-8 -*- """mode_workflow server · 页面 + API + 解构任务管理 ================================================================================ 单服务(默认 8772): - GET / index.html - GET /search.html 知识检索页(聚类库 tab 内嵌;API 域名由 .env 注入) - GET /api/dashboard Dashboard 全部聚合指标(含内容树覆盖) - GET /api/queries|posts|process|tools(+_versions) Dataset 数据 - POST /api/run_search|extract_process|extract_tools 起子进程跑 pipeline - GET /api/task_status 轮询任务状态(读日志尾部) 用法:python server.py [port] """ import json import os import subprocess import sys import threading from collections import Counter from datetime import datetime from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from urllib.parse import urlparse, parse_qs import urllib.request import urllib.error try: sys.stdout.reconfigure(encoding="utf-8") except Exception: pass HERE = Path(__file__).resolve().parent sys.path.insert(0, str(HERE)) import db PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772 MATRIX_FILE = HERE / "reference" / "judged_matrix.json" LOG_DIR = HERE / "runs" / "logs" # 知识检索后端地址:从 .env 的 KNOWLEDGE_API_BASE 读取(db.py 已 load_dotenv)。 # 注意:不能把它注入到 search.html 让浏览器直连——后端是明文 http://,而页面 # 经 Cloudflare 隧道是 https://,浏览器会以「混合内容(Mixed Content)」拦截请求。 # 因此 search.html 保持相对路径 '/api/v1/knowledge',由本服务同源反代到后端, # 这样既无混合内容、也无跨域(CORS)问题。 KNOWLEDGE_API_BASE = os.getenv("KNOWLEDGE_API_BASE", "").rstrip("/") def _render_search_html(): # 保持相对路径,接口走本服务的 /api/v1/knowledge 反向代理。 return (HERE / "search.html").read_bytes() # ── 任务管理:task_id → {proc, log, status} ────────────────────────────────── TASKS = {} _TASK_LOCK = threading.Lock() def _spawn_task(kind, cmd): LOG_DIR.mkdir(parents=True, exist_ok=True) task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}" log_path = LOG_DIR / f"{task_id}.log" f = open(log_path, "w", encoding="utf-8") proc = subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT, cwd=str(HERE), text=True) with _TASK_LOCK: TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running"} def _wait(): rc = proc.wait() f.close() with _TASK_LOCK: TASKS[task_id]["status"] = "done" if rc == 0 else "failed" threading.Thread(target=_wait, daemon=True).start() return task_id def _task_status(task_id): with _TASK_LOCK: t = TASKS.get(task_id) if not t: return None tail = "" try: text = t["log"].read_text(encoding="utf-8", errors="replace") tail = text[-3000:] except Exception: pass return {"status": t["status"], "log_tail": tail} def _next_query_id(): """两张搜索表统一编号,避免跨方向撞 ID。""" qs = [q["query_id"] for m in ("process", "tools") for q in db.fetch_queries(m)] nums = [int(q[1:]) for q in qs if q.startswith("q") and q[1:].isdigit()] return f"q{(max(nums) + 1 if nums else 0):04d}" # ── Dashboard 聚合 ──────────────────────────────────────────────────────────── def _split_values(v): """substance/form 字段:数组直接用;字符串按 、,/ 分割;None 丢弃。""" out = [] items = v if isinstance(v, list) else [v] for it in items: if not it or not isinstance(it, str): continue for piece in it.replace(",", "、").replace("/", "、").split("、"): piece = piece.strip() if piece: out.append(piece) return out def _dashboard(): posts, procs, tools = db.fetch_dashboard_rows() # 最新版本行集(覆盖度/Top10 用最新版,成本/耗时按全部版本累计) def latest(rows): best = {} for r in rows: cid = r["case_id"] if cid not in best or (r["version"] or "") > (best[cid] or ""): best[cid] = r["version"] return [r for r in rows if r["version"] == best[r["case_id"]]] latest_procs = latest(procs) latest_tools = latest(tools) # 内容树覆盖:steps 的 (action 叶子 × 输入/输出 type) ∩ 有效节点(tier≥1) jm = json.loads(MATRIX_FILE.read_text(encoding="utf-8")) a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])} t_idx = {t["name"]: i for i, t in enumerate(jm["types"])} valid = set() for ai, row in enumerate(jm["matrix"]): for ti, cell in enumerate(row): if isinstance(cell, dict) and cell.get("tier", 0) >= 1: valid.add((ai, ti)) covered = set() via_counter = Counter() substance_counter = Counter() form_counter = Counter() for r in latest_procs: for s in r["steps"]: if not isinstance(s, dict): continue leaf = (s.get("action") or "").split("/")[-1].strip() types = [] for io in ("inputs", "outputs"): for x in s.get(io) or []: if isinstance(x, dict) and x.get("type"): types.append(str(x["type"]).strip()) if leaf in a_idx: for tp in types: if tp in t_idx and (a_idx[leaf], t_idx[tp]) in valid: covered.add((a_idx[leaf], t_idx[tp])) via = (s.get("via") or "").strip() if via: via_counter[via] += 1 for v in _split_values(s.get("substance")): substance_counter[v] += 1 for v in _split_values(s.get("form")): form_counter[v] += 1 for r in latest_tools: for v in _split_values(r["substance_scope"]): substance_counter[v] += 1 for v in _split_values(r["form_scope"]): form_counter[v] += 1 # 成本/耗时:同一 (case_id, version) 只计一次(各行重复存同一次调用的值) def cost_groups(rows): g = {} for r in rows: key = (r["case_id"], r["version"]) if key not in g and r["cost_usd"] is not None: g[key] = (r["cost_usd"], r["duration_s"] or 0.0, r["created_at"]) return list(g.values()) runs = cost_groups(procs) + cost_groups(tools) total_cost = round(sum(c for c, _, _ in runs), 4) total_dur = round(sum(d for _, d, _ in runs), 1) # 按日成本趋势 daily = Counter() for c, _, ts in runs: if ts: daily[ts[:10]] += c cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())] # 进度:分子分母同口径,都走「采纳」。分母 = 该方向 search 表里采纳的帖(distinct case), # 即「需解构」;分子 = 采纳帖里已解构的(∩ 保证 ≤ 分母,杜绝越界/虚高)。 # 方向由 p["mode"] 区分(process=search_process,tools=search_tools),不再看 knowledge_type。 proc_targets = {p["case_id"] for p in posts if p["mode"] == "process" and p["adopted"]} tool_targets = {p["case_id"] for p in posts if p["mode"] == "tools" and p["adopted"]} proc_extracted = {r["case_id"] for r in procs} tool_extracted = {r["case_id"] for r in tools} proc_done = proc_extracted & proc_targets tool_done = tool_extracted & tool_targets # 渠道分项/解构总数:按实际解构过的 distinct case(不限采纳),平台由 case 内禀。 extracted_all = proc_extracted | tool_extracted case_plat = {p["case_id"]: (p["platform"] or "other") for p in posts} collected_by_plat = Counter((p["platform"] or "other") for p in posts) extracted_by_plat = Counter(case_plat.get(c, "other") for c in extracted_all) return { "result": { "collected_by_platform": collected_by_plat.most_common(), "extracted_by_platform": extracted_by_plat.most_common(), "matrix_covered": len(covered), "matrix_valid": len(valid), "matrix_cells": sorted([ai, ti] for ai, ti in covered), "matrix_actions": [a["name"] for a in jm["actions"]], "matrix_types": [t["name"] for t in jm["types"]], "substance_count": len(substance_counter), "substance_top": substance_counter.most_common(15), "form_count": len(form_counter), "form_top": form_counter.most_common(15), "post_count": len(posts), "extracted_post_count": len(extracted_all), "tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}), "via_top10": via_counter.most_common(10), }, "process_data": { "run_count": len(runs), "avg_cost": round(total_cost / len(runs), 4) if runs else 0, "total_cost": total_cost, "avg_duration": round(total_dur / len(runs), 1) if runs else 0, "total_duration": total_dur, "cost_trend": cost_trend, "process_progress": {"done": len(proc_done), "total": len(proc_targets)}, "tools_progress": {"done": len(tool_done), "total": len(tool_targets)}, }, } # ── HTTP handler ───────────────────────────────────────────────────────────── class Handler(BaseHTTPRequestHandler): def _json(self, data, code=200): body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8") self.send_response(code) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _err(self, msg, code=400): self._json({"error": msg}, code) def _proxy_knowledge(self, body=None): """把 /api/v1/knowledge* 同源反代到 KNOWLEDGE_API_BASE(明文后端)。 浏览器只跟本服务(经隧道走 https)通信,规避混合内容 + 跨域。""" if not KNOWLEDGE_API_BASE: return self._err("KNOWLEDGE_API_BASE 未配置", 502) target = KNOWLEDGE_API_BASE + self.path # self.path 含 query string headers = {} ct = self.headers.get("Content-Type") if ct: headers["Content-Type"] = ct req = urllib.request.Request(target, data=body, headers=headers, method=self.command) try: with urllib.request.urlopen(req, timeout=120) as resp: payload = resp.read() code = resp.status rct = resp.headers.get("Content-Type", "application/json; charset=utf-8") except urllib.error.HTTPError as e: payload = e.read() code = e.code rct = e.headers.get("Content-Type", "application/json; charset=utf-8") except Exception as e: return self._err(f"知识检索后端不可达:{type(e).__name__}: {e}", 502) self.send_response(code) self.send_header("Content-Type", rct) self.send_header("Content-Length", str(len(payload))) self.end_headers() self.wfile.write(payload) def do_GET(self): u = urlparse(self.path) qs = {k: v[0] for k, v in parse_qs(u.query).items()} try: if u.path == "/" or u.path == "/index.html": body = (HERE / "index.html").read_bytes() self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) # 单文件前端,改版频繁:禁缓存,避免浏览器拿到旧 index.html self.send_header("Cache-Control", "no-cache, no-store, must-revalidate") self.end_headers() self.wfile.write(body) elif u.path == "/search.html": # 聚类库 tab 内嵌的知识检索页;API 域名由 .env 注入 body = _render_search_html() self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.send_header("Cache-Control", "no-cache, no-store, must-revalidate") self.end_headers() self.wfile.write(body) elif u.path == "/api/dashboard": self._json(_dashboard()) elif u.path == "/api/queries": self._json(db.fetch_queries(qs.get("mode", "process"))) elif u.path == "/api/posts": self._json(db.fetch_posts(qs.get("query_id", ""), qs.get("mode", "process"))) elif u.path == "/api/process_versions": self._json(db.fetch_process_versions(qs.get("case_id", ""))) elif u.path == "/api/process": r = db.fetch_process(qs.get("case_id", ""), qs.get("version")) self._json(r) if r else self._err("无解构记录", 404) elif u.path == "/api/tools_versions": self._json(db.fetch_tools_versions(qs.get("case_id", ""))) elif u.path == "/api/tools": r = db.fetch_tools(qs.get("case_id", ""), qs.get("version")) self._json(r) if r else self._err("无解构记录", 404) elif u.path == "/api/task_status": r = _task_status(qs.get("task_id", "")) self._json(r) if r else self._err("未知 task_id", 404) elif u.path.startswith("/api/v1/knowledge"): self._proxy_knowledge() else: self._err("not found", 404) except Exception as e: self._err(f"{type(e).__name__}: {e}", 500) def do_POST(self): u = urlparse(self.path) try: n = int(self.headers.get("Content-Length") or 0) raw = self.rfile.read(n) except Exception: return self._err("读取请求体失败") # 知识检索接口:原样反代到后端,不在本服务做 JSON 解析 if u.path.startswith("/api/v1/knowledge"): return self._proxy_knowledge(body=raw) try: payload = json.loads(raw or b"{}") except Exception: return self._err("body 必须是 JSON") try: if u.path in ("/api/extract_process", "/api/extract_tools"): qid = payload.get("query_id") cids = payload.get("case_ids") or [] if not qid or not cids: return self._err("缺 query_id / case_ids") script = ("pipeline/procedure_extract.py" if u.path.endswith("process") else "pipeline/tool_extract.py") cmd = [sys.executable, script, "--query-id", qid, "--case-ids", ",".join(cids)] if payload.get("model"): cmd += ["--model", payload["model"]] kind = "proc" if u.path.endswith("process") else "tool" self._json({"task_id": _spawn_task(kind, cmd)}) elif u.path == "/api/run_search": query = (payload.get("query") or "").strip() if not query: return self._err("缺 query") qid = payload.get("query_id") or _next_query_id() cmd = [sys.executable, "pipeline/search_eval.py", "--query-id", qid, "--query", query] if payload.get("synonyms"): cmd += ["--synonyms", payload["synonyms"]] if payload.get("mode_type") in ("工序", "工具"): cmd += ["--mode-type", payload["mode_type"]] if payload.get("platforms"): cmd += ["--platforms", payload["platforms"]] if payload.get("max_count"): cmd += ["--max-count", str(payload["max_count"])] self._json({"task_id": _spawn_task("search", cmd), "query_id": qid}) else: self._err("not found", 404) except Exception as e: self._err(f"{type(e).__name__}: {e}", 500) def log_message(self, fmt, *a): pass # 静默访问日志 if __name__ == "__main__": print(f"🚀 mode_workflow server → http://0.0.0.0:{PORT}") ThreadingHTTPServer(("0.0.0.0", PORT), Handler).serve_forever()