server.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796
  1. # -*- coding: utf-8 -*-
  2. """mode_workflow server · 页面 + API + 解构任务管理
  3. ================================================================================
  4. 单服务(默认 8772):
  5. - GET / index.html
  6. - GET /search.html 知识检索页(聚类库 tab 内嵌;接口走本服务 /api/v1/knowledge 同源反代,后端地址取自 .env)
  7. - GET /api/dashboard Dashboard 全部聚合指标(含内容树覆盖)
  8. - GET /api/queries|posts|process|tools(+_versions) Dataset 数据
  9. - GET /api/extract 点帖子合一:版本列表+解构详情(单连接,带 ETag/304)
  10. - POST /api/run_search|extract_process|extract_tools 起子进程跑 pipeline
  11. - GET /api/task_status 轮询任务状态(读日志尾部)
  12. 用法:python server.py [port]
  13. """
  14. import hashlib
  15. import json
  16. import os
  17. import re
  18. import subprocess
  19. import sys
  20. import threading
  21. import time
  22. from collections import Counter
  23. from datetime import datetime
  24. from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
  25. from pathlib import Path
  26. from urllib.parse import urlparse, parse_qs
  27. import urllib.request
  28. import urllib.parse
  29. import urllib.error
# Switch stdout to UTF-8 when the stream supports reconfigure(); any failure is
# deliberately ignored (best effort).
try:
    sys.stdout.reconfigure(encoding="utf-8")
except Exception:
    pass

# Directory containing this file; the resource paths below are built from it.
HERE = Path(__file__).resolve().parent
# Put that directory first on sys.path before importing the sibling ``db`` module.
sys.path.insert(0, str(HERE))
import db

# Listening port: first command-line argument when given, otherwise 8772.
PORT = int(sys.argv[1]) if len(sys.argv) > 1 else 8772
  38. MATRIX_FILE = HERE / "reference" / "judged_matrix.json"
  39. _MATRIX_CACHE = None
  40. def _matrix():
  41. """judged_matrix.json 解析后模块级缓存(只读,进程内不变)。"""
  42. global _MATRIX_CACHE
  43. if _MATRIX_CACHE is None:
  44. _MATRIX_CACHE = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
  45. return _MATRIX_CACHE
  46. CATEGORY_API = "https://library.aiddit.com/api/pattern/executions/401/category-tree"
  47. CAT_CACHE_DIR = HERE / ".cache" / "category_tree"
  48. def _category_tree(source_type):
  49. """拉取并缓存 实质/形式 分类树(library.aiddit.com)。缓存命中直接读盘;
  50. 上游不可达则抛异常(do_GET 转 500,前端降级)。"""
  51. if source_type not in ("实质", "形式"):
  52. raise ValueError("source_type 只能是 实质/形式")
  53. CAT_CACHE_DIR.mkdir(parents=True, exist_ok=True)
  54. cache = CAT_CACHE_DIR / f"{source_type}.json"
  55. if cache.is_file():
  56. return json.loads(cache.read_text(encoding="utf-8"))
  57. url = CATEGORY_API + "?" + urllib.parse.urlencode({"source_type": source_type})
  58. req = urllib.request.Request(url, headers={"Accept": "application/json"})
  59. with urllib.request.urlopen(req, timeout=20) as resp:
  60. data = json.loads(resp.read().decode("utf-8"))
  61. tmp = cache.with_suffix(".tmp")
  62. tmp.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
  63. tmp.replace(cache)
  64. return data
  65. SCORE_CACHE_DIR = HERE / ".cache" / "query_score"
  66. def _query_sel_hash(sel):
  67. """对评分选择 + 矩阵指纹算稳定短哈希,作为缓存文件名(同选择不重复付费)。"""
  68. matrix_sig = hashlib.md5(MATRIX_FILE.read_bytes()).hexdigest()[:8]
  69. canon = json.dumps([sel, matrix_sig], ensure_ascii=False, sort_keys=True)
  70. return hashlib.md5(canon.encode("utf-8")).hexdigest()[:16]
# Directory that receives one log file per spawned task.
LOG_DIR = HERE / "runs" / "logs"
# Placeholder values of a process step's ``via`` field that mean "no concrete tool";
# they are excluded from the ranking of tools mentioned by process steps.
# NOTE(review): "-" appears twice in this literal — possibly one of them was meant to
# be a different dash character; confirm against the real data.
_VIA_PLACEHOLDERS = {"-", "—", "-", "--", "/", "无", "n/a", "none"}
# Knowledge-search backend address, read from the KNOWLEDGE_API_BASE environment
# variable (the original note says db.py has already run load_dotenv).
# It must not be injected into search.html for the browser to call directly: the
# backend is plain http:// while the page is served over https:// through a
# Cloudflare tunnel, so the browser would block the request as mixed content.
# search.html therefore keeps the relative path '/api/v1/knowledge' and this server
# reverse-proxies it same-origin, which avoids both mixed content and CORS issues.
KNOWLEDGE_API_BASE = os.getenv("KNOWLEDGE_API_BASE", "").rstrip("/")
  80. def _render_search_html():
  81. # 保持相对路径,接口走本服务的 /api/v1/knowledge 反向代理。
  82. return (HERE / "search.html").read_bytes()
