| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """把一个 run 目录(如 runs_full/q0000)里**每个帖子**的 workflow.json,
- 与它对应的 post 信息(含 llm_evaluation)以及 query 词合并成一个 JSON。
- 以帖子为单位:一个 procedure 输出一个文件,文件落在 search_eval/workflows/ 下。
- (例:q0000 有 3 个 procedure -> 写出 3 个 json)
- 映射逻辑:
- procedure 文件夹名形如 {FORM}_{platform}_{hash前缀} 例: A_gzh_8f5fbfb0
- -> 读 form_{FORM}.json,在 results[] 里找 case_id 以 "{platform}_{hash前缀}" 开头的那条
- -> 该 result 即对应的 post(post / comments / llm_evaluation / source_url ...)
- -> query / original_q 取自 form_{FORM}.json 顶层
- 输出文件名: {run_id}_{folder}.json 例: q0000_A_gzh_8f5fbfb0.json
- 本模块既是 build 脚本,也是一个 HTTP 接口:
- * build 函数(build_run / write_run)保留,供外部 import 调用或经 POST /build 触发;
- * 接口本身实时扫描 workflows/ 目录,把里面所有 json 以数组形式返回。
- 用法(build):
- python build_workflows.py # 默认处理 q0000
- python build_workflows.py q0003 # 处理指定 run
- python build_workflows.py --all # 处理 runs_full 下所有 q* 目录
- 用法(接口):
- python build_workflows.py serve [port] # 默认 8771
- GET /workflows -> 实时扫描 workflows/*.json,返回数组
- GET / -> 同上(方便直接访问)
- POST /build -> body {"q":"q0003"} 或 {"all":true},触发 build 后返回结果
- """
- import json
- import os
- import re
- import sys
- import glob
- from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
- HERE = os.path.dirname(os.path.abspath(__file__))
- RUNS_DIR = os.path.join(HERE, "runs_full")
- OUT_DIR = os.path.join(HERE, "workflows")
- DEFAULT_PORT = 8771
- # 文件夹名: 表单字母 _ 平台 _ case_id 哈希前缀
- FOLDER_RE = re.compile(r"^([A-Za-z])_([a-z0-9]+)_([0-9a-fA-F]+)$")
- def load_json(path):
- with open(path, encoding="utf-8") as f:
- return json.load(f)
- def build_run(run_id, runs_dir=None):
- """为单个 run 目录构建合并结果。
- 以帖子为单位:返回一个 list,每个元素是 (folder, merged_dict),
- merged_dict 即单个帖子的合并 JSON(query + post + llm_evaluation + workflow)。
- 找不到 procedures 时返回空 list。
- runs_dir 缺省用模块的 RUNS_DIR;外部脚本(如 batch_extract_procedures.py 用了
- --output-dir)可传入自己的 runs_full,避免两边路径不一致。"""
- run_dir = os.path.join(runs_dir or RUNS_DIR, run_id)
- proc_root = os.path.join(run_dir, "procedures")
- if not os.path.isdir(proc_root):
- print(f"[skip] {run_id}: 没有 procedures/ 目录")
- return []
- # 缓存已加载的 form_{X}.json,并记录 query(取第一个见到的)
- forms = {}
- query = original_q = None
- platforms = None
- def get_form(letter):
- nonlocal query, original_q, platforms
- if letter not in forms:
- forms[letter] = load_json(os.path.join(run_dir, f"form_{letter}.json"))
- if query is None:
- query = forms[letter].get("query")
- original_q = forms[letter].get("original_q")
- platforms = forms[letter].get("platforms")
- return forms[letter]
- out = []
- for folder in sorted(os.listdir(proc_root)):
- folder_path = os.path.join(proc_root, folder)
- if not os.path.isdir(folder_path):
- continue
- m = FOLDER_RE.match(folder)
- if not m:
- print(f"[warn] {run_id}/{folder}: 文件夹名不符合命名规则,跳过")
- continue
- form_letter, platform, hash_prefix = m.groups()
- wf_path = os.path.join(folder_path, "workflow.json")
- if not os.path.isfile(wf_path):
- print(f"[warn] {run_id}/{folder}: 没有 workflow.json,跳过")
- continue
- workflow = load_json(wf_path)
- # 在对应 form 里按 case_id 前缀找匹配的 post
- form_data = get_form(form_letter)
- want_prefix = f"{platform}_{hash_prefix}"
- hits = [r for r in form_data.get("results", [])
- if r.get("case_id", "").startswith(want_prefix)]
- if len(hits) != 1:
- print(f"[warn] {run_id}/{folder}: 匹配到 {len(hits)} 条 result(期望 1),跳过")
- continue
- result = hits[0]
- # 可选:用 _source.json 的 link 校验映射没串台
- src_path = os.path.join(folder_path, "_source.json")
- if os.path.isfile(src_path):
- src = load_json(src_path)
- if src.get("link") and src["link"] != result.get("source_url"):
- print(f"[warn] {run_id}/{folder}: _source.link 与 result.source_url 不一致")
- # 以帖子为单位合并,只保留 5 个字段
- merged = {
- "query_id": run_id,
- "query": query,
- "platform": result.get("platform", platform),
- "post": result.get("post"),
- "llm_evaluation": result.get("llm_evaluation"),
- "workflow": workflow,
- }
- out.append((folder, merged))
- if not out:
- print(f"[skip] {run_id}: 没有可合并的 procedure")
- return out
- def write_run(run_id, runs_dir=None):
- entries = build_run(run_id, runs_dir=runs_dir)
- if not entries:
- return 0
- os.makedirs(OUT_DIR, exist_ok=True)
- for folder, merged in entries:
- out_path = os.path.join(OUT_DIR, f"{run_id}_{folder}.json")
- with open(out_path, "w", encoding="utf-8") as f:
- json.dump(merged, f, ensure_ascii=False, indent=2)
- print(f"[ok] {run_id}/{folder} -> {os.path.basename(out_path)}")
- return len(entries)
- def build_runs(run_ids):
- """对一组 run 执行 write_run,返回写出的帖子 json 总数。"""
- total = 0
- for run_id in run_ids:
- total += write_run(run_id)
- return total
- def all_run_ids():
- """runs_full 下所有 q* 目录。"""
- return sorted(d for d in os.listdir(RUNS_DIR)
- if re.match(r"^q\d+$", d)
- and os.path.isdir(os.path.join(RUNS_DIR, d)))
- # ---------- 接口:实时扫描 workflows/ 并以数组返回 ----------
- def scan_workflows():
- """实时扫描 workflows/*.json,把每个文件读成 dict,按文件名排序返回数组。
- 每次调用都重新读盘,所以 build 新写入的文件会立刻在接口里出现(无缓存)。"""
- items = []
- for fp in sorted(glob.glob(os.path.join(OUT_DIR, "*.json"))):
- try:
- items.append(load_json(fp))
- except Exception as e:
- print(f"[warn] 读取 {os.path.basename(fp)} 失败:{e}")
- return items
- class Handler(BaseHTTPRequestHandler):
- def _send(self, code, obj):
- body = json.dumps(obj, ensure_ascii=False).encode("utf-8")
- self.send_response(code)
- self.send_header("Content-Type", "application/json; charset=utf-8")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
- self.send_header("Access-Control-Allow-Headers", "Content-Type")
- self.send_header("Content-Length", str(len(body)))
- self.end_headers()
- self.wfile.write(body)
- def do_OPTIONS(self): # CORS 预检
- self._send(204, {})
- def do_GET(self):
- path = self.path.split("?")[0]
- if path in ("/", "/workflows", "/api/workflows"):
- self._send(200, scan_workflows())
- else:
- self._send(404, {"error": "not found"})
- def do_POST(self):
- if self.path.split("?")[0] != "/build":
- self._send(404, {"error": "not found"}); return
- length = int(self.headers.get("Content-Length") or 0)
- raw = self.rfile.read(length).decode("utf-8") if length > 0 else "{}"
- try:
- payload = json.loads(raw)
- except Exception as e:
- self._send(400, {"error": f"bad json: {e}"}); return
- if payload.get("all"):
- run_ids = all_run_ids()
- else:
- q = (payload.get("q") or "").strip()
- if not re.match(r"^q\d+$", q): # 限定 qNN 形式,避免路径注入
- self._send(400, {"error": f"bad q (expect 'qNN' or all=true): {q!r}"}); return
- run_ids = [q]
- try:
- n = build_runs(run_ids)
- self._send(200, {"status": "ok", "runs": run_ids, "written": n})
- except Exception as e:
- self._send(500, {"error": f"build failed: {e}"})
- def log_message(self, *a):
- pass
- def serve(port):
- n = len(scan_workflows())
- print(f"workflows 接口:http://0.0.0.0:{port}/workflows "
- f"(workflows/ 下当前 {n} 个 json,实时扫描)")
- ThreadingHTTPServer(("0.0.0.0", port), Handler).serve_forever()
- def main(argv):
- args = argv[1:]
- if args and args[0] == "serve":
- port = int(args[1]) if len(args) > 1 else DEFAULT_PORT
- serve(port)
- return
- if "--all" in args:
- run_ids = all_run_ids()
- elif args:
- run_ids = args
- else:
- run_ids = ["q0000"]
- total_files = build_runs(run_ids)
- print(f"\n完成:处理 {len(run_ids)} 个 run,共写出 {total_files} 个帖子 json")
- if __name__ == "__main__":
- try: # Windows 控制台默认 cp1252,中文 print 会崩,统一切 utf-8
- sys.stdout.reconfigure(encoding="utf-8")
- except Exception:
- pass
- main(sys.argv)
|