# -*- coding: utf-8 -*-
"""mode_workflow server: pages + API + extraction task management.

Single service (default port 8772):
  - GET  /                     index.html
  - GET  /search.html          knowledge search page (embedded in the cluster-library
                               tab; the API domain is injected from .env)
  - GET  /api/dashboard        all aggregated dashboard metrics (incl. content-tree coverage)
  - GET  /api/queries|posts|process|tools(+_versions)   dataset data
  - GET  /api/extract          per-post combined view: version list + extraction detail
                               (single connection, with ETag/304)
  - POST /api/run_search|extract_process|extract_tools  spawn a subprocess running the pipeline
  - GET  /api/task_status      poll task status (reads the log tail)

Usage: python server.py [port]
"""
import hashlib
import json
import os
import re
import subprocess
import sys
import threading
import time
from collections import Counter
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import urlparse, parse_qs
import urllib.request
import urllib.parse
import urllib.error

try:
    sys.stdout.reconfigure(encoding="utf-8")
except (AttributeError, OSError, ValueError):
    # Best effort only: stdout may be missing, replaced, or not reconfigurable.
    pass

HERE = Path(__file__).resolve().parent
sys.path.insert(0, str(HERE))
import db

PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772

MATRIX_FILE = HERE / "reference" / "judged_matrix.json"
_MATRIX_CACHE = None


def _matrix():
    """Return the parsed judged_matrix.json, cached at module level.

    The file is parsed on first use and the same object is returned afterwards
    (treated as read-only and unchanging for the life of the process).
    """
    global _MATRIX_CACHE
    if _MATRIX_CACHE is None:
        _MATRIX_CACHE = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
    return _MATRIX_CACHE


CATEGORY_API = "https://library.aiddit.com/api/pattern/executions/401/category-tree"
CAT_CACHE_DIR = HERE / ".cache" / "category_tree"


