build_workflows.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """把一个 run 目录(如 runs_full/q0000)里**每个帖子**的 workflow.json,
  4. 与它对应的 post 信息(含 llm_evaluation)以及 query 词合并成一个 JSON。
  5. 以帖子为单位:一个 procedure 输出一个文件,文件落在 search_eval/workflows/ 下。
  6. (例:q0000 有 3 个 procedure -> 写出 3 个 json)
  7. 映射逻辑:
  8. procedure 文件夹名形如 {FORM}_{platform}_{hash前缀} 例: A_gzh_8f5fbfb0
  9. -> 读 form_{FORM}.json,在 results[] 里找 case_id 以 "{platform}_{hash前缀}" 开头的那条
  10. -> 该 result 即对应的 post(post / comments / llm_evaluation / source_url ...)
  11. -> query / original_q 取自 form_{FORM}.json 顶层
  12. 输出文件名: {run_id}_{folder}.json 例: q0000_A_gzh_8f5fbfb0.json
  13. 本模块既是 build 脚本,也是一个 HTTP 接口:
  14. * build 函数(build_run / write_run)保留,供外部 import 调用或经 POST /build 触发;
  15. * 接口本身实时扫描 workflows/ 目录,把里面所有 json 以数组形式返回。
  16. 用法(build):
  17. python build_workflows.py # 默认处理 q0000
  18. python build_workflows.py q0003 # 处理指定 run
  19. python build_workflows.py --all # 处理 runs_full 下所有 q* 目录
  20. 用法(接口):
  21. python build_workflows.py serve [port] # 默认 8771
  22. GET /workflows -> 实时扫描 workflows/*.json,返回数组
  23. GET / -> 同上(方便直接访问)
  24. POST /build -> body {"q":"q0003"} 或 {"all":true},触发 build 后返回结果
  25. """
  26. import json
  27. import os
  28. import re
  29. import sys
  30. import glob
  31. from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
  32. HERE = os.path.dirname(os.path.abspath(__file__))
  33. RUNS_DIR = os.path.join(HERE, "runs_full")
  34. OUT_DIR = os.path.join(HERE, "workflows")
  35. DEFAULT_PORT = 8771
  36. # 文件夹名: 表单字母 _ 平台 _ case_id 哈希前缀
  37. FOLDER_RE = re.compile(r"^([A-Za-z])_([a-z0-9]+)_([0-9a-fA-F]+)$")
  38. def load_json(path):
  39. with open(path, encoding="utf-8") as f:
  40. return json.load(f)
  41. def build_run(run_id, runs_dir=None, only_folder=None):
  42. """为单个 run 目录构建合并结果。
  43. 以帖子为单位:返回一个 list,每个元素是 (folder, merged_dict),
  44. merged_dict 即单个帖子的合并 JSON(query + post + llm_evaluation + workflow)。
  45. 找不到 procedures 时返回空 list。
  46. runs_dir 缺省用模块的 RUNS_DIR;外部脚本(如 batch_extract_procedures.py 用了
  47. --output-dir)可传入自己的 runs_full,避免两边路径不一致。
  48. only_folder 非空时只处理那一个 folder(其余直接跳过、不刷 warn)——给 write_one
  49. 用,避免同 q 里别的未完成 folder 每次都被扫一遍刷一堆 warn。"""
  50. run_dir = os.path.join(runs_dir or RUNS_DIR, run_id)
  51. proc_root = os.path.join(run_dir, "procedures")
  52. if not os.path.isdir(proc_root):
  53. print(f"[skip] {run_id}: 没有 procedures/ 目录")
  54. return []
  55. # 缓存已加载的 form_{X}.json,并记录 query(取第一个见到的)
  56. forms = {}
  57. query = original_q = None
  58. platforms = None
  59. def get_form(letter):
  60. nonlocal query, original_q, platforms
  61. if letter not in forms:
  62. forms[letter] = load_json(os.path.join(run_dir, f"form_{letter}.json"))
  63. if query is None:
  64. query = forms[letter].get("query")
  65. original_q = forms[letter].get("original_q")
  66. platforms = forms[letter].get("platforms")
  67. return forms[letter]
  68. out = []
  69. for folder in sorted(os.listdir(proc_root)):
  70. if only_folder and folder != only_folder:
  71. continue # write_one 只要这一个, 其余静默跳过
  72. folder_path = os.path.join(proc_root, folder)
  73. if not os.path.isdir(folder_path):
  74. continue
  75. m = FOLDER_RE.match(folder)
  76. if not m:
  77. print(f"[warn] {run_id}/{folder}: 文件夹名不符合命名规则,跳过")
  78. continue
  79. form_letter, platform, hash_prefix = m.groups()
  80. wf_path = os.path.join(folder_path, "workflow.json")
  81. if not os.path.isfile(wf_path):
  82. print(f"[warn] {run_id}/{folder}: 没有 workflow.json,跳过")
  83. continue
  84. workflow = load_json(wf_path)
  85. # 在对应 form 里按 case_id 前缀找匹配的 post
  86. form_data = get_form(form_letter)
  87. want_prefix = f"{platform}_{hash_prefix}"
  88. hits = [r for r in form_data.get("results", [])
  89. if r.get("case_id", "").startswith(want_prefix)]
  90. if len(hits) != 1:
  91. print(f"[warn] {run_id}/{folder}: 匹配到 {len(hits)} 条 result(期望 1),跳过")
  92. continue
  93. result = hits[0]
  94. # 可选:用 _source.json 的 link 校验映射没串台
  95. src_path = os.path.join(folder_path, "_source.json")
  96. if os.path.isfile(src_path):
  97. src = load_json(src_path)
  98. if src.get("link") and src["link"] != result.get("source_url"):
  99. print(f"[warn] {run_id}/{folder}: _source.link 与 result.source_url 不一致")
  100. # 以帖子为单位合并,只保留 5 个字段
  101. merged = {
  102. "query_id": run_id,
  103. "query": query,
  104. "platform": result.get("platform", platform),
  105. "post": result.get("post"),
  106. "llm_evaluation": result.get("llm_evaluation"),
  107. "workflow": workflow,
  108. }
  109. out.append((folder, merged))
  110. if not out:
  111. print(f"[skip] {run_id}: 没有可合并的 procedure")
  112. return out
  113. def _dump_merged(run_id, folder, merged):
  114. """把单个帖子的 merged 写成 workflows/{run_id}_{folder}.json。"""
  115. os.makedirs(OUT_DIR, exist_ok=True)
  116. out_path = os.path.join(OUT_DIR, f"{run_id}_{folder}.json")
  117. with open(out_path, "w", encoding="utf-8") as f:
  118. json.dump(merged, f, ensure_ascii=False, indent=2)
  119. print(f"[ok] {run_id}/{folder} -> {os.path.basename(out_path)}")
  120. def write_run(run_id, runs_dir=None):
  121. entries = build_run(run_id, runs_dir=runs_dir)
  122. if not entries:
  123. return 0
  124. for folder, merged in entries:
  125. _dump_merged(run_id, folder, merged)
  126. return len(entries)
  127. def write_one(run_id, folder, runs_dir=None):
  128. """只把指定 procedure folder 的合并 json 写出 —— 跑完一个工序就立刻出一个,
  129. 无需等同 q 其他帖子。命中并写出返回 1, 没匹配到 (缺 workflow.json 等) 返回 0。
  130. 复用 build_run 的解析/校验逻辑, 只处理 folder 那一个 (only_folder 让同 q 其余
  131. 未完成目录被静默跳过, 不刷 warn)。"""
  132. for f, merged in build_run(run_id, runs_dir=runs_dir, only_folder=folder):
  133. if f == folder:
  134. _dump_merged(run_id, folder, merged)
  135. return 1
  136. return 0
  137. def build_runs(run_ids):
  138. """对一组 run 执行 write_run,返回写出的帖子 json 总数。"""
  139. total = 0
  140. for run_id in run_ids:
  141. total += write_run(run_id)
  142. return total
  143. def all_run_ids():
  144. """runs_full 下所有 q* 目录。"""
  145. return sorted(d for d in os.listdir(RUNS_DIR)
  146. if re.match(r"^q\d+$", d)
  147. and os.path.isdir(os.path.join(RUNS_DIR, d)))
  148. # ---------- 接口:实时扫描 workflows/ 并以数组返回 ----------
  149. def scan_workflows():
  150. """实时扫描 workflows/*.json,把每个文件读成 dict,按文件名排序返回数组。
  151. 每次调用都重新读盘,所以 build 新写入的文件会立刻在接口里出现(无缓存)。"""
  152. items = []
  153. for fp in sorted(glob.glob(os.path.join(OUT_DIR, "*.json"))):
  154. try:
  155. items.append(load_json(fp))
  156. except Exception as e:
  157. print(f"[warn] 读取 {os.path.basename(fp)} 失败:{e}")
  158. return items
  159. class Handler(BaseHTTPRequestHandler):
  160. def _send(self, code, obj):
  161. body = json.dumps(obj, ensure_ascii=False).encode("utf-8")
  162. self.send_response(code)
  163. self.send_header("Content-Type", "application/json; charset=utf-8")
  164. self.send_header("Access-Control-Allow-Origin", "*")
  165. self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
  166. self.send_header("Access-Control-Allow-Headers", "Content-Type")
  167. self.send_header("Content-Length", str(len(body)))
  168. self.end_headers()
  169. self.wfile.write(body)
  170. def do_OPTIONS(self): # CORS 预检
  171. self._send(204, {})
  172. def do_GET(self):
  173. path = self.path.split("?")[0]
  174. if path in ("/", "/workflows", "/api/workflows"):
  175. self._send(200, scan_workflows())
  176. else:
  177. self._send(404, {"error": "not found"})
  178. def do_POST(self):
  179. if self.path.split("?")[0] != "/build":
  180. self._send(404, {"error": "not found"}); return
  181. length = int(self.headers.get("Content-Length") or 0)
  182. raw = self.rfile.read(length).decode("utf-8") if length > 0 else "{}"
  183. try:
  184. payload = json.loads(raw)
  185. except Exception as e:
  186. self._send(400, {"error": f"bad json: {e}"}); return
  187. if payload.get("all"):
  188. run_ids = all_run_ids()
  189. else:
  190. q = (payload.get("q") or "").strip()
  191. if not re.match(r"^q\d+$", q): # 限定 qNN 形式,避免路径注入
  192. self._send(400, {"error": f"bad q (expect 'qNN' or all=true): {q!r}"}); return
  193. run_ids = [q]
  194. try:
  195. n = build_runs(run_ids)
  196. self._send(200, {"status": "ok", "runs": run_ids, "written": n})
  197. except Exception as e:
  198. self._send(500, {"error": f"build failed: {e}"})
  199. def log_message(self, *a):
  200. pass
  201. def serve(port):
  202. n = len(scan_workflows())
  203. print(f"workflows 接口:http://0.0.0.0:{port}/workflows "
  204. f"(workflows/ 下当前 {n} 个 json,实时扫描)")
  205. ThreadingHTTPServer(("0.0.0.0", port), Handler).serve_forever()
  206. def main(argv):
  207. args = argv[1:]
  208. if args and args[0] == "serve":
  209. port = int(args[1]) if len(args) > 1 else DEFAULT_PORT
  210. serve(port)
  211. return
  212. if "--all" in args:
  213. run_ids = all_run_ids()
  214. elif args:
  215. run_ids = args
  216. else:
  217. run_ids = ["q0000"]
  218. total_files = build_runs(run_ids)
  219. print(f"\n完成:处理 {len(run_ids)} 个 run,共写出 {total_files} 个帖子 json")
  220. if __name__ == "__main__":
  221. try: # Windows 控制台默认 cp1252,中文 print 会崩,统一切 utf-8
  222. sys.stdout.reconfigure(encoding="utf-8")
  223. except Exception:
  224. pass
  225. main(sys.argv)