# ── Task registry: task_id → {proc, log, status, plus any meta keys} ──────────
# In-memory only; every access in this file happens under _TASK_LOCK.
TASKS = {}
_TASK_LOCK = threading.Lock()
  86. # ── 解构「认领」:进程内防并发重复解构 ───────────────────────────────────────────
  87. # 解构脚本在「查 latest_real_version」与「写库」之间隔着一次很长的 LLM 调用,两个并发
  88. # 任务对同一 case 都会查到「还没解构」→ 都调一次 LLM,白花钱。这里在进程内记下正在解构
  89. # 中的 (mode, case_id),起任务前剔掉已在解构中的 case。仅进程内、不持久化:任务结束即
  90. # 释放(见 _spawn_task 的 _wait),服务重启自然清空 —— 没有 DB 认领锁那种「僵尸锁卡死」风险。
  91. # mode 区分 process/tools:同一帖的工序解构与工具解构互不影响,各自认领。
  92. _INFLIGHT = set() # {(mode, case_id)}
  93. _INFLIGHT_LOCK = threading.Lock()
  94. def _claim_cases(mode, case_ids):
  95. """认领一批 case,返回真正抢到(此前不在解构中)的子集并登记;已在解构中的被剔除。"""
  96. claimed = []
  97. with _INFLIGHT_LOCK:
  98. for cid in case_ids:
  99. key = (mode, cid)
  100. if key not in _INFLIGHT:
  101. _INFLIGHT.add(key)
  102. claimed.append(cid)
  103. return claimed
  104. def _release_cases(mode, case_ids):
  105. with _INFLIGHT_LOCK:
  106. for cid in case_ids:
  107. _INFLIGHT.discard((mode, cid))
  108. def _spawn_task(kind, cmd, release=None, meta=None):
  109. """起子进程跑 pipeline。release=(mode, case_ids):任务结束时释放这些 case 的认领。
  110. meta:附加到任务记录的元信息(如搜索任务的 {query_id, query}),供进度接口枚举。"""
  111. LOG_DIR.mkdir(parents=True, exist_ok=True)
  112. task_id = f"{kind}_{datetime.now().strftime('%m%d%H%M%S%f')}"
  113. log_path = LOG_DIR / f"{task_id}.log"
  114. f = open(log_path, "w", encoding="utf-8")
  115. proc = subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT,
  116. cwd=str(HERE), text=True)
  117. with _TASK_LOCK:
  118. TASKS[task_id] = {"proc": proc, "log": log_path, "status": "running",
  119. **(meta or {})}
  120. def _wait():
  121. rc = proc.wait()
  122. f.close()
  123. with _TASK_LOCK:
  124. TASKS[task_id]["status"] = "done" if rc == 0 else "failed"
  125. if release:
  126. _release_cases(*release) # 解构结束,释放认领,后续任务可再处理这些 case
  127. # 搜索/解构任务一结束,四表数据可能变,作废 Dashboard 缓存,下次重算
  128. _invalidate_dashboard()
  129. threading.Thread(target=_wait, daemon=True).start()
  130. return task_id
  131. def _task_status(task_id):
  132. with _TASK_LOCK:
  133. t = TASKS.get(task_id)
  134. if not t:
  135. return None
  136. tail = ""
  137. try:
  138. text = t["log"].read_text(encoding="utf-8", errors="replace")
  139. tail = text[-3000:]
  140. except Exception:
  141. pass
  142. return {"status": t["status"], "log_tail": tail}
  143. def _search_progress(mode="process"):
  144. """某方向搜索执行进度。两个口径互补:
  145. - executed:**durable**,= 搜索表里 distinct query_id 数(已产出结果的 query),
  146. 重启不丢;有 1000 个词要搜时,进度 = executed / 1000(总数由调用方掌握)。
  147. - session_tasks:**本进程**生命周期内发起的 search 任务统计(running/done/failed),
  148. 重启清零(任务记录仅存内存,见 TASKS 注释)。批量搜索时实时看「跑了几个」。"""
  149. executed = db.count_executed_queries(mode)
  150. with _TASK_LOCK:
  151. tasks = [{"task_id": tid, "query_id": t.get("query_id"),
  152. "query": t.get("query"), "status": t["status"]}
  153. for tid, t in TASKS.items() if tid.startswith("search_")]
  154. tasks.sort(key=lambda t: t["task_id"], reverse=True) # 新任务在前
  155. tally = {"total": len(tasks), "running": 0, "done": 0, "failed": 0}
  156. for t in tasks:
  157. if t["status"] in tally:
  158. tally[t["status"]] += 1
  159. return {
  160. "executed": executed,
  161. "session_tasks": tally,
  162. "running_queries": [t["query_id"] for t in tasks if t["status"] == "running"],
  163. "recent": tasks[:50], # 最近 50 个搜索任务明细(task_id/query_id/query/status)
  164. }
  165. _ALLOCATED_QIDS = set() # 已分配但行可能尚未落库的 query_id(异步搜索:POST 即返回,行几分钟后才写)
  166. _QID_LOCK = threading.Lock()
  167. def _next_query_id():
  168. """两张搜索表统一编号,避免跨方向撞 ID。
  169. 叠加内存预留集:连续发起搜索(如「搜全部达标」批量)时,前一次的行还没落库,
  170. 仅靠 DB max 会重号;故把已分配号也计入,保证连续分配不撞。"""
  171. with _QID_LOCK:
  172. qs = [q["query_id"] for m in ("process", "tools") for q in db.fetch_queries(m)]
  173. nums = [int(q[1:]) for q in qs if q.startswith("q") and q[1:].isdigit()]
  174. nums += [int(q[1:]) for q in _ALLOCATED_QIDS if q[1:].isdigit()]
  175. qid = f"q{(max(nums) + 1 if nums else 0):04d}"
  176. _ALLOCATED_QIDS.add(qid)
  177. return qid
  178. # ── Dashboard 聚合 ────────────────────────────────────────────────────────────
  179. def _split_values(v):
  180. """substance/form 字段:数组直接用;字符串按 、,/ 分割;None 丢弃。"""
  181. out = []
  182. items = v if isinstance(v, list) else [v]
  183. for it in items:
  184. if not it or not isinstance(it, str):
  185. continue
  186. for piece in it.replace(",", "、").replace("/", "、").split("、"):
  187. piece = piece.strip()
  188. if piece:
  189. out.append(piece)
  190. return out
  191. # ── Dashboard 结果缓存 ────────────────────────────────────────────────────────
  192. # Dashboard 要拉/聚合四表(本库远程 RDS 下整体 ~2s),但数据只在搜索/解构任务
  193. # 完成时才变。故缓存计算结果:命中即 <1ms 返回;任务结束时主动作废(见 _spawn_task),
  194. # 另设兜底 TTL 兜住外部直接改库的情况。
  195. _DASH_CACHE = {"data": None, "ts": 0.0}
  196. _DASH_LOCK = threading.Lock()
  197. _DASH_TTL = 60.0 # 秒
  198. def _invalidate_dashboard():
  199. with _DASH_LOCK:
  200. _DASH_CACHE["ts"] = 0.0
  201. with _QUERIES_LOCK: # query 列表同样只在任务完成时变,一并作废
  202. _QUERIES_CACHE.clear()
  203. def _dashboard_cached():
  204. with _DASH_LOCK:
  205. if _DASH_CACHE["data"] is not None and time.monotonic() - _DASH_CACHE["ts"] < _DASH_TTL:
  206. return _DASH_CACHE["data"]
  207. data = _dashboard() # 计算放锁外,不阻塞其它请求(偶发并发重算可接受)
  208. with _DASH_LOCK:
  209. _DASH_CACHE["data"] = data
  210. _DASH_CACHE["ts"] = time.monotonic()
  211. return data
  212. # ── query 列表缓存 ────────────────────────────────────────────────────────────
  213. # /api/queries 要全表 JSON_EXTRACT 算每组采纳数(远程 RDS ~2s),数据同样只在任务完成时变。
  214. # 缓存 per mode,任务结束随 dashboard 一并作废(见 _invalidate_dashboard);TTL 兜底外部改库。
  215. _QUERIES_CACHE = {} # mode -> {"data": [...], "ts": float}
  216. _QUERIES_LOCK = threading.Lock()
  217. def _queries_cached(mode):
  218. with _QUERIES_LOCK:
  219. c = _QUERIES_CACHE.get(mode)
  220. if c and time.monotonic() - c["ts"] < _DASH_TTL:
  221. return c["data"]
  222. data = db.fetch_queries(mode) # 计算放锁外,不阻塞其它请求
  223. with _QUERIES_LOCK:
  224. _QUERIES_CACHE[mode] = {"data": data, "ts": time.monotonic()}
  225. return data
  226. def _dashboard():
  227. posts, procs, tools = db.fetch_dashboard_rows()
  228. # 最新版本行集(覆盖度/Top10 用最新版,成本/耗时按全部版本累计)。
  229. # 「最新」= 最近插入的真实解构(按自增 id),排除 link_ 复制版;与 db.py 的 maxv 同口径。
  230. # 旧实现按 version 字符串比大小,被 v_top5_/v_rest_ 等前缀版本号污染(字典序高于 v_0617*)。
  231. def latest(rows):
  232. best = {} # case_id -> (id, version) of newest real version
  233. for r in rows:
  234. if (r["version"] or "").startswith("link_"):
  235. continue
  236. cid = r["case_id"]
  237. if cid not in best or (r["id"] or 0) > best[cid][0]:
  238. best[cid] = (r["id"] or 0, r["version"])
  239. return [r for r in rows if cid_best(best, r)]
  240. def cid_best(best, r):
  241. b = best.get(r["case_id"])
  242. return b is not None and r["version"] == b[1]
  243. latest_procs = latest(procs)
  244. latest_tools = latest(tools)
  245. # 内容树覆盖:steps 的 (action 叶子 × 输入/输出 type) ∩ 有效节点(tier≥1)
  246. jm = json.loads(MATRIX_FILE.read_text(encoding="utf-8"))
  247. a_idx = {a["name"]: i for i, a in enumerate(jm["actions"])}
  248. t_idx = {t["name"]: i for i, t in enumerate(jm["types"])}
  249. valid = set()
  250. for ai, row in enumerate(jm["matrix"]):
  251. for ti, cell in enumerate(row):
  252. if isinstance(cell, dict) and cell.get("tier", 0) >= 1:
  253. valid.add((ai, ti))
  254. covered = set()
  255. via_counter = Counter()
  256. substance_counter = Counter()
  257. form_counter = Counter()
  258. for r in latest_procs:
  259. for s in r["steps"]:
  260. if not isinstance(s, dict):
  261. continue
  262. leaf = (s.get("action") or "").split("/")[-1].strip()
  263. types = []
  264. for io in ("inputs", "outputs"):
  265. for x in s.get(io) or []:
  266. if isinstance(x, dict) and x.get("type"):
  267. types.append(str(x["type"]).strip())
  268. if leaf in a_idx:
  269. for tp in types:
  270. if tp in t_idx and (a_idx[leaf], t_idx[tp]) in valid:
  271. covered.add((a_idx[leaf], t_idx[tp]))
  272. via = (s.get("via") or "").strip()
  273. # "-" / "—" / "无" 等是「无具体工具」占位符,不计入工具 TOP 榜
  274. if via and via.lower() not in _VIA_PLACEHOLDERS:
  275. via_counter[via] += 1
  276. for v in _split_values(s.get("substance")):
  277. substance_counter[v] += 1
  278. for v in _split_values(s.get("form")):
  279. form_counter[v] += 1
  280. for r in latest_tools:
  281. for v in _split_values(r["substance_scope"]):
  282. substance_counter[v] += 1
  283. for v in _split_values(r["form_scope"]):
  284. form_counter[v] += 1
  285. # 成本/耗时:同一 (case_id, version) 只计一次(各行重复存同一次调用的值)
  286. def cost_groups(rows):
  287. g = {}
  288. for r in rows:
  289. key = (r["case_id"], r["version"])
  290. if key not in g and r["cost_usd"] is not None:
  291. g[key] = (r["cost_usd"], r["duration_s"] or 0.0, r["created_at"])
  292. return list(g.values())
  293. runs = cost_groups(procs) + cost_groups(tools)
  294. total_cost = round(sum(c for c, _, _ in runs), 4)
  295. total_dur = round(sum(d for _, d, _ in runs), 1)
  296. # 按日成本趋势
  297. daily = Counter()
  298. for c, _, ts in runs:
  299. if ts:
  300. daily[ts[:10]] += c
  301. cost_trend = [{"date": d, "cost": round(v, 4)} for d, v in sorted(daily.items())]
  302. # 进度:分子分母同口径,都走「采纳」。分母 = 该方向 search 表里采纳的帖(distinct case),
  303. # 即「需解构」;分子 = 采纳帖里已解构的(∩ 保证 ≤ 分母,杜绝越界/虚高)。
  304. # 方向由 p["mode"] 区分(process=search_process,tools=search_tools),不再看 knowledge_type。
  305. proc_targets = {p["case_id"] for p in posts if p["mode"] == "process" and p["adopted"]}
  306. tool_targets = {p["case_id"] for p in posts if p["mode"] == "tools" and p["adopted"]}
  307. proc_extracted = {r["case_id"] for r in procs}
  308. tool_extracted = {r["case_id"] for r in tools}
  309. proc_done = proc_extracted & proc_targets
  310. tool_done = tool_extracted & tool_targets
  311. # 渠道分项/解构总数:按实际解构过的 distinct case(不限采纳),平台由 case 内禀。
  312. extracted_all = proc_extracted | tool_extracted
  313. case_plat = {p["case_id"]: (p["platform"] or "other") for p in posts}
  314. collected_by_plat = Counter((p["platform"] or "other") for p in posts)
  315. extracted_by_plat = Counter(case_plat.get(c, "other") for c in extracted_all)
  316. return {
  317. "result": {
  318. "collected_by_platform": collected_by_plat.most_common(),
  319. "extracted_by_platform": extracted_by_plat.most_common(),
  320. "matrix_covered": len(covered), "matrix_valid": len(valid),
  321. "matrix_cells": sorted([ai, ti] for ai, ti in covered),
  322. "matrix_actions": [a["name"] for a in jm["actions"]],
  323. "matrix_types": [t["name"] for t in jm["types"]],
  324. "substance_count": len(substance_counter),
  325. "substance_top": substance_counter.most_common(15),
  326. "form_count": len(form_counter),
  327. "form_top": form_counter.most_common(15),
  328. "post_count": len(posts),
  329. "extracted_post_count": len(extracted_all),
  330. "tool_count": len({r["tool_name"] for r in latest_tools if r["tool_name"]}),
  331. "via_top10": via_counter.most_common(10),
  332. },
  333. "process_data": {
  334. "run_count": len(runs),
  335. "avg_cost": round(total_cost / len(runs), 4) if runs else 0,
  336. "total_cost": total_cost,
  337. "avg_duration": round(total_dur / len(runs), 1) if runs else 0,
  338. "total_duration": total_dur,
  339. "cost_trend": cost_trend,
  340. "process_progress": {"done": len(proc_done), "total": len(proc_targets)},
  341. "tools_progress": {"done": len(tool_done), "total": len(tool_targets)},
  342. },
  343. }
  344. # ── HTTP handler ─────────────────────────────────────────────────────────────
  345. class Handler(BaseHTTPRequestHandler):
  346. def _json(self, data, code=200):
  347. body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
  348. self.send_response(code)
  349. self.send_header("Content-Type", "application/json; charset=utf-8")
  350. self.send_header("Content-Length", str(len(body)))
  351. self.end_headers()
  352. self.wfile.write(body)
  353. def _err(self, msg, code=400):
  354. self._json({"error": msg}, code)
  355. def _json_etag(self, data):
  356. """带 ETag 的 JSON 响应:解构结果按 (case_id,version) 内容不变,
  357. 浏览器再次点开同一帖时带 If-None-Match 命中 304,免传 body。
  358. Cache-Control: no-cache = 每次都回服务端校验(连接池下校验仅 ~50ms),
  359. 永不返回过期数据(重新解构后 ETag 变化即自动失效)。"""
  360. body = json.dumps(data, ensure_ascii=False, default=str).encode("utf-8")
  361. etag = '"' + hashlib.md5(body).hexdigest() + '"'
  362. if self.headers.get("If-None-Match") == etag:
  363. self.send_response(304)
  364. self.send_header("ETag", etag)
  365. self.send_header("Cache-Control", "no-cache")
  366. self.end_headers()
  367. return
  368. self.send_response(200)
  369. self.send_header("Content-Type", "application/json; charset=utf-8")
  370. self.send_header("Content-Length", str(len(body)))
  371. self.send_header("ETag", etag)
  372. self.send_header("Cache-Control", "no-cache")
  373. self.end_headers()
  374. self.wfile.write(body)
  375. def _proxy_image(self, url):
  376. """同源图片反代:绕过公众号(mmbiz.qpic.cn)等站点的防盗链。
  377. 浏览器侧 referrerpolicy=no-referrer 偶尔仍被拦,服务端直取最稳:
  378. 不带 Referer、给个常规 UA,把图片字节原样转回,并加长缓存。"""
  379. if not url or not (url.startswith("http://") or url.startswith("https://")):
  380. return self._err("非法图片地址", 400)
  381. host = (urlparse(url).hostname or "").lower()
  382. # 防 SSRF:挡掉内网/本机地址
  383. if host in ("localhost", "127.0.0.1", "0.0.0.0", "::1") or \
  384. host.startswith("10.") or host.startswith("192.168.") or \
  385. host.startswith("169.254.") or host.endswith(".internal"):
  386. return self._err("禁止的图片地址", 403)
  387. req = urllib.request.Request(url, headers={
  388. "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
  389. "AppleWebKit/537.36 (KHTML, like Gecko) "
  390. "Chrome/120.0 Safari/537.36",
  391. "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
  392. })
  393. try:
  394. with urllib.request.urlopen(req, timeout=30) as resp:
  395. payload = resp.read()
  396. ct = resp.headers.get("Content-Type", "image/jpeg")
  397. except urllib.error.HTTPError as e:
  398. return self._err(f"上游图片返回 {e.code}", e.code if 400 <= e.code < 600 else 502)
  399. except Exception as e:
  400. return self._err(f"图片不可达:{type(e).__name__}: {e}", 502)
  401. self.send_response(200)
  402. self.send_header("Content-Type", ct)
  403. self.send_header("Content-Length", str(len(payload)))
  404. self.send_header("Cache-Control", "public, max-age=86400")
  405. self.end_headers()
  406. try:
  407. self.wfile.write(payload)
  408. except (BrokenPipeError, ConnectionResetError):
  409. pass # 客户端中途取消(常见于 <img>/<video> 卸载),非错误,静默
  410. # 视频帖的 videos[0] 多为需 Referer 的播放 API(抖音 aweme/play、视频号等),
  411. # 浏览器 <video> 直连给不了 Referer → 慢/403。图片代理 _proxy_image 又是整文件
  412. # 缓冲、无 Range,拿来喂视频会让 <video> 无法边下边播+拖拽,且客户端中断即 BrokenPipe。
  413. # 本代理:按 host 注入 Referer + 透传 Range 头 + 分块流式回传(支持 206 断点/拖拽)。
  414. _VIDEO_REFERERS = {
  415. "douyin.com": "https://www.douyin.com/",
  416. "weixin.qq.com": "https://channels.weixin.qq.com/",
  417. "xiaohongshu.com": "https://www.xiaohongshu.com/",
  418. "xhscdn.com": "https://www.xiaohongshu.com/",
  419. "bilibili.com": "https://www.bilibili.com/",
  420. "bilivideo.c": "https://www.bilibili.com/",
  421. "weibo.c": "https://weibo.com/",
  422. }
  423. def _proxy_video(self, url):
  424. if not url or not (url.startswith("http://") or url.startswith("https://")):
  425. return self._err("非法视频地址", 400)
  426. host = (urlparse(url).hostname or "").lower()
  427. if host in ("localhost", "127.0.0.1", "0.0.0.0", "::1") or \
  428. host.startswith("10.") or host.startswith("192.168.") or \
  429. host.startswith("169.254.") or host.endswith(".internal"):
  430. return self._err("禁止的视频地址", 403)
  431. headers = {
  432. "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
  433. "AppleWebKit/537.36 (KHTML, like Gecko) "
  434. "Chrome/120.0 Safari/537.36",
  435. "Accept": "*/*",
  436. }
  437. for key, ref in self._VIDEO_REFERERS.items():
  438. if key in host:
  439. headers["Referer"] = ref
  440. break
  441. rng = self.headers.get("Range")
  442. if rng:
  443. headers["Range"] = rng
  444. req = urllib.request.Request(url, headers=headers)
  445. try:
  446. resp = urllib.request.urlopen(req, timeout=60)
  447. except urllib.error.HTTPError as e:
  448. return self._err(f"上游视频返回 {e.code}", e.code if 400 <= e.code < 600 else 502)
  449. except Exception as e:
  450. return self._err(f"视频不可达:{type(e).__name__}: {e}", 502)
  451. # 头已发出后若中断,不能再走 _err(会二次写头),只能静默吞异常
  452. try:
  453. self.send_response(getattr(resp, "status", 200) or 200)
  454. self.send_header("Content-Type", resp.headers.get("Content-Type", "video/mp4"))
  455. for h in ("Content-Length", "Content-Range"):
  456. v = resp.headers.get(h)
  457. if v:
  458. self.send_header(h, v)
  459. self.send_header("Accept-Ranges", resp.headers.get("Accept-Ranges", "bytes"))
  460. self.send_header("Cache-Control", "public, max-age=3600")
  461. self.end_headers()
  462. while True:
  463. chunk = resp.read(65536)
  464. if not chunk:
  465. break
  466. self.wfile.write(chunk)
  467. except (BrokenPipeError, ConnectionResetError):
  468. pass # 客户端拖进度/关播放器即中断,媒体流常态,静默
  469. except Exception:
  470. pass
  471. finally:
  472. try:
  473. resp.close()
  474. except Exception:
  475. pass
  476. def _proxy_knowledge(self, body=None):
  477. """把 /api/v1/knowledge* 同源反代到 KNOWLEDGE_API_BASE(明文后端)。
  478. 浏览器只跟本服务(经隧道走 https)通信,规避混合内容 + 跨域。"""
  479. if not KNOWLEDGE_API_BASE:
  480. return self._err("KNOWLEDGE_API_BASE 未配置", 502)
  481. target = KNOWLEDGE_API_BASE + self.path # self.path 含 query string
  482. headers = {}
  483. ct = self.headers.get("Content-Type")
  484. if ct:
  485. headers["Content-Type"] = ct
  486. req = urllib.request.Request(target, data=body, headers=headers,
  487. method=self.command)
  488. try:
  489. with urllib.request.urlopen(req, timeout=120) as resp:
  490. payload = resp.read()
  491. code = resp.status
  492. rct = resp.headers.get("Content-Type", "application/json; charset=utf-8")
  493. except urllib.error.HTTPError as e:
  494. payload = e.read()
  495. code = e.code
  496. rct = e.headers.get("Content-Type", "application/json; charset=utf-8")
  497. except Exception as e:
  498. return self._err(f"知识检索后端不可达:{type(e).__name__}: {e}", 502)
  499. self.send_response(code)
  500. self.send_header("Content-Type", rct)
  501. self.send_header("Content-Length", str(len(payload)))
  502. self.end_headers()
  503. self.wfile.write(payload)
  504. def do_GET(self):
  505. u = urlparse(self.path)
  506. qs = {k: v[0] for k, v in parse_qs(u.query).items()}
  507. try:
  508. if u.path == "/" or u.path == "/index.html":
  509. body = (HERE / "index.html").read_bytes()
  510. self.send_response(200)
  511. self.send_header("Content-Type", "text/html; charset=utf-8")
  512. self.send_header("Content-Length", str(len(body)))
  513. # 单文件前端,改版频繁:禁缓存,避免浏览器拿到旧 index.html
  514. self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
  515. self.end_headers()
  516. self.wfile.write(body)
  517. elif u.path == "/search.html":
  518. # 聚类库 tab 内嵌的知识检索页;API 域名由 .env 注入
  519. body = _render_search_html()
  520. self.send_response(200)
  521. self.send_header("Content-Type", "text/html; charset=utf-8")
  522. self.send_header("Content-Length", str(len(body)))
  523. self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
  524. self.end_headers()
  525. self.wfile.write(body)
  526. elif u.path == "/api/query_matrix":
  527. self._json_etag(_matrix())
  528. elif u.path == "/api/category_tree":
  529. self._json_etag(_category_tree(qs.get("source_type", "实质")))
  530. elif u.path == "/api/query_score":
  531. sel = qs.get("sel", "")
  532. if not re.fullmatch(r"[0-9a-f]{16}", sel): # 防路径穿越:sel 必为 16 位十六进制
  533. return self._err("bad sel", 400)
  534. cache = SCORE_CACHE_DIR / f"{sel}.json"
  535. if cache.is_file():
  536. self._json_etag(json.loads(cache.read_text(encoding="utf-8")))
  537. else:
  538. self._json({"pending": True}, 202)
  539. elif u.path == "/api/dashboard":
  540. self._json_etag(_dashboard_cached())
  541. elif u.path == "/api/queries":
  542. self._json_etag(_queries_cached(qs.get("mode", "process")))
  543. elif u.path == "/api/posts":
  544. self._json(db.fetch_posts(qs.get("query_id", ""), qs.get("mode", "process")))
  545. elif u.path == "/api/all_posts":
  546. # 某方向「全部帖子」:跨所有 query 的列表,分页 + 可选 采纳过滤/按帖去重
  547. try:
  548. page = max(1, int(qs.get("page", "1")))
  549. page_size = min(500, max(1, int(qs.get("page_size", "100"))))
  550. except ValueError:
  551. return self._err("page/page_size 须为整数", 400)
  552. total, posts = db.fetch_all_posts(
  553. qs.get("mode", "process"),
  554. adopted_only=qs.get("adopted") in ("1", "true"),
  555. distinct=qs.get("distinct") in ("1", "true"),
  556. limit=page_size, offset=(page - 1) * page_size)
  557. self._json({"total": total, "page": page,
  558. "page_size": page_size, "posts": posts})
  559. elif u.path == "/api/search_progress":
  560. self._json(_search_progress(qs.get("mode", "process")))
  561. elif u.path == "/api/post":
  562. # 单帖详情(正文/配图/评估全量):列表已瘦身,详情按需取;带 ETag/304
  563. r = db.fetch_post(qs.get("query_id", ""), qs.get("case_id", ""),
  564. qs.get("mode", "process"))
  565. self._json_etag(r) if r else self._err("无此帖", 404)
  566. elif u.path == "/api/extract":
  567. # 一次点击合一:单连接同时取版本列表 + 解构详情,前端少一次往返。
  568. self._json_etag(db.fetch_extract(
  569. qs.get("mode", "process"), qs.get("case_id", ""), qs.get("version")))
  570. elif u.path == "/api/extract_prompt":
  571. # 取当前解构 prompt 原文,供「重新解构·编辑 Prompt」弹框预填
  572. mode = qs.get("mode", "process")
  573. pf = HERE / "prompts" / (
  574. "procedure_extract_system.md" if mode == "process"
  575. else "tool_extract_system.md")
  576. self._json({"prompt": pf.read_text(encoding="utf-8") if pf.is_file() else ""})
  577. elif u.path == "/api/process_versions":
  578. self._json(db.fetch_process_versions(qs.get("case_id", "")))
  579. elif u.path == "/api/process":
  580. r = db.fetch_process(qs.get("case_id", ""), qs.get("version"))
  581. self._json(r) if r else self._err("无解构记录", 404)
  582. elif u.path == "/api/tools_versions":
  583. self._json(db.fetch_tools_versions(qs.get("case_id", "")))
  584. elif u.path == "/api/tools":
  585. r = db.fetch_tools(qs.get("case_id", ""), qs.get("version"))
  586. self._json(r) if r else self._err("无解构记录", 404)
  587. elif u.path == "/api/task_status":
  588. r = _task_status(qs.get("task_id", ""))
  589. self._json(r) if r else self._err("未知 task_id", 404)
  590. elif u.path == "/api/img":
  591. self._proxy_image(qs.get("u", ""))
  592. elif u.path == "/api/video":
  593. self._proxy_video(qs.get("u", ""))
  594. elif u.path.startswith("/icons/") and u.path.endswith(".svg"):
  595. # 渠道 logo 静态 SVG;按 basename 取文件,杜绝路径穿越
  596. name = Path(u.path).name
  597. f = HERE / "icons" / name
  598. if not f.is_file():
  599. return self._err("not found", 404)
  600. body = f.read_bytes()
  601. self.send_response(200)
  602. self.send_header("Content-Type", "image/svg+xml; charset=utf-8")
  603. self.send_header("Content-Length", str(len(body)))
  604. self.send_header("Cache-Control", "public, max-age=86400")
  605. self.end_headers()
  606. self.wfile.write(body)
  607. elif u.path.startswith("/api/v1/knowledge"):
  608. self._proxy_knowledge()
  609. else:
  610. self._err("not found", 404)
  611. except Exception as e:
  612. self._err(f"{type(e).__name__}: {e}", 500)
  613. def do_POST(self):
  614. u = urlparse(self.path)
  615. try:
  616. n = int(self.headers.get("Content-Length") or 0)
  617. raw = self.rfile.read(n)
  618. except Exception:
  619. return self._err("读取请求体失败")
  620. # 知识检索接口:原样反代到后端,不在本服务做 JSON 解析
  621. if u.path.startswith("/api/v1/knowledge"):
  622. return self._proxy_knowledge(body=raw)
  623. try:
  624. payload = json.loads(raw or b"{}")
  625. except Exception:
  626. return self._err("body 必须是 JSON")
  627. try:
  628. if u.path in ("/api/extract_process", "/api/extract_tools"):
  629. qid = payload.get("query_id")
  630. cids = payload.get("case_ids") or []
  631. if not qid or not cids:
  632. return self._err("缺 query_id / case_ids")
  633. mode = "process" if u.path.endswith("process") else "tools"
  634. # 认领:剔掉正在解构中的 case,防并发重复解构(白花 LLM 钱)
  635. uniq = list(dict.fromkeys(cids))
  636. claimed = _claim_cases(mode, uniq)
  637. skipped = [c for c in uniq if c not in claimed]
  638. if not claimed:
  639. return self._json({"task_id": None, "skipped": skipped,
  640. "note": "所选帖子正在解构中,已跳过(防并发重复解构)"})
  641. try:
  642. script = ("stages/procedure_extract.py" if mode == "process"
  643. else "stages/tool_extract.py")
  644. cmd = [sys.executable, script, "--query-id", qid,
  645. "--case-ids", ",".join(claimed)]
  646. if payload.get("model"):
  647. cmd += ["--model", payload["model"]]
  648. # 临时 prompt 覆盖:仅本次解构生效,不改 prompts/*.md(写到 .cache 临时文件传给 pipeline)
  649. prompt_override = payload.get("prompt")
  650. if prompt_override and prompt_override.strip():
  651. pdir = HERE / ".cache" / "prompts"
  652. pdir.mkdir(parents=True, exist_ok=True)
  653. pf = pdir / f"{mode}_{int(time.time() * 1000)}.md"
  654. pf.write_text(prompt_override, encoding="utf-8")
  655. cmd += ["--prompt-file", str(pf)]
  656. if payload.get("force"): # 默认按 case 全局去重;force 才强制重解构
  657. cmd += ["--force"]
  658. kind = "proc" if mode == "process" else "tool"
  659. task_id = _spawn_task(kind, cmd, release=(mode, claimed))
  660. except Exception:
  661. _release_cases(mode, claimed) # 起进程失败也要释放认领,避免卡住
  662. raise
  663. self._json({"task_id": task_id, "skipped": skipped})
  664. elif u.path == "/api/query_score":
  665. sel = {
  666. "tool_type": payload.get("tool_type", ""),
  667. "modality": payload.get("modality", ""),
  668. "suffix": payload.get("suffix", ""),
  669. "substance_path": payload.get("substance_path") or [],
  670. "form_path": payload.get("form_path") or [],
  671. "model": payload.get("model", "anthropic/claude-sonnet-4-6"),
  672. }
  673. sel_hash = _query_sel_hash(sel)
  674. cache = SCORE_CACHE_DIR / f"{sel_hash}.json"
  675. if cache.is_file() and not payload.get("force"):
  676. return self._json({"sel": sel_hash, "cached": True})
  677. cmd = [sys.executable, "stages/query_score.py", "--sel", sel_hash,
  678. "--tool-type", sel["tool_type"], "--modality", sel["modality"],
  679. "--suffix", sel["suffix"],
  680. "--substance-path", ",".join(sel["substance_path"]),
  681. "--form-path", ",".join(sel["form_path"]),
  682. "--model", sel["model"]]
  683. if payload.get("force"):
  684. cmd += ["--force"]
  685. self._json({"sel": sel_hash, "task_id": _spawn_task("score", cmd), "cached": False})
  686. elif u.path == "/api/run_search":
  687. query = (payload.get("query") or "").strip()
  688. if not query:
  689. return self._err("缺 query")
  690. qid = payload.get("query_id") or _next_query_id()
  691. cmd = [sys.executable, "stages/search_eval.py",
  692. "--query-id", qid, "--query", query]
  693. if payload.get("query_text"): # 用改写词搜索,但 query 列表存原始组合词
  694. cmd += ["--query-text", payload["query_text"]]
  695. if payload.get("synonyms"):
  696. cmd += ["--synonyms", payload["synonyms"]]
  697. if payload.get("mode_type") in ("工序", "工具"):
  698. cmd += ["--mode-type", payload["mode_type"]]
  699. if payload.get("platforms"):
  700. cmd += ["--platforms", payload["platforms"]]
  701. if payload.get("max_count"):
  702. cmd += ["--max-count", str(payload["max_count"])]
  703. self._json({"task_id": _spawn_task("search", cmd,
  704. meta={"query_id": qid, "query": query}),
  705. "query_id": qid})
  706. else:
  707. self._err("not found", 404)
  708. except Exception as e:
  709. self._err(f"{type(e).__name__}: {e}", 500)
  710. def log_message(self, fmt, *a):
  711. pass # 静默访问日志
if __name__ == "__main__":
    print(f"🚀 mode_workflow server → http://0.0.0.0:{PORT}")
    # Warm up the connection pool: pay the RDS handshake at startup instead of on
    # the first request. A failure here does not block startup.
    try:
        db._conn().close()
        print("✅ DB 连接池已预热")
    except Exception as e:
        print(f"⚠ 连接池预热失败(忽略,首请求会重试):{type(e).__name__}: {e}")
    # Serve on all interfaces; ThreadingHTTPServer handles each request in its own thread.
    ThreadingHTTPServer(("0.0.0.0", PORT), Handler).serve_forever()