def _category_tree(source_type):
    """Fetch and disk-cache the 实质/形式 category tree from library.aiddit.com.

    A cache hit is served straight from disk. On a miss the upstream API is
    queried and the result is written to the cache atomically.

    Args:
        source_type: either "实质" or "形式".

    Returns:
        The decoded JSON payload of the category tree.

    Raises:
        ValueError: if ``source_type`` is not one of the two accepted values.
        Exception: network / decoding errors from the upstream call propagate
            (the original note says do_GET turns them into a 500 and the
            front end degrades).
    """
    if source_type not in ("实质", "形式"):
        raise ValueError("source_type 只能是 实质/形式")
    CAT_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache = CAT_CACHE_DIR / f"{source_type}.json"
    if cache.is_file():
        try:
            return json.loads(cache.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Unreadable or corrupt cache file: treat it as a miss and refetch
            # instead of failing on every request until someone deletes it.
            pass
    url = CATEGORY_API + "?" + urllib.parse.urlencode({"source_type": source_type})
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(req, timeout=20) as resp:
        data = json.loads(resp.read().decode("utf-8"))
    # Per-process/per-thread temp name: two concurrent first requests must not
    # share one temp file, otherwise the slower thread's replace() would fail
    # with FileNotFoundError after the faster one already moved it.
    tmp = cache.with_name(f"{cache.name}.{os.getpid()}.{threading.get_ident()}.tmp")
    tmp.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
    tmp.replace(cache)
    return data


SCORE_CACHE_DIR = HERE / ".cache" / "query_score"


def _query_sel_hash(sel):
    """Return a stable short hash of a scoring selection plus the matrix fingerprint.

    The hash is used as a cache file name so that the same selection is not
    paid for twice. The current bytes of MATRIX_FILE are part of the key, so
    the key changes whenever the matrix file changes.
    """
    matrix_sig = hashlib.md5(MATRIX_FILE.read_bytes()).hexdigest()[:8]
    canon = json.dumps([sel, matrix_sig], ensure_ascii=False, sort_keys=True)
    return hashlib.md5(canon.encode("utf-8")).hexdigest()[:16]


LOG_DIR = HERE / "runs" / "logs"

# Placeholder values of a process step's `via` field meaning "no specific tool";
# they are excluded from the "tools mentioned by process steps" top list.
_VIA_PLACEHOLDERS = {"-", "—", "-", "--", "/", "无", "n/a", "none"}

# Knowledge-search backend address, read from KNOWLEDGE_API_BASE in .env
# (db.py has already called load_dotenv).
# Note: it must NOT be injected into search.html for the browser to call directly:
# the backend is plain http:// while the page is served as https:// through a
# Cloudflare tunnel, so the browser would block the request as mixed content.
# search.html therefore keeps the relative path '/api/v1/knowledge' and this
# service reverse-proxies it same-origin, which avoids both mixed content and CORS.
KNOWLEDGE_API_BASE = os.getenv("KNOWLEDGE_API_BASE", "").rstrip("/")


def _render_search_html():
    """Return the raw bytes of search.html.

    The page keeps its relative API path; requests go through this service's
    /api/v1/knowledge reverse proxy.
    """
    return (HERE / "search.html").read_bytes()


# ── Task management: task_id → {proc, log, status} ──────────────────────────
TASKS = {}
_TASK_LOCK = threading.Lock()

# ── Extraction "claims": in-process guard against concurrent duplicate work ──
# The extraction script has a very long LLM call between "look up
# latest_real_version" and "write to the DB", so two concurrent tasks on the same
# case would both see "not extracted yet" and both pay for an LLM call. We record
# the (mode, case_id) pairs currently being extracted and drop already-claimed
# cases before starting a task. In-process only, not persisted: a claim is
# released when the task ends (see _wait in _spawn_task) and a restart clears
# everything, so there is no "zombie lock" risk as with a DB-level claim.
# `mode` separates process/tools: process extraction and tool extraction of the
# same post do not affect each other and are claimed independently.
_INFLIGHT = set()  # {(mode, case_id)}
_INFLIGHT_LOCK = threading.Lock()


def _claim_cases(mode, case_ids):
    """Claim a batch of cases and return the subset that was actually acquired.

    Cases already being extracted for ``mode`` are dropped; every returned case
    is registered in ``_INFLIGHT``.
    """
    claimed = []
    with _INFLIGHT_LOCK:
        for cid in case_ids:
            key = (mode, cid)
            if key not in _INFLIGHT:
                _INFLIGHT.add(key)
                claimed.append(cid)
    return claimed


def _release_cases(mode, case_ids):
    """Drop the claims on ``case_ids`` for ``mode``; unknown claims are ignored."""
    with _INFLIGHT_LOCK:
        for cid in case_ids:
            _INFLIGHT.discard((mode, cid))


def _spawn_task(kind, cmd, release=None, meta=None):
    """Start a subprocess that runs a pipeline and track it in ``TASKS``.

    Args:
        kind: task kind; used as the prefix of the generated task id.
        cmd: command passed to ``subprocess.Popen``.
        release: optional ``(mode, case_ids)``; those claims are released when
            the task ends, or immediately if the process cannot be started.
        meta: optional extra fields merged into the task record (e.g.
            ``{query_id, query}`` for search tasks) for the progress endpoint.

    Returns:
        The generated task id.

    Raises:
        Whatever ``subprocess.Popen`` raises when the process cannot be started.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
    log_path = LOG_DIR / f"{task_id}.log"
    log_file = open(log_path, "w", encoding="utf-8")
    try:
        proc = subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT,
                                cwd=str(HERE), text=True)
    except BaseException:
        # No process means _wait() will never run: close the log handle and give
        # the claims back here, otherwise the cases stay blocked until restart.
        log_file.close()
        if release:
            _release_cases(*release)
        raise
    with _TASK_LOCK:
        TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running", **(meta or {})}

    def _wait():
        try:
            rc = proc.wait()
            with _TASK_LOCK:
                TASKS[task_id]["status"] = "done" if rc == 0 else "failed"
        finally:
            log_file.close()
            if release:
                # Task finished: release the claims so later tasks may process these cases.
                _release_cases(*release)
        # Once a search/extraction task ends the four tables may have changed;
        # invalidate the dashboard cache so it is recomputed next time.
        _invalidate_dashboard()

    threading.Thread(target=_wait, daemon=True).start()
    return task_id


def _task_status(task_id):
    """Return ``{"status", "log_tail"}`` for a task, or None for an unknown id.

    ``log_tail`` is the last 3000 characters of the task log, or "" when the
    log cannot be read.
    """
    with _TASK_LOCK:
        task = TASKS.get(task_id)
    if not task:
        return None
    tail = ""
    try:
        # errors="replace" means decoding cannot fail; only I/O errors remain.
        tail = task["log"].read_text(encoding="utf-8", errors="replace")[-3000:]
    except OSError:
        pass
    return {"status": task["status"], "log_tail": tail}


def _search_progress(mode="process"):
    """Return search execution progress for one direction.

    Two complementary measures:
      - executed: durable; the value of ``db.count_executed_queries(mode)``
        (per the original note: the number of distinct query_ids in the search
        table, i.e. queries that produced results). Survives restarts; the
        caller knows the total and computes executed / total.
      - session_tasks: counts (running/done/failed) of the search tasks started
        during this process's lifetime. Reset on restart because task records
        live only in memory.
    """
    executed = db.count_executed_queries(mode)
    with _TASK_LOCK:
        tasks = [{"task_id": tid, "query_id": t.get("query_id"),
                  "query": t.get("query"), "status": t["status"]}
                 for tid, t in TASKS.items() if tid.startswith("search_")]
    tasks.sort(key=lambda t: t["task_id"], reverse=True)  # newest first
    tally = {"total": len(tasks), "running": 0, "done": 0, "failed": 0}
    for t in tasks:
        if t["status"] in tally:
            tally[t["status"]] += 1
    return {
        "executed": executed,
        "session_tasks": tally,
        "running_queries": [t["query_id"] for t in tasks if t["status"] == "running"],
        "recent": tasks[:50],  # details of the 50 most recent search tasks
    }


# query_ids already handed out whose rows may not be in the DB yet (search is
# asynchronous: the POST returns at once, rows are written minutes later).
_ALLOCATED_QIDS = set()
_QID_LOCK = threading.Lock()
_QID_PATTERN = re.compile(r"q([0-9]+)")


def _qid_number(qid):
    """Return the numeric part of a ``q<digits>`` id, or None for anything else."""
    match = _QID_PATTERN.fullmatch(qid) if isinstance(qid, str) else None
    return int(match.group(1)) if match else None


def _next_query_id():
    """Allocate the next query id, numbered jointly across both search tables.

    Joint numbering avoids id clashes between directions. Ids already handed
    out in this process are also taken into account: during rapid successive
    searches the previous rows are not in the DB yet, so the DB maximum alone
    would hand out the same id again.
    """
    with _QID_LOCK:
        seen = [q["query_id"] for mode in ("process", "tools") for q in db.fetch_queries(mode)]
        seen.extend(_ALLOCATED_QIDS)
        # Ids that are not exactly "q" + ASCII digits (including None) are ignored.
        nums = [n for n in map(_qid_number, seen) if n is not None]
        qid = f"q{(max(nums) + 1 if nums else 0):04d}"
        _ALLOCATED_QIDS.add(qid)
        return qid


# ── Dashboard aggregation ───────────────────────────────────────────────────
# Separators between values of a substance/form string: ideographic comma,
# ASCII and full-width comma, and slash.
_VALUE_SEPARATORS = re.compile(r"[、,,/]")


def _split_values(v):
    """Flatten a substance/form field into a list of non-empty, stripped strings.

    A list is used element by element; a string is split on 、 , , and /.
    None, empty values and non-string elements are dropped.
    """
    out = []
    items = v if isinstance(v, list) else [v]
    for item in items:
        if not item or not isinstance(item, str):
            continue
        for piece in _VALUE_SEPARATORS.split(item):
            piece = piece.strip()
            if piece:
                out.append(piece)
    return out


# ── Dashboard result cache ──────────────────────────────────────────────────
# The dashboard pulls and aggregates four tables (~2 s overall against the remote
# RDS), but the data only changes when a search/extraction task finishes. So the
# computed result is cached: a hit returns in <1 ms; the cache is invalidated
# when a task ends (see _spawn_task), with a fallback TTL covering direct
# modification of the database from outside.
_DASH_CACHE = {"data": None, "ts": 0.0}
_DASH_LOCK = threading.Lock()
_DASH_TTL = 60.0  # seconds


def _invalidate_dashboard():
    """Mark the dashboard cache stale and clear the per-mode query-list cache."""
    with _DASH_LOCK:
        # time.monotonic() has an undefined reference point, so 0.0 is not
        # guaranteed to be "long ago"; -inf is stale for every clock value.
        _DASH_CACHE["ts"] = float("-inf")
    with _QUERIES_LOCK:
        # The query list also only changes when a task finishes; drop it too.
        _QUERIES_CACHE.clear()


def _dashboard_cached():
    """Return the dashboard payload, recomputing it when missing or older than the TTL."""
    with _DASH_LOCK:
        if _DASH_CACHE["data"] is not None and time.monotonic() - _DASH_CACHE["ts"] < _DASH_TTL:
            return _DASH_CACHE["data"]
    # Computed outside the lock so other requests are not blocked
    # (an occasional concurrent recomputation is acceptable).
    data = _dashboard()
    with _DASH_LOCK:
        _DASH_CACHE["data"] = data
        _DASH_CACHE["ts"] = time.monotonic()
    return data


# ── Query-list cache ────────────────────────────────────────────────────────
# /api/queries needs a full-table JSON_EXTRACT to count adoptions per group (~2 s on the
# remote RDS), and that data likewise only changes when a task finishes. The cache is kept per
# mode and is invalidated together with the dashboard when a task ends
# (see _invalidate_dashboard); the TTL is the fallback for external DB changes.
_QUERIES_CACHE = {}  # mode -> {"data": [...], "ts": float}
_QUERIES_LOCK = threading.Lock()


def _queries_cached(mode):
    """Return ``db.fetch_queries(mode)``, cached per mode for ``_DASH_TTL`` seconds."""
    with _QUERIES_LOCK:
        entry = _QUERIES_CACHE.get(mode)
        if entry and time.monotonic() - entry["ts"] < _DASH_TTL:
            return entry["data"]
    data = db.fetch_queries(mode)  # fetched outside the lock so other requests are not blocked
    with _QUERIES_LOCK:
        _QUERIES_CACHE[mode] = {"data": data, "ts": time.monotonic()}
    return data


def _dashboard():
    """Compute every aggregated dashboard metric from ``db.fetch_dashboard_rows()``.

    Returns:
        A dict with two sections: ``"result"`` (platform breakdowns, matrix
        coverage, substance/form/tool top lists, counts) and ``"process_data"``
        (run count, cost/duration totals and averages, daily cost trend,
        extraction progress per direction).
    """
    posts, procs, tools = db.fetch_dashboard_rows()

    # Latest-version row sets: coverage and top lists use the latest version,
    # cost/duration accumulate over all versions.
    # "Latest" = the most recently inserted real extraction (by auto-increment
    # id), excluding link_ copies; per the original note this matches maxv in
    # db.py. Comparing version strings was abandoned because prefixed versions
    # such as v_top5_/v_rest_ sort above v_0617* lexicographically.
    def latest(rows):
        newest = {}  # case_id -> (id, version) of the newest real version
        for row in rows:
            if (row["version"] or "").startswith("link_"):
                continue
            cid = row["case_id"]
            row_id = row["id"] or 0
            if cid not in newest or row_id > newest[cid][0]:
                newest[cid] = (row_id, row["version"])
        return [
            row for row in rows
            if row["case_id"] in newest and row["version"] == newest[row["case_id"]][1]
        ]

    latest_procs = latest(procs)
    latest_tools = latest(tools)

    # Content-tree coverage: (action leaf × input/output type) pairs found in the
    # steps, intersected with the valid matrix cells (tier >= 1).
    # The shared module-level cache is used instead of re-reading and re-parsing
    # the matrix file on every recomputation.
    jm = _matrix()
    a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])}
    t_idx = {t["name"]: i for i, t in enumerate(jm["types"])}
    valid = {
        (ai, ti)
        for ai, row in enumerate(jm["matrix"])
        for ti, cell in enumerate(row)
        # `or 0` also covers an explicit null tier.
        if isinstance(cell, dict) and (cell.get("tier") or 0) >= 1
    }

    covered = set()
    via_counter = Counter()
    substance_counter = Counter()
    form_counter = Counter()
    for row in latest_procs:
        for step in row["steps"] or []:  # a row without steps contributes nothing
            if not isinstance(step, dict):
                continue
            action = step.get("action")
            leaf = action.split("/")[-1].strip() if isinstance(action, str) else ""
            types = [
                str(item["type"]).strip()
                for side in ("inputs", "outputs")
                for item in step.get(side) or []
                if isinstance(item, dict) and item.get("type")
            ]
            if leaf in a_idx:
                for tp in types:
                    if tp in t_idx and (a_idx[leaf], t_idx[tp]) in valid:
                        covered.add((a_idx[leaf], t_idx[tp]))
            via = step.get("via")
            via = via.strip() if isinstance(via, str) else ""
            # "-", "—", "无" etc. mean "no specific tool" and are not counted.
            if via and via.lower() not in _VIA_PLACEHOLDERS:
                via_counter[via] += 1
            for value in _split_values(step.get("substance")):
                substance_counter[value] += 1
            for value in _split_values(step.get("form")):
                form_counter[value] += 1
    for row in latest_tools:
        for value in _split_values(row["substance_scope"]):
            substance_counter[value] += 1
        for value in _split_values(row["form_scope"]):
            form_counter[value] += 1

    # Cost/duration: each (case_id, version) is counted once, because every row
    # of a version repeats the values of the same call.
    def cost_groups(rows):
        groups = {}
        for row in rows:
            key = (row["case_id"], row["version"])
            if key not in groups and row["cost_usd"] is not None:
                groups[key] = (row["cost_usd"], row["duration_s"] or 0.0, row["created_at"])
        return list(groups.values())

    runs = cost_groups(procs) + cost_groups(tools)
    total_cost = round(sum(c for c, _, _ in runs), 4)
    total_dur = round(sum(d for _, d, _ in runs), 1)

    # Daily cost trend.
    daily = Counter()
    for cost, _, ts in runs:
        if ts:
            # str() keeps string timestamps unchanged and also accepts
            # date/datetime values, whose str() starts with YYYY-MM-DD.
            daily[str(ts)[:10]] += cost
    cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())]

    # Progress: numerator and denominator use the same "adopted" basis.
    # Denominator = distinct adopted cases of that direction ("needs extraction");
    # numerator = adopted cases already extracted (the intersection keeps it
    # <= the denominator). The direction comes from p["mode"].
    proc_targets = {p["case_id"] for p in posts if p["mode"] == "process" and p["adopted"]}
    tool_targets = {p["case_id"] for p in posts if p["mode"] == "tools" and p["adopted"]}
    proc_extracted = {r["case_id"] for r in procs}
    tool_extracted = {r["case_id"] for r in tools}
    proc_done = proc_extracted & proc_targets
    tool_done = tool_extracted & tool_targets

    # Per-platform breakdown / extraction total: distinct cases actually
    # extracted (adopted or not); the platform is looked up per case.
    extracted_all = proc_extracted | tool_extracted
    case_plat = {p["case_id"]: (p["platform"] or "other") for p in posts}
    collected_by_plat = Counter((p["platform"] or "other") for p in posts)
    extracted_by_plat = Counter(case_plat.get(c, "other") for c in extracted_all)

    return {
        "result": {
            "collected_by_platform": collected_by_plat.most_common(),
            "extracted_by_platform": extracted_by_plat.most_common(),
            "matrix_covered": len(covered),
            "matrix_valid": len(valid),
            "matrix_cells": sorted([ai, ti] for ai, ti in covered),
            "matrix_actions": [a["name"] for a in jm["actions"]],
            "matrix_types": [t["name"] for t in jm["types"]],
            "substance_count": len(substance_counter),
            "substance_top": substance_counter.most_common(15),
            "form_count": len(form_counter),
            "form_top": form_counter.most_common(15),
            "post_count": len(posts),
            "extracted_post_count": len(extracted_all),
            "tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}),
            "via_top10": via_counter.most_common(10),
        },
        "process_data": {
            "run_count": len(runs),
            "avg_cost": round(total_cost / len(runs), 4) if runs else 0,
            "total_cost": total_cost,
            "avg_duration": round(total_dur / len(runs), 1) if runs else 0,
            "total_duration": total_dur,
            "cost_trend": cost_trend,
            "process_progress": {"done": len(proc_done), "total": len(proc_targets)},
            "tools_progress": {"done": len(tool_done), "total": len(tool_targets)},
        },
    }


def _etag_matches(if_none_match, etag):
    """Return True when an If-None-Match header value matches ``etag``.

    Uses weak comparison: a ``W/`` prefix on either side is ignored. The header
    may carry a comma-separated list of entity tags or ``*``.
    """
    if not if_none_match:
        return False
    opaque = etag[2:] if etag.startswith("W/") else etag
    for candidate in if_none_match.split(","):
        candidate = candidate.strip()
        if candidate == "*":
            return True
        if candidate.startswith("W/"):
            candidate = candidate[2:]
        if candidate == opaque:
            return True
    return False


# ── HTTP handler ────────────────────────────────────────────────────────────
class Handler(BaseHTTPRequestHandler):
    def _json(self, data, code=200):
        """Send ``data`` as a UTF-8 JSON response with status ``code``.

        Values json cannot serialize natively are converted with ``str``.
        """
        body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _err(self, msg, code=400):
        """Send ``{"error": msg}`` as JSON with status ``code``."""
        self._json({"error": msg}, code)

    def _json_etag(self, data):
        """Send ``data`` as JSON with an ETag, answering 304 when the client's copy matches.

        The ETag is the MD5 of the serialized body, so it changes exactly when
        the content changes. ``Cache-Control: no-cache`` makes the browser
        revalidate on every use, so stale data is never shown.
        """
        body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
        etag = '"' + hashlib.md5(body).hexdigest() + '"'
        # Weak comparison: the header may hold W/"…", a list of tags, or "*".
        # Plain string equality would miss all of those and always resend the body.
        if _etag_matches(self.headers.get("If-None-Match"), etag):
            self.send_response(304)
            self.send_header("ETag", etag)
            self.send_header("Cache-Control", "no-cache")
            self.end_headers()
            return
        self.send_response(200)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("ETag", etag)
        self.send_header("Cache-Control", "no-cache")
        self.end_headers()
        self.wfile.write(body)

    def _proxy_image(self, url):
        """Same-origin image proxy, used to get around hotlink protection.

        The image is fetched server-side without a Referer and with an ordinary
        browser User-Agent, and the bytes are passed back with a one-day cache
        header. Non-http(s) URLs get a 400, blocked hosts a 403, upstream HTTP
        errors their own 4xx/5xx code (else 502), other failures a 502.
        """
        if not url or not (url.startswith("http://") or url.startswith("https://")):
            return self._err("非法图片地址", 400)
        host = (urlparse(url).hostname or "").lower()
        # SSRF guard: reject loopback / internal addresses.
        # NOTE(review): this is a literal-prefix check only; 172.16.0.0/12,
        # other 127.x addresses and hostnames that resolve to private IPs are
        # not covered here; confirm whether that is acceptable.
        if host in ("localhost", "127.0.0.1", "0.0.0.0", "::1") or \
                host.startswith("10.") or host.startswith("192.168.") or \
                host.startswith("169.254.") or host.endswith(".internal"):
            return self._err("禁止的图片地址", 403)
        req = urllib.request.Request(url, headers={
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/120.0 Safari/537.36",
            "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        })
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                payload = resp.read()
                ct = resp.headers.get("Content-Type", "image/jpeg")
        except urllib.error.HTTPError as e:
            return self._err(f"上游图片返回 {e.code}", e.code if 400 <= e.code < 600 else 502)
        except Exception as e:
            return self._err(f"图片不可达:{type(e).__name__}: {e}", 502)
        self.send_response(200)
        self.send_header("Content-Type", ct)
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Cache-Control", "public, max-age=86400")
        self.end_headers()
        try:
            self.wfile.write(payload)
        except (BrokenPipeError, ConnectionResetError):
            pass  # 客户端中途取消(常见于 /