# -*- coding: utf-8 -*-
"""mode_workflow server · pages + API + extraction task management
================================================================================
Single service (default port 8772):
  - GET  /                      index.html (single-file frontend, never cached)
  - GET  /search.html           knowledge-search page embedded in the cluster-library
                                tab; served as-is, its API calls use the relative
                                path /api/v1/knowledge (reverse-proxied, see below)
  - GET  /api/dashboard         all aggregated Dashboard metrics (incl. content-tree coverage)
  - GET  /api/queries|posts|process|tools(+_versions)   dataset rows
  - GET  /api/extract           one-click post view: version list + extraction detail
                                (single DB connection, with ETag/304)
  - GET  /api/img?u=<url>       same-origin image proxy (bypasses hotlink protection)
  - GET  /icons/<name>.svg      static platform logos
  - GET|POST /api/v1/knowledge* same-origin reverse proxy to KNOWLEDGE_API_BASE
  - POST /api/run_search|extract_process|extract_tools  spawn a pipeline subprocess
  - GET  /api/task_status       poll a task's status (reads the tail of its log)

Usage: python server.py [port]
"""
import hashlib
import ipaddress
import json
import os
import socket
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.request
from collections import Counter
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse

try:
    sys.stdout.reconfigure(encoding="utf-8")
except Exception:
    # Best effort only: stdout may have been replaced by an object without reconfigure().
    pass

HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))
import db  # noqa: E402  project-local module, importable only after the sys.path tweak

PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772
MATRIX_FILE = HERE / "reference" / "judged_matrix.json"
LOG_DIR = HERE / "runs" / "logs"

# Placeholder values of a process step's `via` field meaning "no concrete tool";
# they are not counted in the "tools mentioned by steps" TOP list.
_VIA_PLACEHOLDERS = {"-", "—", "--", "/", "无", "n/a", "none"}

# Knowledge-search backend base URL, read from KNOWLEDGE_API_BASE in .env
# (db.py has already called load_dotenv).
# It must NOT be injected into search.html for the browser to call directly: the
# backend is plain http:// while the page is served as https:// through the
# Cloudflare tunnel, so the browser would block the call as Mixed Content.
# search.html therefore keeps the relative path '/api/v1/knowledge', which this
# server reverse-proxies same-origin to the backend: no mixed content, no CORS.
KNOWLEDGE_API_BASE = os.getenv("KNOWLEDGE_API_BASE", "").rstrip("/")


def _render_search_html():
    """Return search.html bytes unchanged.

    The page's API path stays relative and is served by this server's
    /api/v1/knowledge reverse proxy.
    """
    return (HERE / "search.html").read_bytes()


# ── Task management: task_id → {proc, log, status} ──────────────────────────
TASKS = {}
_TASK_LOCK = threading.Lock()

# Pipeline children write stdout/stderr into a log file that /api/task_status tails.
# Force UTF-8 so Chinese/emoji output does not depend on the OS locale (the log is
# read back as UTF-8). Force unbuffered output, because a Python child whose stdout
# is a file block-buffers and the polled tail would otherwise stay empty for a long time.
_CHILD_ENV_OVERRIDES = {"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"}


