batch_extract_procedures.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. """批量化跑 procedure-dsl: 对每个 q 的三 form 选评分最高的"工序"帖子,
  2. 喂给 procedure-dsl/run_procedure_dsl.py 提取工序, HTML 直接落在
  3. runs_full/qNNNN/procedures/{form}_{short_case}/ 下。
  4. 筛选: llm_evaluation.知识类型 含 "工序" (兼容英文 "procedure") 且非 _error
  5. —— 工序帖才有端到端流程可抽; 能力/工具帖太原子, 抽不出 procedure。
  6. 评分: 两级均值, 避免子项多的组天然占便宜:
  7. group_rel = mean(相关性 下所有得分)
  8. group_qual = mean(质量 下所有得分, 含固定+动态)
  9. final = mean(group_rel, group_qual)
  10. 选条: 每 form top-K (默认 1)。两层去重, 撞了顺延下一个候选:
  11. ① 本 q 跨 form (同帖只跑一次)
  12. ② 全局: 扫所有 q*/procedures/*/ 目录名 → 抽 short_case → 已存在即视为占用
  13. (无论跑完没有, 跑挂了也算占了 case; 想强复跑用 --force)
  14. 产物: HTML/MD/workflow.json/_source.json/_meta.json 全落
  15. runs_full/qNNNN/procedures/{form}_{short_case}/
  16. 跳过: 目录已存 → skip (--force 可强覆盖)
  17. 并发: 多终端开几个就跑几个; 每条 procedure 一启动就 mkdir, 其他终端立刻见到目录视作已占
  18. 用法:
  19. # 单 q 试
  20. python batch_extract_procedures.py --only-q q0036
  21. # 区间
  22. python batch_extract_procedures.py --start 0 --count 5
  23. # top-3 + 强制重跑
  24. python batch_extract_procedures.py --only-q q0036 --top-k 3 --force
  25. # 仅列出会跑哪些 (不真跑)
  26. python batch_extract_procedures.py --only-q q0036 --dry-run
  27. """
import argparse
import json
import math
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from statistics import mean
from typing import Any, Dict, List, Optional, Set, Tuple

