build_workflows.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """把一个 run 目录(如 runs_full/q0000)里**每个帖子**的 workflow.json,
  4. 与它对应的 post 信息(含 llm_evaluation)以及 query 词合并成一个 JSON。
  5. 以帖子为单位:一个 procedure 输出一个文件,文件落在 search_eval/workflows/ 下。
  6. (例:q0000 有 3 个 procedure -> 写出 3 个 json)
  7. 映射逻辑:
  8. procedure 文件夹名形如 {FORM}_{platform}_{hash前缀} 例: A_gzh_8f5fbfb0
  9. -> 读 form_{FORM}.json,在 results[] 里找 case_id 以 "{platform}_{hash前缀}" 开头的那条
  10. -> 该 result 即对应的 post(post / comments / llm_evaluation / source_url ...)
  11. -> query / original_q 取自 form_{FORM}.json 顶层
  12. 输出文件名: {run_id}_{folder}.json 例: q0000_A_gzh_8f5fbfb0.json
  13. 本模块既是 build 脚本,也是一个 HTTP 接口:
  14. * build 函数(build_run / write_run)保留,供外部 import 调用或经 POST /build 触发;
  15. * 接口本身实时扫描 workflows/ 目录,把里面所有 json 以数组形式返回。
  16. 用法(build):
  17. python build_workflows.py # 默认处理 q0000
  18. python build_workflows.py q0003 # 处理指定 run
  19. python build_workflows.py --all # 处理 runs_full 下所有 q* 目录
  20. 用法(接口):
  21. python build_workflows.py serve [port] # 默认 8771
  22. GET /workflows -> 实时扫描 workflows/*.json,返回数组
  23. GET / -> 同上(方便直接访问)
  24. POST /build -> body {"q":"q0003"} 或 {"all":true},触发 build 后返回结果
  25. """
  26. import json
  27. import os
  28. import re
  29. import sys
  30. import glob
  31. from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
  32. HERE = os.path.dirname(os.path.abspath(__file__))
  33. RUNS_DIR = os.path.join(HERE, "runs_full")
  34. OUT_DIR = os.path.join(HERE, "workflows")
  35. DEFAULT_PORT = 8771
  36. # 文件夹名: 表单字母 _ 平台 _ case_id 哈希前缀
  37. FOLDER_RE = re.compile(r"^([A-Za-z])_([a-z0-9]+)_([0-9a-fA-F]+)$")
  38. def load_json(path):
  39. with open(path, encoding="utf-8") as f:
  40. return json.load(f)
  41. def build_run(run_id, runs_dir=None, only_folder=None):
  42. """为单个 run 目录构建合并结果。
  43. 以帖子为单位:返回一个 list,每个元素 is (folder, merged_dict),
  44. merged_dict 即单个帖子的合并 JSON(query + post + llm_evaluation + workflow)。
  45. 找不到 procedures 时返回空 list。
  46. runs_dir 缺省用模块的 RUNS_DIR;外部脚本(如 batch_extract_procedures.py 用了
  47. --output-dir)可传入自己的 runs_full,避免两边路径不一致。
  48. only_folder 非空时只处理那一个 folder(其余直接跳过、不刷 warn)——给 write_one
  49. 用,避免同 q 里别的未完成 folder 每次都被扫一遍刷一堆 warn。"""
  50. run_dir = os.path.join(runs_dir or RUNS_DIR, run_id)
  51. proc_root = os.path.join(run_dir, "procedures")
  52. if not os.path.isdir(proc_root):
  53. print(f"[skip] {run_id}: 没有 procedures/ 目录")
  54. return []
  55. # 缓存已加载的 form_{X}.json,并记录 query(取第一个见到的)
  56. forms = {}
  57. query = original_q = None
  58. platforms = None
  59. def get_form(letter):
  60. nonlocal query, original_q, platforms
  61. if letter not in forms:
  62. forms[letter] = load_json(os.path.join(run_dir, f"form_{letter}.json"))
  63. if query is None:
  64. query = forms[letter].get("query")
  65. original_q = forms[letter].get("original_q")
  66. platforms = forms[letter].get("platforms")
  67. return forms[letter]
  68. out = []
  69. for folder in sorted(os.listdir(proc_root)):
  70. if only_folder and folder != only_folder:
  71. continue # write_one 只要这一个, 其余静默跳过
  72. folder_path = os.path.join(proc_root, folder)
  73. if not os.path.isdir(folder_path):
  74. continue
  75. m = FOLDER_RE.match(folder)
  76. result = None
  77. platform = None
  78. if m:
  79. form_letter, platform, hash_prefix = m.groups()
  80. wf_path = os.path.join(folder_path, "workflow.json")
  81. if not os.path.isfile(wf_path):
  82. print(f"[warn] {run_id}/{folder}: 没有 workflow.json,跳过")
  83. continue
  84. workflow = load_json(wf_path)
  85. # 在对应 form 里按 case_id 前缀找匹配的 post
  86. form_data = get_form(form_letter)
  87. want_prefix = f"{platform}_{hash_prefix}"
  88. hits = [r for r in form_data.get("results", [])
  89. if r.get("case_id", "").startswith(want_prefix)]
  90. if len(hits) != 1:
  91. print(f"[warn] {run_id}/{folder}: 匹配到 {len(hits)} 条 result(期望 1),跳过")
  92. continue
  93. result = hits[0]
  94. else:
  95. # Fallback: check if the folder matches the case_id of any post in form_*.json
  96. case_id = folder
  97. found_form = None
  98. for letter in ['A', 'B', 'C']:
  99. form_json_path = os.path.join(run_dir, f"form_{letter}.json")
  100. if os.path.isfile(form_json_path):
  101. try:
  102. form_data = get_form(letter)
  103. hits = [r for r in form_data.get("results", []) if r.get("case_id") == case_id]
  104. if hits:
  105. found_form = letter
  106. result = hits[0]
  107. break
  108. except Exception:
  109. pass
  110. if not found_form:
  111. print(f"[warn] {run_id}/{folder}: 文件夹名不符合命名规则且无法在 form_*.json 中匹配,跳过")
  112. continue
  113. form_letter = found_form
  114. platform = case_id.split("_")[0]
  115. wf_path = os.path.join(folder_path, "workflow.json")
  116. if not os.path.isfile(wf_path):
  117. print(f"[warn] {run_id}/{folder}: 没有 workflow.json,跳过")
  118. continue
  119. workflow = load_json(wf_path)
  120. # 可选:用 _source.json 的 link 校验映射没串台
  121. src_path = os.path.join(folder_path, "_source.json")
  122. if os.path.isfile(src_path):
  123. src = load_json(src_path)
  124. if src.get("link") and src["link"] != result.get("source_url"):
  125. print(f"[warn] {run_id}/{folder}: _source.link 与 result.source_url 不一致")
  126. # 以帖子为单位合并,只保留 5 个字段
  127. merged = {
  128. "query_id": run_id,
  129. "query": query,
  130. "platform": result.get("platform", platform),
  131. "post": result.get("post"),
  132. "llm_evaluation": result.get("llm_evaluation"),
  133. "workflow": workflow,
  134. }
  135. out.append((folder, merged))
  136. if not out:
  137. print(f"[skip] {run_id}: 没有可合并的 procedure")
  138. return out
  139. def _dump_merged(run_id, folder, merged):
  140. """把单个帖子的 merged 写成 workflows/{run_id}_{folder}.json。"""
  141. os.makedirs(OUT_DIR, exist_ok=True)
  142. out_path = os.path.join(OUT_DIR, f"{run_id}_{folder}.json")
  143. with open(out_path, "w", encoding="utf-8") as f:
  144. json.dump(merged, f, ensure_ascii=False, indent=2)
  145. print(f"[ok] {run_id}/{folder} -> {os.path.basename(out_path)}")
  146. def write_run(run_id, runs_dir=None):
  147. entries = build_run(run_id, runs_dir=runs_dir)
  148. if not entries:
  149. return 0
  150. for folder, merged in entries:
  151. _dump_merged(run_id, folder, merged)
  152. return len(entries)
  153. def write_one(run_id, folder, runs_dir=None):
  154. """只把指定 procedure folder 的合并 json 写出 —— 跑完一个工序就立刻出一个,
  155. 无需等同 q 其他帖子。命中并写出返回 1, 没匹配到 (缺 workflow.json 等) 返回 0。
  156. 复用 build_run 的解析/校验逻辑, 只处理 folder 那一个 (only_folder 让同 q 其余
  157. 未完成目录被静默跳过, 不刷 warn)。"""
  158. for f, merged in build_run(run_id, runs_dir=runs_dir, only_folder=folder):
  159. if f == folder:
  160. _dump_merged(run_id, folder, merged)
  161. return 1
  162. return 0
  163. def build_runs(run_ids):
  164. """对一组 run 执行 write_run,返回写出的帖子 json 总数。"""
  165. total = 0
  166. for run_id in run_ids:
  167. total += write_run(run_id)
  168. return total
  169. def all_run_ids():
  170. """runs_full 下所有 q* 目录。"""
  171. return sorted(d for d in os.listdir(RUNS_DIR)
  172. if re.match(r"^q\d+$", d)
  173. and os.path.isdir(os.path.join(RUNS_DIR, d)))
  174. # ---------- 接口:实时扫描 workflows/ 并以数组返回 ----------
  175. def scan_workflows():
  176. """实时扫描 workflows/*.json,把每个文件读成 dict,按文件名排序返回数组。
  177. 每次调用都重新读盘,所以 build 新写入的文件会立刻在接口里出现(无缓存)。"""
  178. items = []
  179. for fp in sorted(glob.glob(os.path.join(OUT_DIR, "*.json"))):
  180. try:
  181. items.append(load_json(fp))
  182. except Exception as e:
  183. print(f"[warn] 读取 {os.path.basename(fp)} 失败:{e}")
  184. return items
  185. class Handler(BaseHTTPRequestHandler):
  186. def _send(self, code, obj):
  187. body = json.dumps(obj, ensure_ascii=False).encode("utf-8")
  188. self.send_response(code)
  189. self.send_header("Content-Type", "application/json; charset=utf-8")
  190. self.send_header("Access-Control-Allow-Origin", "*")
  191. self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
  192. self.send_header("Access-Control-Allow-Headers", "Content-Type")
  193. self.send_header("Content-Length", str(len(body)))
  194. self.end_headers()
  195. self.wfile.write(body)
  196. def do_OPTIONS(self): # CORS 预检
  197. self._send(204, {})
  198. def do_GET(self):
  199. path = self.path.split("?")[0]
  200. if path in ("/", "/workflows", "/api/workflows"):
  201. self._send(200, scan_workflows())
  202. else:
  203. self._send(404, {"error": "not found"})
  204. def do_POST(self):
  205. if self.path.split("?")[0] != "/build":
  206. self._send(404, {"error": "not found"}); return
  207. length = int(self.headers.get("Content-Length") or 0)
  208. raw = self.rfile.read(length).decode("utf-8") if length > 0 else "{}"
  209. try:
  210. payload = json.loads(raw)
  211. except Exception as e:
  212. self._send(400, {"error": f"bad json: {e}"}); return
  213. if payload.get("all"):
  214. run_ids = all_run_ids()
  215. else:
  216. q = (payload.get("q") or "").strip()
  217. if not re.match(r"^q\d+$", q): # 限定 qNN 形式,避免路径注入
  218. self._send(400, {"error": f"bad q (expect 'qNN' or all=true): {q!r}"}); return
  219. run_ids = [q]
  220. try:
  221. n = build_runs(run_ids)
  222. self._send(200, {"status": "ok", "runs": run_ids, "written": n})
  223. except Exception as e:
  224. self._send(500, {"error": f"build failed: {e}"})
  225. def log_message(self, *a):
  226. pass
  227. def serve(port):
  228. n = len(scan_workflows())
  229. print(f"workflows 接口:http://0.0.0.0:{port}/workflows "
  230. f"(workflows/ 下当前 {n} 个 json,实时扫描)")
  231. ThreadingHTTPServer(("0.0.0.0", port), Handler).serve_forever()
  232. def main(argv):
  233. args = argv[1:]
  234. if args and args[0] == "serve":
  235. port = int(args[1]) if len(args) > 1 else DEFAULT_PORT
  236. serve(port)
  237. return
  238. if "--all" in args:
  239. run_ids = all_run_ids()
  240. elif args:
  241. run_ids = args
  242. else:
  243. run_ids = ["q0000"]
  244. total_files = build_runs(run_ids)
  245. print(f"\n完成:处理 {len(run_ids)} 个 run,共写出 {total_files} 个帖子 json")
  246. if __name__ == "__main__":
  247. try: # Windows 控制台默认 cp1252,中文 print 会崩,统一切 utf-8
  248. sys.stdout.reconfigure(encoding="utf-8")
  249. except Exception:
  250. pass
  251. main(sys.argv)