def _spawn_task(kind, cmd):
    """Start *cmd* as a background subprocess and register it in TASKS.

    stdout and stderr go to runs/logs/<task_id>.log. A daemon thread waits for
    the process, marks the task "done" (exit code 0) or "failed", then
    invalidates the Dashboard cache because the run may have changed the tables.

    Returns the new task_id: "<kind>_<MMDDHHMMSS><microseconds>".
    Raises whatever subprocess.Popen raises (the log handle is closed first).
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
    log_path = LOG_DIR / f"{task_id}.log"
    env = dict(os.environ)
    env.update(_CHILD_ENV_OVERRIDES)
    log_file = open(log_path, "w", encoding="utf-8")
    try:
        proc = subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT,
                                cwd=str(HERE), text=True, env=env)
    except Exception:
        # Popen failed (missing interpreter/script, ...): don't leak the log handle.
        log_file.close()
        raise
    with _TASK_LOCK:
        TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running"}

    def _wait():
        rc = proc.wait()
        log_file.close()
        with _TASK_LOCK:
            TASKS[task_id]["status"] = "done" if rc == 0 else "failed"
        # A finished search/extraction may have changed the four tables:
        # drop the cached Dashboard so the next request recomputes it.
        _invalidate_dashboard()

    threading.Thread(target=_wait, daemon=True).start()
    return task_id


def _task_status(task_id):
    """Return {"status", "log_tail"} for *task_id*, or None when it is unknown.

    log_tail is the last 3000 characters of the task log ("" if it cannot be read).
    """
    with _TASK_LOCK:
        task = TASKS.get(task_id)
    if not task:
        return None
    tail = ""
    try:
        tail = task["log"].read_text(encoding="utf-8", errors="replace")[-3000:]
    except OSError:
        pass  # log missing/unreadable: still report the status
    # Read the status after the log so a "done" status comes with a complete tail.
    with _TASK_LOCK:
        status = task["status"]
    return {"status": status, "log_tail": tail}


# Query ids handed out by this process whose search subprocess may not have
# written them to the DB yet; guarded by _QID_LOCK.
_ISSUED_QIDS = set()
_QID_LOCK = threading.Lock()


def _next_query_id():
    """Return the next free query id ("q" + at least 4 digits).

    Both search tables (process / tools) share one numbering so ids never collide
    across directions. Ids already issued by this process are included too: the
    search subprocess stores its query only later, so two quick /api/run_search
    calls would otherwise receive the same id.
    """
    with _QID_LOCK:
        known = [q["query_id"] for m in ("process", "tools") for q in db.fetch_queries(m)]
        known.extend(_ISSUED_QIDS)
        nums = [int(q[1:]) for q in known
                if isinstance(q, str) and q.startswith("q") and q[1:].isdigit()]
        qid = f"q{(max(nums) + 1 if nums else 0):04d}"
        _ISSUED_QIDS.add(qid)
        return qid


def _as_id_list(value):
    """Normalize request-body case_ids into a list of non-empty strings.

    Accepts a list/tuple (items are str()-ed, None dropped) or a comma-separated
    string. Anything else (None, numbers, objects) yields [].
    """
    if isinstance(value, str):
        items = value.split(",")
    elif isinstance(value, (list, tuple)):
        items = [str(v) for v in value if v is not None]
    else:
        return []
    return [s.strip() for s in items if s.strip()]


# ── Dashboard aggregation ────────────────────────────────────────────────────
def _split_values(v):
    """Flatten a substance/form field into a list of non-empty strings.

    A list is used item by item. A string is split on "、", "," and "/".
    None, empty and non-string items are dropped.
    """
    out = []
    items = v if isinstance(v, list) else [v]
    for it in items:
        if not it or not isinstance(it, str):
            continue
        for piece in it.replace(",", "、").replace("/", "、").split("、"):
            piece = piece.strip()
            if piece:
                out.append(piece)
    return out


# ── Dashboard result cache ───────────────────────────────────────────────────
# Building the Dashboard pulls and aggregates four tables (~2 s against the remote
# RDS), but the data only changes when a search/extraction task finishes. The
# result is therefore cached: a hit returns in <1 ms, task completion invalidates
# it (see _spawn_task), and a fallback TTL covers direct edits to the database.
# "gen" is bumped on every invalidation so that a computation that started before
# an invalidation cannot store its possibly stale result afterwards.
_DASH_CACHE = {"data": None, "ts": 0.0, "gen": 0}
_DASH_LOCK = threading.Lock()
_DASH_TTL = 60.0  # seconds


def _invalidate_dashboard():
    """Drop the cached Dashboard so the next request recomputes it."""
    with _DASH_LOCK:
        _DASH_CACHE["data"] = None
        _DASH_CACHE["gen"] += 1


def _dashboard_cached():
    """Return the Dashboard payload, recomputing it when missing or older than _DASH_TTL."""
    with _DASH_LOCK:
        data = _DASH_CACHE["data"]
        if data is not None and time.monotonic() - _DASH_CACHE["ts"] < _DASH_TTL:
            return data
        gen = _DASH_CACHE["gen"]
    # Compute outside the lock so other requests are not blocked
    # (an occasional concurrent recompute is acceptable).
    data = _dashboard()
    with _DASH_LOCK:
        if _DASH_CACHE["gen"] == gen:  # no invalidation happened meanwhile
            _DASH_CACHE["data"] = data
            _DASH_CACHE["ts"] = time.monotonic()
    return data


def _latest_rows(rows):
    """Keep only the rows of each case_id's newest version.

    Versions are compared as strings, with None treated as "".
    """
    best = {}
    for r in rows:
        cid = r["case_id"]
        if cid not in best or (r["version"] or "") > (best[cid] or ""):
            best[cid] = r["version"]
    return [r for r in rows if r["version"] == best[r["case_id"]]]


def _load_matrix():
    """Load judged_matrix.json.

    Returns (matrix json, action name → index, type name → index, valid cells).
    A cell (action_idx, type_idx) is valid when it is a dict with tier >= 1.
    """
    jm = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
    a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])}
    t_idx = {t["name"]: i for i, t in enumerate(jm["types"])}
    valid = {
        (ai, ti)
        for ai, row in enumerate(jm["matrix"])
        for ti, cell in enumerate(row)
        if isinstance(cell, dict) and (cell.get("tier") or 0) >= 1
    }
    return jm, a_idx, t_idx, valid


def _cost_groups(rows):
    """Return one (cost_usd, duration_s, created_at) tuple per (case_id, version).

    Every row of one run repeats the same cost/duration, so only the first row
    with a non-null cost counts. Numbers are coerced to float so that DB Decimal
    values and float defaults can be summed together.
    """
    groups = {}
    for r in rows:
        key = (r["case_id"], r["version"])
        if key not in groups and r["cost_usd"] is not None:
            groups[key] = (float(r["cost_usd"]), float(r["duration_s"] or 0.0), r["created_at"])
    return list(groups.values())


def _dashboard():
    """Aggregate every Dashboard metric from the four tables.

    Coverage, TOP lists and the tool count use only each case's newest version.
    Run count, cost and duration accumulate over all versions.
    Returns {"result": {...}, "process_data": {...}} (the /api/dashboard payload).
    """
    posts, procs, tools = db.fetch_dashboard_rows()
    latest_procs = _latest_rows(procs)
    latest_tools = _latest_rows(tools)

    # Content-tree coverage: (action leaf × input/output type) pairs used by the
    # steps, intersected with the valid matrix cells (tier >= 1).
    jm, a_idx, t_idx, valid = _load_matrix()
    covered = set()
    via_counter = Counter()
    substance_counter = Counter()
    form_counter = Counter()
    for r in latest_procs:
        for s in r["steps"] or []:
            if not isinstance(s, dict):
                continue
            leaf = (s.get("action") or "").split("/")[-1].strip()
            if leaf in a_idx:
                ai = a_idx[leaf]
                for io in ("inputs", "outputs"):
                    for x in s.get(io) or []:
                        if isinstance(x, dict) and x.get("type"):
                            ti = t_idx.get(str(x["type"]).strip())
                            if ti is not None and (ai, ti) in valid:
                                covered.add((ai, ti))
            via = s.get("via")
            via = via.strip() if isinstance(via, str) else ""
            # "-" / "—" / "无" etc. mean "no concrete tool": keep them off the TOP list.
            if via and via.lower() not in _VIA_PLACEHOLDERS:
                via_counter[via] += 1
            for v in _split_values(s.get("substance")):
                substance_counter[v] += 1
            for v in _split_values(s.get("form")):
                form_counter[v] += 1
    for r in latest_tools:
        for v in _split_values(r["substance_scope"]):
            substance_counter[v] += 1
        for v in _split_values(r["form_scope"]):
            form_counter[v] += 1

    # Cost/duration: each (case_id, version) counted once, across all versions.
    runs = _cost_groups(procs) + _cost_groups(tools)
    total_cost = round(sum(c for c, _, _ in runs), 4)
    total_dur = round(sum(d for _, d, _ in runs), 1)

    # Daily cost trend, keyed by the first 10 characters of created_at
    # (str() so both ISO strings and datetime objects work).
    daily = Counter()
    for c, _, ts in runs:
        if ts:
            daily[str(ts)[:10]] += c
    cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())]

    # Progress: numerator and denominator both use "adopted".
    # - Denominator: the distinct adopted cases in that direction's search table,
    #   i.e. the cases that need extraction.
    # - Numerator: those adopted cases that have already been extracted. The
    #   intersection guarantees done <= total, so progress never overshoots.
    # The direction comes from p["mode"] (process=search_process, tools=search_tools).
    proc_targets = {p["case_id"] for p in posts if p["mode"] == "process" and p["adopted"]}
    tool_targets = {p["case_id"] for p in posts if p["mode"] == "tools" and p["adopted"]}
    proc_extracted = {r["case_id"] for r in procs}
    tool_extracted = {r["case_id"] for r in tools}
    proc_done = proc_extracted & proc_targets
    tool_done = tool_extracted & tool_targets

    # Per-platform breakdown / extracted total: distinct cases actually extracted
    # (adopted or not); the platform belongs to the case itself.
    extracted_all = proc_extracted | tool_extracted
    case_plat = {p["case_id"]: (p["platform"] or "other") for p in posts}
    collected_by_plat = Counter((p["platform"] or "other") for p in posts)
    extracted_by_plat = Counter(case_plat.get(c, "other") for c in extracted_all)

    return {
        "result": {
            "collected_by_platform": collected_by_plat.most_common(),
            "extracted_by_platform": extracted_by_plat.most_common(),
            "matrix_covered": len(covered),
            "matrix_valid": len(valid),
            "matrix_cells": sorted([ai, ti] for ai, ti in covered),
            "matrix_actions": [a["name"] for a in jm["actions"]],
            "matrix_types": [t["name"] for t in jm["types"]],
            "substance_count": len(substance_counter),
            "substance_top": substance_counter.most_common(15),
            "form_count": len(form_counter),
            "form_top": form_counter.most_common(15),
            "post_count": len(posts),
            "extracted_post_count": len(extracted_all),
            "tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}),
            "via_top10": via_counter.most_common(10),
        },
        "process_data": {
            "run_count": len(runs),
            "avg_cost": round(total_cost / len(runs), 4) if runs else 0,
            "total_cost": total_cost,
            "avg_duration": round(total_dur / len(runs), 1) if runs else 0,
            "total_duration": total_dur,
            "cost_trend": cost_trend,
            "process_progress": {"done": len(proc_done), "total": len(proc_targets)},
            "tools_progress": {"done": len(tool_done), "total": len(tool_targets)},
        },
    }


# ── HTTP helpers ─────────────────────────────────────────────────────────────
_IMG_MAX_BYTES = 20 * 1024 * 1024  # upper bound on a proxied image body


def _is_forbidden_host(host):
    """Return True when *host* must not be fetched by the image proxy (SSRF guard).

    The following are blocked:
    - empty hosts;
    - localhost, *.localhost and *.internal names;
    - any host whose DNS resolution yields a non-global or multicast address
      (loopback, private, link-local, CGNAT, reserved, unspecified, ...),
      including IPv4-mapped IPv6 addresses.

    Unresolvable hosts are let through; the fetch itself then fails.
    NOTE: urllib resolves the name again, so DNS rebinding is not covered.
    """
    host = (host or "").strip().lower().rstrip(".")
    if not host:
        return True
    if host == "localhost" or host.endswith((".localhost", ".internal")):
        return True
    try:
        infos = socket.getaddrinfo(host, None)
    except (OSError, UnicodeError, ValueError):
        return False
    for info in infos:
        addr = str(info[4][0]).split("%", 1)[0]  # drop an IPv6 zone id ("fe80::1%eth0")
        try:
            ip = ipaddress.ip_address(addr)
        except ValueError:
            return True
        if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
            ip = ip.ipv4_mapped
        if not ip.is_global or ip.is_multicast:
            return True
    return False


class _SafeRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that refuses to follow redirects to forbidden hosts."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        if _is_forbidden_host(urlparse(newurl).hostname):
            raise urllib.error.HTTPError(newurl, 403, "redirect to forbidden host", headers, fp)
        return super().redirect_request(req, fp, code, msg, headers, newurl)


# Opener used by the image proxy: same as urlopen's default, but redirects are re-checked.
_IMG_OPENER = urllib.request.build_opener(_SafeRedirectHandler)


def _etag_matches(if_none_match, etag):
    """Weakly compare *etag* against an If-None-Match header value.

    If-None-Match uses weak comparison (RFC 9110):
    - a "W/" prefix is ignored;
    - the header may list several comma-separated tags;
    - "*" matches anything.

    This matters behind proxies that may weaken the ETags of responses they re-encode.
    """
    if not if_none_match:
        return False
    if if_none_match.strip() == "*":
        return True

    def opaque(tag):
        tag = tag.strip()
        return tag[2:] if tag.startswith("W/") else tag

    wanted = opaque(etag)
    return any(opaque(t) == wanted for t in if_none_match.split(","))


# ── HTTP handler ─────────────────────────────────────────────────────────────
class Handler(BaseHTTPRequestHandler):
    def _json(self, data, code=200):
        """Send *data* as a UTF-8 JSON response (non-JSON values are str()-ed)."""
        body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _err(self, msg, code=400):
        """Send {"error": msg} with HTTP status *code*."""
        self._json({"error": msg}, code)

    def _send_bytes(self, body, content_type, cache_control=None):
        """Send a 200 response carrying *body* with the given Content-Type / Cache-Control."""
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        if cache_control:
            self.send_header("Cache-Control", cache_control)
        self.end_headers()
        self.wfile.write(body)

    def _json_etag(self, data):
        """Send a JSON response with an ETag (md5 of the body).

        An extraction result does not change for a given (case_id, version), so
        when the browser reopens the same post it sends If-None-Match and gets a
        bodiless 304. Cache-Control: no-cache makes the browser revalidate every
        time (~50 ms with the connection pool), so stale data is never shown: a
        re-extraction changes the body and therefore the ETag.
        """
        body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        etag = '"' + hashlib.md5(body).hexdigest() + '"'
        if _etag_matches(self.headers.get("If-None-Match"), etag):
            self.send_response(304)
            self.send_header("ETag", etag)
            self.send_header("Cache-Control", "no-cache")
            self.end_headers()
            return
        self.send_response(200)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("ETag", etag)
        self.send_header("Cache-Control", "no-cache")
        self.end_headers()
        self.wfile.write(body)

    def _proxy_image(self, url):
        """Same-origin image proxy that bypasses hotlink protection (e.g. WeChat mmbiz.qpic.cn).

        Browser-side referrerpolicy=no-referrer is occasionally still blocked, so the
        server fetches the image itself, without a Referer and with a common browser
        User-Agent. It relays the bytes with a one-day public cache lifetime.

        Guards, since *url* is untrusted:
        - only http(s) URLs are accepted;
        - the host and every redirect target must resolve to global addresses only;
        - only image/* or application/octet-stream upstream types are relayed (an
          HTML page served from this origin would be an XSS vector);
        - the body is capped at _IMG_MAX_BYTES.
        """
        if not url or not url.startswith(("http://", "https://")):
            return self._err("非法图片地址", 400)
        try:
            host = urlparse(url).hostname
        except ValueError:  # e.g. malformed IPv6 literal
            return self._err("非法图片地址", 400)
        if _is_forbidden_host(host):
            return self._err("禁止的图片地址", 403)
        req = urllib.request.Request(url, headers={
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/120.0 Safari/537.36",
            "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        })
        try:
            with _IMG_OPENER.open(req, timeout=30) as resp:
                payload = resp.read(_IMG_MAX_BYTES + 1)
                ct = resp.headers.get("Content-Type", "image/jpeg")
        except urllib.error.HTTPError as e:
            return self._err(f"上游图片返回 {e.code}", e.code if 400 <= e.code < 600 else 502)
        except Exception as e:
            return self._err(f"图片不可达:{type(e).__name__}: {e}", 502)
        if len(payload) > _IMG_MAX_BYTES:
            return self._err("图片过大", 502)
        mime = ct.split(";", 1)[0].strip().lower()
        if not (mime.startswith("image/") or mime == "application/octet-stream"):
            return self._err(f"上游返回的不是图片:{mime or 'unknown'}", 502)
        self.send_response(200)
        self.send_header("Content-Type", ct)
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Cache-Control", "public, max-age=86400")
        # Defense in depth against script-capable content (e.g. SVG) opened directly.
        self.send_header("X-Content-Type-Options", "nosniff")
        self.send_header("Content-Security-Policy", "default-src 'none'; sandbox")
        self.end_headers()
        self.wfile.write(payload)

    def _proxy_knowledge(self, body=None):
        """Reverse-proxy /api/v1/knowledge* same-origin to KNOWLEDGE_API_BASE (plain-http backend).

        The browser only talks to this server (https via the tunnel), which avoids
        both Mixed Content and CORS. The method, path + query string, Content-Type
        and body are forwarded. The upstream status, Content-Type and body are
        relayed back, including upstream error responses.
        """
        if not KNOWLEDGE_API_BASE:
            return self._err("KNOWLEDGE_API_BASE 未配置", 502)
        target = KNOWLEDGE_API_BASE + self.path  # self.path includes the query string
        headers = {}
        ct = self.headers.get("Content-Type")
        if ct:
            headers["Content-Type"] = ct
        req = urllib.request.Request(target, data=body, headers=headers, method=self.command)
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                payload = resp.read()
                code = resp.status
                rct = resp.headers.get("Content-Type", "application/json; charset=utf-8")
        except urllib.error.HTTPError as e:
            payload = e.read()
            code = e.code
            rct = e.headers.get("Content-Type", "application/json; charset=utf-8")
        except Exception as e:
            return self._err(f"知识检索后端不可达:{type(e).__name__}: {e}", 502)
        self.send_response(code)
        self.send_header("Content-Type", rct)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        u = urlparse(self.path)
        qs = {k: v[0] for k, v in parse_qs(u.query).items()}
        try:
            if u.path == "/" or u.path == "/index.html":
                # Single-file frontend that changes often: forbid caching so the
                # browser never keeps an old index.html.
                self._send_bytes((HERE / "index.html").read_bytes(),
                                 "text/html; charset=utf-8",
                                 "no-cache, no-store, must-revalidate")
            elif u.path == "/search.html":
                # Knowledge-search page embedded in the cluster-library tab; served
                # as-is (its API calls go through the /api/v1/knowledge proxy).
                self._send_bytes(_render_search_html(),
                                 "text/html; charset=utf-8",
                                 "no-cache, no-store, must-revalidate")
            elif u.path == "/api/dashboard":
                self._json_etag(_dashboard_cached())
            elif u.path == "/api/queries":
                self._json(db.fetch_queries(qs.get("mode", "process")))
            elif u.path == "/api/posts":
                self._json(db.fetch_posts(qs.get("query_id", ""), qs.get("mode", "process")))
            elif u.path == "/api/extract":
                # One click, one round trip: version list + extraction detail
                # fetched over a single connection.
                self._json_etag(db.fetch_extract(
                    qs.get("mode", "process"), qs.get("case_id", ""), qs.get("version")))
            elif u.path == "/api/process_versions":
                self._json(db.fetch_process_versions(qs.get("case_id", "")))
            elif u.path == "/api/process":
                r = db.fetch_process(qs.get("case_id", ""), qs.get("version"))
                self._json(r) if r else self._err("无解构记录", 404)
            elif u.path == "/api/tools_versions":
                self._json(db.fetch_tools_versions(qs.get("case_id", "")))
            elif u.path == "/api/tools":
                r = db.fetch_tools(qs.get("case_id", ""), qs.get("version"))
                self._json(r) if r else self._err("无解构记录", 404)
            elif u.path == "/api/task_status":
                r = _task_status(qs.get("task_id", ""))
                self._json(r) if r else self._err("未知 task_id", 404)
            elif u.path == "/api/img":
                self._proxy_image(qs.get("u", ""))
            elif u.path.startswith("/icons/") and u.path.endswith(".svg"):
                # Static platform-logo SVGs; only the basename is used, which
                # prevents path traversal.
                name = Path(u.path).name
                f = HERE / "icons" / name
                if not f.is_file():
                    return self._err("not found", 404)
                self._send_bytes(f.read_bytes(), "image/svg+xml; charset=utf-8",
                                 "public, max-age=86400")
            elif u.path.startswith("/api/v1/knowledge"):
                self._proxy_knowledge()
            else:
                self._err("not found", 404)
        except Exception as e:
            self._err(f"{type(e).__name__}: {e}", 500)

    def do_POST(self):
        u = urlparse(self.path)
        try:
            n = int(self.headers.get("Content-Length") or 0)
            if n < 0:
                # rfile.read(-1) would block until the client closes the connection.
                raise ValueError("negative Content-Length")
            raw = self.rfile.read(n)
        except Exception:
            return self._err("读取请求体失败")
        # Knowledge-search API: relay as-is to the backend; no JSON parsing here.
        if u.path.startswith("/api/v1/knowledge"):
            return self._proxy_knowledge(body=raw)
        try:
            payload = json.loads(raw or b"{}")
        except Exception:
            return self._err("body 必须是 JSON")
        if not isinstance(payload, dict):
            return self._err("body 必须是 JSON 对象")
        try:
            if u.path in ("/api/extract_process", "/api/extract_tools"):
                qid = payload.get("query_id")
                cids = _as_id_list(payload.get("case_ids"))
                if not qid or not cids:
                    return self._err("缺 query_id / case_ids")
                is_process = u.path.endswith("process")
                script = ("pipeline/procedure_extract.py" if is_process
                          else "pipeline/tool_extract.py")
                cmd = [sys.executable, script, "--query-id", str(qid),
                       "--case-ids", ",".join(cids)]
                if payload.get("model"):
                    cmd += ["--model", str(payload["model"])]
                if payload.get("force"):
                    # By default cases are de-duplicated globally; force re-extracts.
                    cmd += ["--force"]
                kind = "proc" if is_process else "tool"
                self._json({"task_id": _spawn_task(kind, cmd)})
            elif u.path == "/api/run_search":
                query = str(payload.get("query") or "").strip()
                if not query:
                    return self._err("缺 query")
                qid = payload.get("query_id") or _next_query_id()
                cmd = [sys.executable, "pipeline/search_eval.py",
                       "--query-id", str(qid), "--query", query]
                if payload.get("synonyms"):
                    cmd += ["--synonyms", payload["synonyms"]]
                if payload.get("mode_type") in ("工序", "工具"):
                    cmd += ["--mode-type", payload["mode_type"]]
                if payload.get("platforms"):
                    cmd += ["--platforms", payload["platforms"]]
                if payload.get("max_count"):
                    cmd += ["--max-count", str(payload["max_count"])]
                self._json({"task_id": _spawn_task("search", cmd), "query_id": qid})
            else:
                self._err("not found", 404)
        except Exception as e:
            self._err(f"{type(e).__name__}: {e}", 500)

    def log_message(self, fmt, *a):
        pass  # silence per-request access logging


if __name__ == "__main__":
    print(f"🚀 mode_workflow server → http://0.0.0.0:{PORT}")
    # Warm up the connection pool: pay the RDS handshake at startup instead of on
    # the first request. A failure here does not stop the server.
    try:
        db._conn().close()
        print("✅ DB 连接池已预热")
    except Exception as e:
        print(f"⚠ 连接池预热失败(忽略,首请求会重试):{type(e).__name__}: {e}")
    server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()