#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Merge each post's workflow.json in a run directory into one JSON per post.

For a run directory (e.g. ``runs_full/q0000``) every procedure folder's
``workflow.json`` is combined with its matching post record (including
``llm_evaluation``) and the query text. One procedure yields one output file
under ``workflows/`` next to this script (e.g. q0000 with 3 procedures writes
3 json files).

Mapping logic:
    A procedure folder is named ``{FORM}_{platform}_{hash prefix}``,
    e.g. ``A_gzh_8f5fbfb0``:
      -> read ``form_{FORM}.json`` and find the single entry in ``results[]``
         whose ``case_id`` starts with ``"{platform}_{hash prefix}"``
      -> that result is the post record (post / llm_evaluation /
         source_url ...)
      -> ``query`` is taken from the top level of ``form_{FORM}.json``

Output file name: ``{run_id}_{folder}.json``, e.g. ``q0000_A_gzh_8f5fbfb0.json``

This module is both a build script and an HTTP endpoint:
    * the build functions (build_run / write_run / write_one) can be imported
      by other code or triggered through ``POST /build``;
    * the endpoint scans ``workflows/`` on every request and returns all json
      files found there as an array.

Usage (build):
    python build_workflows.py            # process q0000 by default
    python build_workflows.py q0003      # process the given run(s)
    python build_workflows.py --all      # process every q* dir in runs_full