import build_workflows  # reuse its write_run: after each q, write the merged json into workflows/
  39. HERE = Path(__file__).resolve().parent
  40. RUNS_FULL_DEFAULT = HERE / "runs_full"
  41. RUN_DSL = HERE / "procedure-dsl" / "run_procedure_dsl.py"
  42. _KT_PROCEDURE = {"工序", "procedure"} # 工序没改名, 仅兼容英文枚举
  43. def _norm_q(s: str) -> int:
  44. m = re.search(r"\d+", s or "")
  45. if not m:
  46. sys.exit(f"❌ q 参数不含数字: {s!r}")
  47. return int(m.group())
  48. def _has_procedure(ev: Dict[str, Any]) -> bool:
  49. """llm_evaluation.知识类型 是否声明了 工序 类型 (含英文 procedure)。"""
  50. kt = ev.get("知识类型") or ev.get("knowledge_type") or []
  51. if isinstance(kt, str):
  52. kt = [kt]
  53. return any(k in _KT_PROCEDURE for k in kt) if isinstance(kt, list) else False
  54. def _safe_float(x: Any) -> Optional[float]:
  55. try:
  56. return float(x)
  57. except (TypeError, ValueError):
  58. return None
  59. def _all_scores(obj: Any) -> List[float]:
  60. """递归挖一棵嵌套 dict 里所有 '得分' 叶子的数值。"""
  61. out: List[float] = []
  62. if isinstance(obj, dict):
  63. for k, v in obj.items():
  64. if k == "得分":
  65. f = _safe_float(v)
  66. if f is not None:
  67. out.append(f)
  68. elif isinstance(v, (dict, list)):
  69. out.extend(_all_scores(v))
  70. elif isinstance(obj, list):
  71. for it in obj:
  72. out.extend(_all_scores(it))
  73. return out
  74. def _composite_score(ev: Dict[str, Any]) -> float:
  75. """两级均值:
  76. group_rel = mean(相关性 下所有 得分)
  77. group_qual = mean(质量 下所有 得分, 含固定+动态全部子项)
  78. final = mean(group_rel, group_qual)
  79. 某一组缺失 (该组下没任何得分字段) → 降级用另一组的均值; 都缺 → 0。"""
  80. rel_scores = _all_scores(ev.get("相关性"))
  81. qual_scores = _all_scores(ev.get("质量"))
  82. rel_mean = mean(rel_scores) if rel_scores else None
  83. qual_mean = mean(qual_scores) if qual_scores else None
  84. parts = [m for m in (rel_mean, qual_mean) if m is not None]
  85. return mean(parts) if parts else 0.0
  86. def _pick_candidates(form_path: Path, exclude_ids: Set[str], top_k: int
  87. ) -> List[Tuple[float, Dict[str, Any]]]:
  88. """读 form_*.json, 取评分最高的 top_k 个工序帖, 跳过 exclude_ids 里的 case_id
  89. (已被其他 q 跑过 / 正在跑 / 本进程已规划)。exclude_ids 用 _short_case 形式。"""
  90. try:
  91. d = json.loads(form_path.read_text(encoding="utf-8"))
  92. except Exception as e:
  93. print(f" ⚠️ 读 {form_path.name} 失败: {e}")
  94. return []
  95. cands: List[Tuple[float, Dict[str, Any]]] = []
  96. for r in d.get("results", []):
  97. ev = r.get("llm_evaluation") or {}
  98. if ev.get("_error") or not _has_procedure(ev):
  99. continue
  100. cands.append((_composite_score(ev), r))
  101. cands.sort(key=lambda x: x[0], reverse=True)
  102. out: List[Tuple[float, Dict[str, Any]]] = []
  103. for s, r in cands:
  104. # 用 _short_case 化的 case_id 对齐 global 状态 / 目录命名
  105. if _short_case(r.get("case_id") or "") in exclude_ids:
  106. continue
  107. out.append((s, r))
  108. if len(out) >= top_k:
  109. break
  110. return out
  111. # ── 全局去重: 扫所有 q*/procedures/*/ 目录名 → short_case 占用集 ──────────────────
  112. def _gather_claimed(runs_full: Path) -> Dict[str, str]:
  113. """扫所有 q*/procedures/*/ 返回 {short_case: rel_dir} —— 目录存在即视为占用,
  114. 无论它含 HTML 还是空、是否还在跑。多终端场景下, 一启动就 mkdir, 其他终端立刻见到。
  115. key 解析顺序: ① _meta.json 里的 full case_id → _short_case 化
  116. ② 目录名 `{form}_{short_case}` 直接抽 short_case
  117. """
  118. claimed: Dict[str, str] = {}
  119. if not runs_full.is_dir():
  120. return claimed
  121. for pd in runs_full.glob("q*/procedures/*"):
  122. if not pd.is_dir():
  123. continue
  124. key = None
  125. meta_path = pd / "_meta.json"
  126. if meta_path.exists():
  127. try:
  128. full = json.loads(meta_path.read_text(encoding="utf-8")).get("case_id")
  129. if full:
  130. key = _short_case(full)
  131. except Exception:
  132. pass
  133. if not key:
  134. parts = pd.name.split("_", 1)
  135. key = parts[1] if len(parts) == 2 else pd.name
  136. claimed[key] = pd.relative_to(runs_full).as_posix()
  137. return claimed
  138. def _write_meta(out_dir: Path, case_id: str, from_q: str, form: str, score: float) -> None:
  139. """_meta.json 记 full case_id + 出处 + 时间, 供其他终端/回查识别。"""
  140. out_dir.mkdir(parents=True, exist_ok=True)
  141. (out_dir / "_meta.json").write_text(json.dumps({
  142. "case_id": case_id, "from_q": from_q, "form": form, "score": round(score, 4),
  143. "started_at": datetime.now().isoformat(timespec="seconds"),
  144. }, ensure_ascii=False, indent=2), encoding="utf-8")
  145. def _short_case(case_id: str) -> str:
  146. """xhs_69e223a700000000230267c8 → xhs_69e223a7 (前缀 + 时间戳段, 够辨识)。"""
  147. m = re.match(r"^([a-z]+)_([0-9a-f]{8})", case_id or "")
  148. return f"{m.group(1)}_{m.group(2)}" if m else (case_id or "unknown")[:20]
  149. def _source_to_dsl_input(result: Dict[str, Any]) -> Dict[str, Any]:
  150. """form_*.json 的 result 转成 run_procedure_dsl 接受的 source 格式
  151. (新 schema: images 扁平数组)。"""
  152. post = result.get("post") or {}
  153. return {
  154. "title": post.get("title", ""),
  155. "link": result.get("source_url") or post.get("link") or "",
  156. "body_text": post.get("body_text", "") or "",
  157. "images": [u for u in (post.get("images") or []) if isinstance(u, str) and u],
  158. "publish_timestamp": post.get("publish_timestamp", ""),
  159. "channel_account_name": post.get("channel_account_name") or post.get("channel") or "",
  160. }
  161. def _already_done(out_dir: Path) -> bool:
  162. """目录已存且含 case-*.html 视为已跑完 (跟 run_procedure_dsl 落盘约定一致)。"""
  163. return out_dir.is_dir() and any(out_dir.glob("case-*.html"))
  164. def run_one(source_json_path: Path, out_dir_abs: Path, args: argparse.Namespace) -> int:
  165. """启动 run_procedure_dsl.py 子进程, 返回 exit code。"""
  166. cmd = [
  167. sys.executable, str(RUN_DSL),
  168. str(source_json_path),
  169. "--out-dir", str(out_dir_abs),
  170. "--model", args.model,
  171. "--max-turns", str(args.max_turns),
  172. "--max-retries", str(args.max_retries),
  173. ]
  174. if args.resume:
  175. cmd.append("--resume")
  176. if args.version:
  177. cmd.extend(["--version", args.version])
  178. print(f" $ {' '.join(cmd)}")
  179. return subprocess.call(cmd)
  180. def main() -> None:
  181. sys.stdout.reconfigure(encoding="utf-8")
  182. p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
  183. p.add_argument("--output-dir", default=str(RUNS_FULL_DEFAULT),
  184. help=f"runs_full 路径 (默认 {RUNS_FULL_DEFAULT.name})")
  185. p.add_argument("--only-q", default=None,
  186. help="离散指定 q (如 '36' / 'q0036' / '36,55,720')")
  187. p.add_argument("--start", type=int, default=0, help="区间起始 idx (默认 0)")
  188. p.add_argument("--count", type=int, default=10, help="区间取几条 q (默认 10)")
  189. p.add_argument("--top-k", type=int, default=1, help="每 form 取几条 (默认 1)")
  190. p.add_argument("--model", default="claude-sonnet-4-6", help="透传给 run_procedure_dsl")
  191. p.add_argument("--max-turns", type=int, default=300, help="透传给 run_procedure_dsl")
  192. p.add_argument("--max-retries", type=int, default=3, help="透传给 run_procedure_dsl")
  193. p.add_argument("--version", default=None, help="spec 版本, 透传 (e.g. test / backup)")
  194. p.add_argument("--resume", action="store_true", help="透传 --resume")
  195. p.add_argument("--force", action="store_true",
  196. help="目录已存在也强制纳入候选 + 重跑 (会覆盖原 _scratch / workflow.json)")
  197. p.add_argument("--dry-run", action="store_true", help="只列出会跑啥, 不真跑")
  198. args = p.parse_args()
  199. runs_full = Path(args.output_dir).resolve()
  200. if not runs_full.is_dir():
  201. sys.exit(f"❌ {runs_full} 不存在")
  202. if not RUN_DSL.exists():
  203. sys.exit(f"❌ {RUN_DSL} 不存在")
  204. # 选 q 集合
  205. all_q = sorted([d for d in runs_full.iterdir() if d.is_dir() and d.name.startswith("q")],
  206. key=lambda p: _norm_q(p.name))
  207. if args.only_q:
  208. wanted = {f"q{_norm_q(t):04d}" for t in args.only_q.split(",") if t.strip()}
  209. q_dirs = [d for d in all_q if d.name in wanted]
  210. if not q_dirs:
  211. sys.exit(f"❌ --only-q {args.only_q!r} 在 {runs_full} 下没匹配")
  212. else:
  213. q_dirs = all_q[args.start : args.start + args.count]
  214. if not q_dirs:
  215. sys.exit(f"❌ 区间 [{args.start}:{args.start+args.count}] 取不到 q")
  216. # ── 启动前: 全局扫描所有 q*/procedures/*/ 目录名占用集 ────────────────────
  217. global_claimed = {} if args.force else _gather_claimed(runs_full)
  218. print(f"📋 {len(q_dirs)} 个 q, top-{args.top_k}/form, "
  219. f"目标 ≤{len(q_dirs) * 3 * args.top_k} 条工序 (去重后会少)")
  220. print(f"📦 全局占用 case 数 = {len(global_claimed)}"
  221. f"{' (--force 已忽略)' if args.force else ''}")
  222. n_ran = n_skipped = n_failed = n_no_cap = n_global_skip = 0
  223. # 本进程已规划的 case_id (即使还没开跑也算"占用", 防同进程后续 q 重复选)
  224. local_claimed: Set[str] = set()
  225. for qd in q_dirs:
  226. forms = sorted(qd.glob("form_*.json"))
  227. if not forms:
  228. print(f"\n⏭️ {qd.name}: 无 form_*.json")
  229. continue
  230. print(f"\n▶ {qd.name}")
  231. # exclude = 全局已占用 (磁盘上 procedure 目录存在的) + 本进程已规划 (跨 q 也屏蔽)
  232. # 所有键统一 _short_case 形式 (跟目录名约定一致)
  233. excluded: Set[str] = set(global_claimed) | local_claimed
  234. plan: List[Tuple[str, Dict[str, Any], float]] = []
  235. for fp in forms:
  236. fkey = fp.stem.replace("form_", "") # A/B/C
  237. picks = _pick_candidates(fp, excluded, args.top_k)
  238. for score, r in picks:
  239. cid_full = r.get("case_id") or ""
  240. cid_key = _short_case(cid_full)
  241. plan.append((fkey, r, score))
  242. excluded.add(cid_key) # 本 q 内其他 form 不会再选
  243. local_claimed.add(cid_key) # 后续 q 也不再选
  244. title = (r.get("post") or {}).get("title", "")[:30]
  245. print(f" [{fkey}] score={score:.2f} {cid_full[:24]} {title}")
  246. if len(picks) < args.top_k:
  247. # 想取 top-k 但被 excluded 顶替到不够 → 报一行说明
  248. n_global_skip += args.top_k - len(picks)
  249. print(f" [{fkey}] 候选不足 top-{args.top_k} (其他 q 已占了同帖)")
  250. if not plan:
  251. print(f" ⚠️ {qd.name} 无 工序 类候选 (无新 case 或都被 全局去重 顶了)")
  252. n_no_cap += 1
  253. continue
  254. # 落产物
  255. proc_root = qd / "procedures"
  256. proc_root.mkdir(exist_ok=True)
  257. for fkey, r, score in plan:
  258. cid = r.get("case_id") or "unknown"
  259. name = f"{fkey}_{_short_case(cid)}"
  260. out_dir = proc_root / name
  261. if _already_done(out_dir) and not args.force:
  262. print(f" ⏭️ {name}/ 已存在 case-*.html (用 --force 覆盖)")
  263. n_skipped += 1
  264. continue
  265. # 写 _source.json (DSL 输入) + _meta.json (case_id 索引, 供其他终端识别)
  266. out_dir.mkdir(parents=True, exist_ok=True)
  267. src_path = out_dir / "_source.json"
  268. src_path.write_text(
  269. json.dumps(_source_to_dsl_input(r), ensure_ascii=False, indent=2),
  270. encoding="utf-8")
  271. if args.dry_run:
  272. print(f" [dry-run] would run on {src_path.name} → {out_dir.name}/")
  273. continue
  274. # _meta.json 即占位标识: 其他终端扫到 out_dir/_meta.json 就视作 已占
  275. _write_meta(out_dir, case_id=cid, from_q=qd.name, form=fkey, score=score)
  276. try:
  277. code = run_one(src_path, out_dir, args)
  278. except KeyboardInterrupt:
  279. print(f"\n⚠️ Ctrl-C 中断, {name}/ 保留 (含 _meta.json), 下次跑会跳过这条; 想复跑用 --force 或手动删 dir")
  280. raise
  281. if code == 0:
  282. n_ran += 1
  283. global_claimed[_short_case(cid)] = (out_dir.relative_to(runs_full)).as_posix()
  284. print(f" ✓ {name} done")
  285. else:
  286. n_failed += 1
  287. print(f" ❌ {name} exit={code} (目录已占; 想重试 --force 或 手动 rm 该 dir)")
  288. # 本 q 的 procedure 跑完 → 把合并 json 增量写进 workflows/
  289. # (dry-run 不产 workflow.json, 跳过; build 失败不应中断批量, 故 try 包住)
  290. if not args.dry_run:
  291. try:
  292. n_wf = build_workflows.write_run(qd.name, runs_dir=runs_full)
  293. print(f" 🧩 workflows/: {qd.name} 写出 {n_wf} 个合并 json")
  294. except Exception as e:
  295. print(f" ⚠️ build_workflows 失败 ({qd.name}): {e}")
  296. print(f"\n{'='*60}")
  297. print(f"📊 完成: 跑 {n_ran} / 本次已存目录跳过 {n_skipped} / 全局占用顶替 {n_global_skip} / "
  298. f"失败 {n_failed} / 无候选 q {n_no_cap}")
  299. if __name__ == "__main__":
  300. main()