| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- # -*- coding: utf-8 -*-
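"""mode_workflow server: pages + API + extraction task management
================================================================================
Single service (default port 8772):
  - GET  /                      index.html
  - GET  /search.html           knowledge search page (embedded in the cluster-library tab);
                                served unmodified -- its API calls use the relative path
                                /api/v1/knowledge, which this server reverse-proxies
  - GET  /api/dashboard         all aggregated dashboard metrics (incl. content-tree coverage)
  - GET  /api/queries|posts|process|tools(+_versions)    dataset data
  - POST /api/run_search|extract_process|extract_tools   spawn a subprocess that runs a pipeline
  - GET  /api/task_status       poll a task's status (reads the tail of its log)
  - GET|POST /api/v1/knowledge* same-origin reverse proxy to KNOWLEDGE_API_BASE (read from .env)
Usage: python server.py [port]
"""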
- import json
- import os
- import subprocess
- import sys
- import threading
- from collections import Counter
- from datetime import datetime
- from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
- from pathlib import Path
- from urllib.parse import urlparse, parse_qs
- import urllib.request
- import urllib.error
# Best-effort: switch stdout to UTF-8 (presumably so the non-ASCII startup
# banner prints on consoles with another default encoding). Any failure is
# deliberately ignored.
try:
    sys.stdout.reconfigure(encoding="utf-8")
except Exception:
    pass
# Directory containing this file; it is put first on sys.path before the
# project-local `db` module is imported.
HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))
import db
# Listening port: first command-line argument if present, otherwise 8772.
PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772
# JSON file with the action x type matrix used for the dashboard coverage metric.
MATRIX_FILE = HERE / "reference" / "judged_matrix.json"
# Directory that receives one log file per spawned pipeline task.
LOG_DIR = HERE / "runs" / "logs"
# Knowledge-search backend address, read from KNOWLEDGE_API_BASE (per the
# original note, db.py has already called load_dotenv, so .env is loaded).
# It must NOT be injected into search.html for the browser to call directly:
# the backend is plain http://, while the page is served over https:// through
# the Cloudflare tunnel, so the browser would block the request as Mixed Content.
# search.html therefore keeps the relative path '/api/v1/knowledge', and this
# service reverse-proxies it same-origin to the backend, which avoids both the
# mixed-content and the cross-origin (CORS) problem.
KNOWLEDGE_API_BASE = os.getenv("KNOWLEDGE_API_BASE", "").rstrip("/")
def _render_search_html():
    """Return the raw bytes of search.html, located next to this file.

    The page is served unmodified: it keeps its relative API path, so its
    requests go through this server's /api/v1/knowledge reverse proxy.
    """
    page = HERE / "search.html"
    with page.open("rb") as fh:
        return fh.read()
# ── Task management: task_id → {proc, log, status} ───────────────────────────
# In-memory registry of spawned pipeline subprocesses. Each value holds the
# Popen object ("proc"), the log file path ("log") and a status string
# ("running" / "done" / "failed"). Accesses are guarded by _TASK_LOCK.
TASKS = {}
_TASK_LOCK = threading.Lock()
def _spawn_task(kind, cmd):
    """Start *cmd* as a background subprocess and register it in TASKS.

    The child's stdout and stderr are both redirected to
    ``LOG_DIR/<task_id>.log`` and it runs with this file's directory as cwd.
    A daemon thread waits for the child and then flips the task's status to
    "done" (exit code 0) or "failed" (any other exit code).

    Args:
        kind: short prefix for the task id (e.g. "proc", "tool", "search").
        cmd: argument list passed to ``subprocess.Popen``.

    Returns:
        The generated task id, ``"<kind>_<MMDDhhmmssffffff>"``.

    Raises:
        OSError: if the log file cannot be created or the process cannot be
            started (propagated from ``open`` / ``Popen``).
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
    log_path = LOG_DIR / f"{task_id}.log"
    # The child receives its own copy of the descriptor, so the parent's
    # handle only needs to live for the duration of the Popen call. Scoping it
    # with `with` also closes it when Popen itself raises (previously leaked).
    with open(log_path, "w", encoding="utf-8") as log_file:
        proc = subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT,
                                cwd=str(HERE), text=True)
    with _TASK_LOCK:
        TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running"}

    def _wait():
        # Block until the child exits, then record the outcome.
        rc = proc.wait()
        with _TASK_LOCK:
            TASKS[task_id]["status"] = "done" if rc == 0 else "failed"

    threading.Thread(target=_wait, daemon=True).start()
    return task_id
def _task_status(task_id):
    """Return the status and log tail of a task, or None if it is unknown.

    The result is ``{"status": ..., "log_tail": ...}`` where ``log_tail`` is
    the last 3000 characters of the task's log file (decoded as UTF-8 with
    undecodable bytes replaced), or "" when the log cannot be read.
    """
    with _TASK_LOCK:
        entry = TASKS.get(task_id)
    if not entry:
        return None
    try:
        log_tail = entry["log"].read_text(encoding="utf-8", errors="replace")[-3000:]
    except Exception:
        # Best-effort: an unreadable log still yields a status answer.
        log_tail = ""
    return {"status": entry["status"], "log_tail": log_tail}
def _next_query_id():
    """Return the next free query id, numbered jointly across both search tables.

    Ids of the form "q<digits>" from the "process" and "tools" query lists are
    considered; the result is one above the largest number found, zero-padded
    to four digits ("q0000" when there is none). Sharing one sequence avoids
    id collisions between the two directions.
    """
    query_ids = []
    for mode in ("process", "tools"):
        query_ids.extend(row["query_id"] for row in db.fetch_queries(mode))
    used = [int(qid[1:]) for qid in query_ids
            if qid.startswith("q") and qid[1:].isdigit()]
    following = max(used) + 1 if used else 0
    return f"q{following:04d}"
- # ── Dashboard 聚合 ────────────────────────────────────────────────────────────
- def _split_values(v):
- """substance/form 字段:数组直接用;字符串按 、,/ 分割;None 丢弃。"""
- out = []
- items = v if isinstance(v, list) else [v]
- for it in items:
- if not it or not isinstance(it, str):
- continue
- for piece in it.replace(",", "、").replace("/", "、").split("、"):
- piece = piece.strip()
- if piece:
- out.append(piece)
- return out
def _latest_rows(rows):
    """Keep only the rows whose version is the greatest seen for their case_id.

    Versions are compared as given, with a falsy version treated as "".
    """
    newest = {}
    for row in rows:
        cid = row["case_id"]
        if cid not in newest or (row["version"] or "") > (newest[cid] or ""):
            newest[cid] = row["version"]
    return [row for row in rows if row["version"] == newest[row["case_id"]]]


def _valid_matrix_cells(jm):
    """Return the (action_index, type_index) pairs of matrix cells with tier >= 1."""
    return {
        (ai, ti)
        for ai, row in enumerate(jm["matrix"])
        for ti, cell in enumerate(row)
        # A missing or null tier counts as 0 (not a valid node).
        if isinstance(cell, dict) and (cell.get("tier") or 0) >= 1
    }


def _scan_process_steps(latest_procs, a_idx, t_idx, valid):
    """Walk every step of the given process rows and aggregate step statistics.

    Returns a tuple ``(covered, via_counter, substance_counter, form_counter)``:
    the valid matrix cells hit by (action leaf x input/output type), and the
    occurrence counts of each step's via / substance / form values.
    """
    covered = set()
    via_counter = Counter()
    substance_counter = Counter()
    form_counter = Counter()
    for r in latest_procs:
        # A row without steps (None) contributes nothing.
        for s in r["steps"] or []:
            if not isinstance(s, dict):
                continue
            # The action is a "/"-separated path; only its last segment is matched.
            leaf = (s.get("action") or "").split("/")[-1].strip()
            types = []
            for io in ("inputs", "outputs"):
                for x in s.get(io) or []:
                    if isinstance(x, dict) and x.get("type"):
                        types.append(str(x["type"]).strip())
            if leaf in a_idx:
                for tp in types:
                    if tp in t_idx and (a_idx[leaf], t_idx[tp]) in valid:
                        covered.add((a_idx[leaf], t_idx[tp]))
            via = (s.get("via") or "").strip()
            if via:
                via_counter[via] += 1
            for v in _split_values(s.get("substance")):
                substance_counter[v] += 1
            for v in _split_values(s.get("form")):
                form_counter[v] += 1
    return covered, via_counter, substance_counter, form_counter


def _cost_runs(rows):
    """Return one (cost_usd, duration_s, created_at) tuple per (case_id, version).

    Rows of the same (case_id, version) repeat the values of a single call, so
    only the first row with a non-null cost is counted; a missing duration
    counts as 0.0.
    """
    seen = {}
    for r in rows:
        key = (r["case_id"], r["version"])
        if key not in seen and r["cost_usd"] is not None:
            seen[key] = (r["cost_usd"], r["duration_s"] or 0.0, r["created_at"])
    return list(seen.values())


def _dashboard():
    """Build the complete dashboard payload.

    Returns:
        A dict with two sections: "result" (per-platform counts, matrix
        coverage, substance/form/via rankings, post and tool counts) and
        "process_data" (run count, cost and duration totals/averages, daily
        cost trend, extraction progress for both directions).
    """
    posts, procs, tools = db.fetch_dashboard_rows()

    # Coverage and rankings use the latest version of each case; cost and
    # duration are accumulated over all versions.
    latest_procs = _latest_rows(procs)
    latest_tools = _latest_rows(tools)

    # Content-tree coverage: (action leaf x input/output type) of the steps,
    # intersected with the valid nodes (tier >= 1).
    jm = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
    a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])}
    t_idx = {t["name"]: i for i, t in enumerate(jm["types"])}
    valid = _valid_matrix_cells(jm)

    covered, via_counter, substance_counter, form_counter = _scan_process_steps(
        latest_procs, a_idx, t_idx, valid)
    for r in latest_tools:
        for v in _split_values(r["substance_scope"]):
            substance_counter[v] += 1
        for v in _split_values(r["form_scope"]):
            form_counter[v] += 1

    # Cost / duration: each (case_id, version) is counted once.
    runs = _cost_runs(procs) + _cost_runs(tools)
    total_cost = round(sum(c for c, _, _ in runs), 4)
    total_dur = round(sum(d for _, d, _ in runs), 1)

    # Daily cost trend, keyed by the first 10 characters of created_at.
    daily = Counter()
    for c, _, ts in runs:
        if ts:
            daily[ts[:10]] += c
    cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())]

    # Progress: numerator and denominator use the same basis, "adopted".
    # Denominator = adopted posts of that direction (distinct case), i.e. the
    # posts that need extraction; numerator = adopted posts already extracted
    # (the intersection guarantees numerator <= denominator).
    # The direction comes from p["mode"], not from knowledge_type.
    proc_targets = {p["case_id"] for p in posts if p["mode"] == "process" and p["adopted"]}
    tool_targets = {p["case_id"] for p in posts if p["mode"] == "tools" and p["adopted"]}
    proc_extracted = {r["case_id"] for r in procs}
    tool_extracted = {r["case_id"] for r in tools}
    proc_done = proc_extracted & proc_targets
    tool_done = tool_extracted & tool_targets

    # Per-platform breakdown / extraction total: every distinct case that was
    # actually extracted, adopted or not; the platform belongs to the case.
    extracted_all = proc_extracted | tool_extracted
    case_plat = {p["case_id"]: (p["platform"] or "other") for p in posts}
    collected_by_plat = Counter((p["platform"] or "other") for p in posts)
    extracted_by_plat = Counter(case_plat.get(c, "other") for c in extracted_all)

    return {
        "result": {
            "collected_by_platform": collected_by_plat.most_common(),
            "extracted_by_platform": extracted_by_plat.most_common(),
            "matrix_covered": len(covered), "matrix_valid": len(valid),
            "matrix_cells": sorted([ai, ti] for ai, ti in covered),
            "matrix_actions": [a["name"] for a in jm["actions"]],
            "matrix_types": [t["name"] for t in jm["types"]],
            "substance_count": len(substance_counter),
            "substance_top": substance_counter.most_common(15),
            "form_count": len(form_counter),
            "form_top": form_counter.most_common(15),
            "post_count": len(posts),
            "extracted_post_count": len(extracted_all),
            "tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}),
            "via_top10": via_counter.most_common(10),
        },
        "process_data": {
            "run_count": len(runs),
            "avg_cost": round(total_cost / len(runs), 4) if runs else 0,
            "total_cost": total_cost,
            "avg_duration": round(total_dur / len(runs), 1) if runs else 0,
            "total_duration": total_dur,
            "cost_trend": cost_trend,
            "process_progress": {"done": len(proc_done), "total": len(proc_targets)},
            "tools_progress": {"done": len(tool_done), "total": len(tool_targets)},
        },
    }
- # ── HTTP handler ─────────────────────────────────────────────────────────────
class Handler(BaseHTTPRequestHandler):
    """Request handler for the HTML pages, the JSON API and the knowledge proxy."""

    def _json(self, data, code=200):
        """Send *data* as a UTF-8 JSON response with status *code*.

        Values that json cannot serialise natively are stringified via ``str``.
        """
        body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _err(self, msg, code=400):
        """Send ``{"error": msg}`` as JSON with status *code* (default 400)."""
        self._json({"error": msg}, code)

    def _html(self, body):
        """Send *body* (bytes) as a 200 text/html response with caching disabled."""
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        # Single-file front end that changes often: forbid caching so the
        # browser never keeps serving a stale page.
        self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
        self.end_headers()
        self.wfile.write(body)

    def _proxy_knowledge(self, body=None):
        """Reverse-proxy the current request to KNOWLEDGE_API_BASE.

        The browser only talks to this service (https through the tunnel), which
        avoids mixed-content and cross-origin problems with the plain-http
        backend. Only the Content-Type request header is forwarded; the
        backend's status code, Content-Type and body are relayed back, including
        for HTTP error statuses. Responds 502 when the backend is not configured
        or cannot be reached.

        Args:
            body: raw request body bytes to forward, or None for no body.
        """
        if not KNOWLEDGE_API_BASE:
            return self._err("KNOWLEDGE_API_BASE 未配置", 502)
        target = KNOWLEDGE_API_BASE + self.path  # self.path includes the query string
        headers = {}
        ct = self.headers.get("Content-Type")
        if ct:
            headers["Content-Type"] = ct
        req = urllib.request.Request(target, data=body, headers=headers,
                                     method=self.command)
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                payload = resp.read()
                code = resp.status
                rct = resp.headers.get("Content-Type", "application/json; charset=utf-8")
        except urllib.error.HTTPError as e:
            # The backend answered with an error status: relay it unchanged.
            payload = e.read()
            code = e.code
            rct = e.headers.get("Content-Type", "application/json; charset=utf-8")
        except Exception as e:
            return self._err(f"知识检索后端不可达:{type(e).__name__}: {e}", 502)
        self.send_response(code)
        self.send_header("Content-Type", rct)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Route GET requests: HTML pages, dataset/dashboard JSON, task status, proxy.

        Unknown paths yield a JSON 404; any exception raised while handling a
        route yields a JSON 500 carrying the exception type and message.
        """
        u = urlparse(self.path)
        # Only the first value of each query parameter is used.
        qs = {k: v[0] for k, v in parse_qs(u.query).items()}
        try:
            if u.path == "/" or u.path == "/index.html":
                self._html((HERE / "index.html").read_bytes())
            elif u.path == "/search.html":
                # Knowledge search page embedded in the cluster-library tab. It
                # is served unmodified; its API calls reach the backend through
                # the /api/v1/knowledge proxy below.
                self._html(_render_search_html())
            elif u.path == "/api/dashboard":
                self._json(_dashboard())
            elif u.path == "/api/queries":
                self._json(db.fetch_queries(qs.get("mode", "process")))
            elif u.path == "/api/posts":
                self._json(db.fetch_posts(qs.get("query_id", ""), qs.get("mode", "process")))
            elif u.path == "/api/process_versions":
                self._json(db.fetch_process_versions(qs.get("case_id", "")))
            elif u.path == "/api/process":
                r = db.fetch_process(qs.get("case_id", ""), qs.get("version"))
                if r:
                    self._json(r)
                else:
                    self._err("无解构记录", 404)
            elif u.path == "/api/tools_versions":
                self._json(db.fetch_tools_versions(qs.get("case_id", "")))
            elif u.path == "/api/tools":
                r = db.fetch_tools(qs.get("case_id", ""), qs.get("version"))
                if r:
                    self._json(r)
                else:
                    self._err("无解构记录", 404)
            elif u.path == "/api/task_status":
                r = _task_status(qs.get("task_id", ""))
                if r:
                    self._json(r)
                else:
                    self._err("未知 task_id", 404)
            elif u.path.startswith("/api/v1/knowledge"):
                self._proxy_knowledge()
            else:
                self._err("not found", 404)
        except Exception as e:
            self._err(f"{type(e).__name__}: {e}", 500)

    def do_POST(self):
        """Route POST requests: knowledge proxy and pipeline task launchers.

        /api/v1/knowledge* is forwarded verbatim. Every other route needs a JSON
        object body; /api/extract_process, /api/extract_tools and
        /api/run_search spawn a pipeline subprocess and answer with its task id.
        Malformed requests get a JSON 400, unknown paths a JSON 404, unexpected
        failures a JSON 500.
        """
        u = urlparse(self.path)
        try:
            n = int(self.headers.get("Content-Length") or 0)
            if n < 0:
                # rfile.read(-1) would block until the client closes the socket.
                raise ValueError("negative Content-Length")
            raw = self.rfile.read(n)
        except Exception:
            return self._err("读取请求体失败")
        # Knowledge search API: forwarded as-is, no JSON parsing here.
        if u.path.startswith("/api/v1/knowledge"):
            return self._proxy_knowledge(body=raw)
        try:
            payload = json.loads(raw or b"{}")
        except Exception:
            return self._err("body 必须是 JSON")
        extract_paths = ("/api/extract_process", "/api/extract_tools")
        if u.path not in extract_paths and u.path != "/api/run_search":
            return self._err("not found", 404)
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (e.g. a list) used to fail on
            # payload.get and surface as a 500.
            return self._err("body 必须是 JSON 对象")
        try:
            if u.path in extract_paths:
                qid = payload.get("query_id")
                cids = payload.get("case_ids") or []
                if isinstance(cids, str):
                    # A bare string is one id, not a sequence of characters.
                    cids = [cids]
                if not qid or not cids or not isinstance(cids, (list, tuple)):
                    return self._err("缺 query_id / case_ids")
                is_process = u.path.endswith("process")
                script = ("pipeline/procedure_extract.py" if is_process
                          else "pipeline/tool_extract.py")
                cmd = [sys.executable, script, "--query-id", qid,
                       "--case-ids", ",".join(str(c) for c in cids)]
                if payload.get("model"):
                    cmd += ["--model", payload["model"]]
                if payload.get("force"):
                    # Default is global de-duplication per case; force re-extracts.
                    cmd += ["--force"]
                kind = "proc" if is_process else "tool"
                self._json({"task_id": _spawn_task(kind, cmd)})
            else:  # /api/run_search
                query = (payload.get("query") or "").strip()
                if not query:
                    return self._err("缺 query")
                qid = payload.get("query_id") or _next_query_id()
                cmd = [sys.executable, "pipeline/search_eval.py",
                       "--query-id", qid, "--query", query]
                if payload.get("synonyms"):
                    cmd += ["--synonyms", payload["synonyms"]]
                if payload.get("mode_type") in ("工序", "工具"):
                    cmd += ["--mode-type", payload["mode_type"]]
                if payload.get("platforms"):
                    cmd += ["--platforms", payload["platforms"]]
                if payload.get("max_count"):
                    cmd += ["--max-count", str(payload["max_count"])]
                self._json({"task_id": _spawn_task("search", cmd), "query_id": qid})
        except Exception as e:
            self._err(f"{type(e).__name__}: {e}", 500)

    def log_message(self, fmt, *a):
        """Suppress the default per-request access log."""
        pass
if __name__ == "__main__":
    # Script entry point: announce the address, then serve on all interfaces
    # until interrupted.
    print(f"🚀 mode_workflow server → http://0.0.0.0:{PORT}")
    # The context manager closes the listening socket on exit; Ctrl-C stops
    # the server without a traceback.
    with ThreadingHTTPServer(("0.0.0.0", PORT), Handler) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            pass
|