Usage (endpoint):
    python build_workflows.py serve [port]   # default port 8771
    GET  /workflows  -> scan workflows/*.json and return them as an array
    GET  /           -> same as above (convenient for direct access)
    POST /build      -> body {"q":"q0003"} or {"all":true}; runs the build
                        and returns the result
"""
import glob
import json
import os
import re
import sys
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

HERE = os.path.dirname(os.path.abspath(__file__))
RUNS_DIR = os.path.join(HERE, "runs_full")
OUT_DIR = os.path.join(HERE, "workflows")
DEFAULT_PORT = 8771

# Folder name: form letter _ platform _ case_id hash prefix
FOLDER_RE = re.compile(r"^([A-Za-z])_([a-z0-9]+)_([0-9a-fA-F]+)$")
# Run id ("q" followed by digits); always used with fullmatch().
RUN_ID_RE = re.compile(r"q\d+")


def load_json(path):
    """Read *path* as UTF-8 and return the parsed JSON value."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def build_run(run_id, runs_dir=None, only_folder=None):
    """Build the merged per-post records for a single run directory.

    Args:
        run_id: Name of the run directory, e.g. ``"q0000"``.
        runs_dir: Directory containing the run directories. Defaults to the
            module-level ``RUNS_DIR``; callers that keep their runs elsewhere
            can pass their own path so both sides agree.
        only_folder: When non-empty, only that procedure folder is processed
            and every other folder is skipped silently (no warnings). Used by
            ``write_one``.

    Returns:
        A list of ``(folder, merged_dict)`` tuples, one per post, in sorted
        folder order. ``merged_dict`` holds ``query_id``, ``query``,
        ``platform``, ``post``, ``llm_evaluation`` and ``workflow``. The list
        is empty when the run has no ``procedures/`` directory or nothing
        could be merged. Folders that cannot be merged (bad name, missing or
        unreadable files, zero or several matching results) are reported on
        stdout and skipped.
    """
    run_dir = os.path.join(runs_dir or RUNS_DIR, run_id)
    proc_root = os.path.join(run_dir, "procedures")
    if not os.path.isdir(proc_root):
        print(f"[skip] {run_id}: 没有 procedures/ 目录")
        return []

    # Cache of loaded form_{X}.json files (None = load failed, do not retry).
    # `query` is taken from the first successfully loaded form that has one.
    forms = {}
    query = None

    def get_form(letter):
        nonlocal query
        if letter not in forms:
            form_name = f"form_{letter}.json"
            try:
                form = load_json(os.path.join(run_dir, form_name))
            except (OSError, ValueError) as e:
                print(f"[warn] {run_id}: 读取 {form_name} 失败:{e}")
                form = None
            if form is not None and not isinstance(form, dict):
                print(f"[warn] {run_id}: {form_name} 顶层不是 JSON 对象")
                form = None
            forms[letter] = form
            if form is not None and query is None:
                query = form.get("query")
        return forms[letter]

    out = []
    for folder in sorted(os.listdir(proc_root)):
        if only_folder and folder != only_folder:
            continue  # write_one wants just this folder; skip others quietly
        folder_path = os.path.join(proc_root, folder)
        if not os.path.isdir(folder_path):
            continue
        m = FOLDER_RE.match(folder)
        if not m:
            print(f"[warn] {run_id}/{folder}: 文件夹名不符合命名规则,跳过")
            continue
        form_letter, platform, hash_prefix = m.groups()

        wf_path = os.path.join(folder_path, "workflow.json")
        if not os.path.isfile(wf_path):
            print(f"[warn] {run_id}/{folder}: 没有 workflow.json,跳过")
            continue
        try:
            workflow = load_json(wf_path)
        except (OSError, ValueError) as e:
            # One corrupt folder must not abort the rest of the run.
            print(f"[warn] {run_id}/{folder}: 读取 workflow.json 失败:{e},跳过")
            continue

        # Find the matching post in the form by case_id prefix.
        form_data = get_form(form_letter)
        if form_data is None:
            print(f"[warn] {run_id}/{folder}: form_{form_letter}.json 不可用,跳过")
            continue
        want_prefix = f"{platform}_{hash_prefix}"
        hits = [
            r for r in (form_data.get("results") or [])
            # Tolerate malformed entries (non-dict, case_id missing or null).
            if isinstance(r, dict)
            and str(r.get("case_id") or "").startswith(want_prefix)
        ]
        if len(hits) != 1:
            print(f"[warn] {run_id}/{folder}: 匹配到 {len(hits)} 条 result(期望 1),跳过")
            continue
        result = hits[0]

        # Optional sanity check: _source.json's link should equal the matched
        # result's source_url, otherwise the mapping may be crossed.
        src_path = os.path.join(folder_path, "_source.json")
        if os.path.isfile(src_path):
            try:
                src = load_json(src_path)
            except (OSError, ValueError) as e:
                print(f"[warn] {run_id}/{folder}: 读取 _source.json 失败:{e}")
                src = None
            link = src.get("link") if isinstance(src, dict) else None
            if link and link != result.get("source_url"):
                print(f"[warn] {run_id}/{folder}: _source.link 与 result.source_url 不一致")

        # One merged record per post; only these six fields are kept.
        merged = {
            "query_id": run_id,
            "query": query,
            "platform": result.get("platform", platform),
            "post": result.get("post"),
            "llm_evaluation": result.get("llm_evaluation"),
            "workflow": workflow,
        }
        out.append((folder, merged))

    if not out:
        print(f"[skip] {run_id}: 没有可合并的 procedure")
    return out


def _dump_merged(run_id, folder, merged):
    """Write one post's merged record to ``workflows/{run_id}_{folder}.json``.

    The data is written to a temporary file in the same directory and then
    moved into place with ``os.replace``, so a concurrent ``scan_workflows``
    never reads a half-written json. The temporary name does not end in
    ``.json`` and is therefore invisible to the ``*.json`` scan.
    """
    os.makedirs(OUT_DIR, exist_ok=True)
    out_path = os.path.join(OUT_DIR, f"{run_id}_{folder}.json")
    tmp_path = f"{out_path}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(merged, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, out_path)
    finally:
        # Only still present if writing or replacing failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print(f"[ok] {run_id}/{folder} -> {os.path.basename(out_path)}")


def write_run(run_id, runs_dir=None):
    """Build *run_id* and write one json per post; return the number written."""
    entries = build_run(run_id, runs_dir=runs_dir)
    for folder, merged in entries:
        _dump_merged(run_id, folder, merged)
    return len(entries)


def write_one(run_id, folder, runs_dir=None):
    """Write the merged json for a single procedure folder.

    Lets a caller emit the output as soon as one procedure is finished,
    without waiting for the other posts of the same run. Reuses
    ``build_run``'s parsing and validation with ``only_folder`` set, so the
    run's other (possibly unfinished) folders are skipped without warnings.

    Returns:
        1 if the folder was merged and written, 0 if it could not be merged
        (e.g. ``workflow.json`` is missing).
    """
    for name, merged in build_run(run_id, runs_dir=runs_dir, only_folder=folder):
        if name == folder:
            _dump_merged(run_id, folder, merged)
            return 1
    return 0


def build_runs(run_ids, runs_dir=None):
    """Run ``write_run`` for each id and return the total number of files written.

    ``runs_dir`` is forwarded to ``write_run``; ``None`` means ``RUNS_DIR``.
    """
    return sum(write_run(run_id, runs_dir=runs_dir) for run_id in run_ids)


def all_run_ids(runs_dir=None):
    """Return the sorted names of all ``q<digits>`` directories in the runs dir.

    ``runs_dir`` defaults to ``RUNS_DIR``. A runs directory that does not
    exist yields an empty list instead of raising.
    """
    root = runs_dir or RUNS_DIR
    try:
        names = os.listdir(root)
    except (FileNotFoundError, NotADirectoryError):
        return []
    return sorted(
        d for d in names
        if RUN_ID_RE.fullmatch(d) and os.path.isdir(os.path.join(root, d))
    )


# ---------- Endpoint: scan workflows/ live and return it as an array ----------

def scan_workflows():
    """Load every ``workflows/*.json`` and return the values sorted by file name.

    The directory is re-read on every call (no caching), so files written by
    a build show up immediately. Files that cannot be read or parsed are
    reported on stdout and left out.
    """
    items = []
    for fp in sorted(glob.glob(os.path.join(OUT_DIR, "*.json"))):
        try:
            items.append(load_json(fp))
        except (OSError, ValueError) as e:
            print(f"[warn] 读取 {os.path.basename(fp)} 失败:{e}")
    return items


class Handler(BaseHTTPRequestHandler):
    """JSON endpoint: GET lists the merged workflows, POST /build runs a build."""

    def _send(self, code, obj):
        """Send *obj* as a JSON response with permissive CORS headers.

        For status 204 *obj* is ignored and neither a body nor
        Content-Type / Content-Length is sent.
        """
        no_body = code == 204
        body = b"" if no_body else json.dumps(obj, ensure_ascii=False).encode("utf-8")
        self.send_response(code)
        if not no_body:
            self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        if not no_body:
            self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if not no_body:
            self.wfile.write(body)

    @staticmethod
    def _requested_runs(payload):
        """Validate a POST /build payload.

        Returns ``(run_ids, None)`` on success or ``(None, error_message)``
        when the payload is not an object or ``q`` is not of the form qNN.
        """
        if not isinstance(payload, dict):
            return None, "bad json: expected an object"
        if payload.get("all"):
            return all_run_ids(), None
        raw_q = payload.get("q")
        q = raw_q.strip() if isinstance(raw_q, str) else (raw_q or "")
        # Restrict to the qNN form so the value cannot be used for path injection.
        if not (isinstance(q, str) and RUN_ID_RE.fullmatch(q)):
            return None, f"bad q (expect 'qNN' or all=true): {q!r}"
        return [q], None

    def do_OPTIONS(self):
        """Answer the CORS preflight request."""
        self._send(204, None)

    def do_GET(self):
        """Return all merged workflows for the known listing paths, else 404."""
        path = self.path.split("?")[0]
        if path in ("/", "/workflows", "/api/workflows"):
            self._send(200, scan_workflows())
        else:
            self._send(404, {"error": "not found"})

    def do_POST(self):
        """Handle ``POST /build``: build the requested run(s) and report the count."""
        if self.path.split("?")[0] != "/build":
            self._send(404, {"error": "not found"})
            return
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            self._send(400, {"error": "bad Content-Length"})
            return
        raw = self.rfile.read(length) if length > 0 else b"{}"
        try:
            # UnicodeDecodeError and JSONDecodeError are both ValueError.
            payload = json.loads(raw.decode("utf-8"))
        except (ValueError, RecursionError) as e:
            self._send(400, {"error": f"bad json: {e}"})
            return

        run_ids, error = self._requested_runs(payload)
        if error is not None:
            self._send(400, {"error": error})
            return

        try:
            n = build_runs(run_ids)
        except Exception as e:  # request boundary: report the failure as 500
            self._send(500, {"error": f"build failed: {e}"})
            return
        self._send(200, {"status": "ok", "runs": run_ids, "written": n})

    def log_message(self, *a):
        """Suppress the default per-request logging."""
        pass


def serve(port):
    """Print the endpoint URL and current file count, then serve forever on *port*."""
    n = len(scan_workflows())
    print(f"workflows 接口:http://0.0.0.0:{port}/workflows  "
          f"(workflows/ 下当前 {n} 个 json,实时扫描)")
    ThreadingHTTPServer(("0.0.0.0", port), Handler).serve_forever()


def main(argv):
    """Command-line entry point; see the module docstring for usage."""
    args = argv[1:]
    if args and args[0] == "serve":
        port = int(args[1]) if len(args) > 1 else DEFAULT_PORT
        serve(port)
        return

    if "--all" in args:
        run_ids = all_run_ids()
    elif args:
        run_ids = args
    else:
        run_ids = ["q0000"]

    total_files = build_runs(run_ids)
    print(f"\n完成:处理 {len(run_ids)} 个 run,共写出 {total_files} 个帖子 json")


if __name__ == "__main__":
    try:
        # The Windows console defaults to cp1252, where printing Chinese text
        # crashes; switch stdout to utf-8. Best effort: stdout may have been
        # replaced by an object without reconfigure().
        sys.stdout.reconfigure(encoding="utf-8")
    except (AttributeError, OSError, ValueError):
        pass
    main(sys.argv)