batch_extract_procedures.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. """批量化跑 procedure-dsl: 把这次列出的所有 q 的全部帖子 (每 q 三 form ≈90 帖) 倒进
  2. 一个大池, 跨 q 按评分降序取全局 top-N (默认 20) 个未被丢弃的"工序"帖, 喂给
  3. procedure-dsl/run_procedure_dsl.py 提取工序。每条仍落回它自己所属那个 q 的目录:
  4. runs_full/qNNNN/procedures/{form}_{short_case}/。
  5. ⚠️ top-N 是"这批 q 加起来总共 N 条", 不是"每 q N 条"。列 5 个 q 也只跑全局最高的 N 条,
  6. 可能全集中在某几个 q 上。想多跑就调大 --top-n 或换一批 q。
  7. 筛选: ① 非异常 (anomaly) 且 decision == report (即未被丢弃)
  8. ② 知识类型 含 "工序" (procedure) —— 工序帖才有端到端流程可抽; 能力/工具帖抽不出
  9. 丢弃判定与评分都直接复用 server.evaluate_result (跟前端 /api/data 同一真源),
  10. schema 再变也不会两边漂移; 阈值见 server.py (制作相关性/时效/综合均分)。
  11. 评分: server.evaluate_result(r)["overall"] —— 相关性均分与质量均分的两级平均。
  12. 选条: top-N 是「累计目标」—— workflows/ 里这些 q 最终要有 N 条; 本次预算 = N − 已完成数。
  13. 全部 q 合池后按 overall 降序取 budget 条。去重:
  14. ① 池内同帖 (case_id 撞, 跨 form / 跨 q) 只留最高分那条, 落在它得分最高的 q 下
  15. ② 选片只排除「已完成」(workflows/ 里有的); 在跑/跑挂的不排, 交给跑时原子 mkdir 分流
  16. 产物: HTML/MD/workflow.json/_source.json/_meta.json 全落
  17. runs_full/qNNNN/procedures/{form}_{short_case}/
  18. 并发: 多窗口直接粘贴同一条指令并行 —— 每条 procedure 靠 mkdir(exist_ok=False) 原子认领,
  19. 谁先建谁跑、其余撞 FileExistsError 即跳, 自动把 budget 条瓜分到各窗口, 不重不超额。
  20. (预算按 workflows/ 已完成数算, 所以各窗口算出的计划对齐同一 top-N 前沿, 不会越跑越多)
  21. --force 不走认领锁 (允许覆盖), 故 --force 不适合多窗口并行。
  22. 用法:
  23. # 跨多个 q 合池, 全局取 top-20
  24. python batch_extract_procedures.py --only-q q0018,q0036,q2300
  25. # 区间内的 q 合池
  26. python batch_extract_procedures.py --start 0 --count 5
  27. # 全局只取 top-5 + 强制重跑 (--top-k 是 --top-n 的兼容别名)
  28. python batch_extract_procedures.py --only-q q0018,q0036 --top-n 5 --force
  29. # 仅列出会跑哪些 (不真跑)
  30. python batch_extract_procedures.py --only-q q0018,q0036 --dry-run
  31. """
  32. import argparse
  33. import json
  34. import re
  35. import subprocess
  36. import sys
  37. from datetime import datetime
  38. from pathlib import Path
  39. from typing import Any, Dict, List, Set, Tuple
  40. import build_workflows # 复用其 write_run: 跑完一个 q 就把合并 json 写进 workflows/
  41. from server import evaluate_result # 评分 + 丢弃判定的唯一真源 (与前端 /api/data 一致)
  42. HERE = Path(__file__).resolve().parent
  43. RUNS_FULL_DEFAULT = HERE / "runs_full"
  44. RUN_DSL = HERE / "procedure-dsl" / "run_procedure_dsl.py"
  45. # 图过多 (>此值) 直接不选这帖, 名额顺延给下一名 —— "最次则跳过" 那一档。
  46. # 上限以内的多图帖由 run_procedure_dsl 侧降分辨率/拼九宫格扛 (见那边 _IMG_MONTAGE_OVER)。
  47. # 72 = 8 个九宫格; 再多拼图也压不住, 放弃更省。
  48. _IMG_SKIP_CAP = 72
  49. def _norm_q(s: str) -> int:
  50. m = re.search(r"\d+", s or "")
  51. if not m:
  52. sys.exit(f"❌ q 参数不含数字: {s!r}")
  53. return int(m.group())
  54. def _pick_pool_candidates(q_dirs: List[Path], exclude_ids: Set[str], top_n: int
  55. ) -> List[Tuple[str, str, Dict[str, Any], float, float]]:
  56. """把列出的所有 q 的全部 form (每 q ≈90 帖) 倒进一个大池, 跨 q 取全局 top_n 个工序帖。
  57. 返回 [(q_name, form_key, result, norm, raw), ...] (已按归一化分降序)。
  58. 评分: 排名用 *归一化分* norm = overall / 满分 —— 因为数据里有两套评分量纲 (mod 0-10 /
  59. old 0-5), 直接比 overall 会让 0-5 的 q 永远被 0-10 压死。norm 统一到 0-1, 跨 q
  60. 可比; 同 schema 批次里 norm 与 overall 同序, 不影响。raw=overall 仅供展示/落档。
  61. 筛选: 非异常 + decision==report (未丢弃) + 知识类型含 procedure —— 评分与丢弃判定
  62. 全部走 server.evaluate_result, 与前端 /api/data 同一真源。
  63. 去重: ① 池内同 case_id 撞了只留最高分那条 (同帖可能被多 form / 多 q 同时召回,
  64. 最终落在它得分最高的那个 q 目录下)
  65. ② 跳过 exclude_ids (磁盘上 procedure 目录已存在的), 键统一 _short_case 形式。"""
  66. # short_case -> (norm, raw, q, form, r)
  67. pooled: Dict[str, Tuple[float, float, str, str, Dict[str, Any]]] = {}
  68. n_img_skip = 0 # 因图过多 (>_IMG_SKIP_CAP) 被放弃的工序帖数, 末尾汇报
  69. for qd in q_dirs:
  70. for fp in sorted(qd.glob("form_*.json")):
  71. try:
  72. d = json.loads(fp.read_text(encoding="utf-8"))
  73. except Exception as e:
  74. print(f" ⚠️ 读 {qd.name}/{fp.name} 失败: {e}")
  75. continue
  76. fkey = fp.stem.replace("form_", "") # A/B/C
  77. for r in d.get("results", []):
  78. ev = evaluate_result(r)
  79. if ev["anomaly"] or ev["decision"] != "report":
  80. continue # 异常 / 被丢弃 → 不参选
  81. if "procedure" not in ev["knowledge_type"]:
  82. continue # 只跑工序帖
  83. n_img = len((r.get("post") or {}).get("images") or [])
  84. if n_img > _IMG_SKIP_CAP:
  85. n_img_skip += 1 # 图太多, 拼图也压不住 → 放弃, 名额让给下一名
  86. continue
  87. raw = ev["overall"] or 0.0
  88. norm = raw / (ev["scale"] or 5) # 归一化到 0-1, 跨 schema 可比
  89. ck = _short_case(r.get("case_id") or "")
  90. if ck not in pooled or norm > pooled[ck][0]:
  91. pooled[ck] = (norm, raw, qd.name, fkey, r) # 同帖跨 form/q 撞分, 留最高
  92. if n_img_skip:
  93. print(f" ⓘ 因图过多 (>{_IMG_SKIP_CAP} 张) 放弃 {n_img_skip} 个工序帖 (名额已顺延)")
  94. ranked = sorted(pooled.items(), key=lambda kv: kv[1][0], reverse=True)
  95. out: List[Tuple[str, str, Dict[str, Any], float, float]] = []
  96. for ck, (norm, raw, qname, fkey, r) in ranked:
  97. if ck in exclude_ids:
  98. continue # 磁盘上已占
  99. out.append((qname, fkey, r, norm, raw))
  100. if len(out) >= top_n:
  101. break
  102. return out
  103. # ── 累计目标: 扫 workflows/ 里属于这些 q 的「已完成并合并」帖 → short_case 集 ───────
  104. # top-N 当「累计目标」用: 本次预算 = N − 已完成数 (见 main)。配合跑时原子 mkdir 分流,
  105. # 多窗口跑同一条指令也只会瓜分到 N 条、不超额 (不排在跑/跑挂的, 那些交给 mkdir 撞锁)。
  106. def _gather_done(workflows_dir: Path, q_names: Set[str]) -> Set[str]:
  107. """workflows/ 里属于 q_names 的已完成帖, 返回其 _short_case 集。
  108. 文件名形如 {qname}_{form}_{short}.json (build_workflows.write_one 落盘约定)。"""
  109. done: Set[str] = set()
  110. if not workflows_dir.is_dir():
  111. return done
  112. for qn in q_names:
  113. for f in workflows_dir.glob(f"{qn}_*.json"):
  114. folder = f.stem[len(qn) + 1:] # 去 '{qname}_' → '{form}_{short}'
  115. parts = folder.split("_", 1)
  116. if len(parts) == 2:
  117. done.add(parts[1]) # short_case
  118. return done
  119. def _write_meta(out_dir: Path, case_id: str, from_q: str, form: str,
  120. score: float, score_norm: float) -> None:
  121. """_meta.json 记 full case_id + 出处 + 时间, 供其他终端/回查识别。
  122. score=原始 overall (量纲随 schema), score_norm=归一化 0-1 (跨 q 排名实际用的分)。"""
  123. out_dir.mkdir(parents=True, exist_ok=True)
  124. (out_dir / "_meta.json").write_text(json.dumps({
  125. "case_id": case_id, "from_q": from_q, "form": form,
  126. "score": round(score, 4), "score_norm": round(score_norm, 4),
  127. "started_at": datetime.now().isoformat(timespec="seconds"),
  128. }, ensure_ascii=False, indent=2), encoding="utf-8")
  129. def _short_case(case_id: str) -> str:
  130. """xhs_69e223a700000000230267c8 → xhs_69e223a7 (前缀 + 时间戳段, 够辨识)。"""
  131. m = re.match(r"^([a-z]+)_([0-9a-f]{8})", case_id or "")
  132. return f"{m.group(1)}_{m.group(2)}" if m else (case_id or "unknown")[:20]
  133. def _source_to_dsl_input(result: Dict[str, Any]) -> Dict[str, Any]:
  134. """form_*.json 的 result 转成 run_procedure_dsl 接受的 source 格式
  135. (新 schema: images 扁平数组)。"""
  136. post = result.get("post") or {}
  137. return {
  138. "title": post.get("title", ""),
  139. "link": result.get("source_url") or post.get("link") or "",
  140. "body_text": post.get("body_text", "") or "",
  141. "images": [u for u in (post.get("images") or []) if isinstance(u, str) and u],
  142. "publish_timestamp": post.get("publish_timestamp", ""),
  143. "channel_account_name": post.get("channel_account_name") or post.get("channel") or "",
  144. }
  145. def _already_done(out_dir: Path) -> bool:
  146. """目录已存且含 case-*.html 视为已跑完 (跟 run_procedure_dsl 落盘约定一致)。"""
  147. return out_dir.is_dir() and any(out_dir.glob("case-*.html"))
  148. def run_one(source_json_path: Path, out_dir_abs: Path, args: argparse.Namespace) -> int:
  149. """启动 run_procedure_dsl.py 子进程, 返回 exit code。"""
  150. cmd = [
  151. sys.executable, str(RUN_DSL),
  152. str(source_json_path),
  153. "--out-dir", str(out_dir_abs),
  154. "--model", args.model,
  155. "--max-turns", str(args.max_turns),
  156. "--max-retries", str(args.max_retries),
  157. ]
  158. if args.resume:
  159. cmd.append("--resume")
  160. if args.version:
  161. cmd.extend(["--version", args.version])
  162. print(f" $ {' '.join(cmd)}")
  163. return subprocess.call(cmd)
  164. def main() -> None:
  165. sys.stdout.reconfigure(encoding="utf-8")
  166. p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
  167. p.add_argument("--output-dir", default=str(RUNS_FULL_DEFAULT),
  168. help=f"runs_full 路径 (默认 {RUNS_FULL_DEFAULT.name})")
  169. p.add_argument("--only-q", default=None,
  170. help="离散指定 q (如 '36' / 'q0036' / '36,55,720')")
  171. p.add_argument("--start", type=int, default=0, help="区间起始 idx (默认 0)")
  172. p.add_argument("--count", type=int, default=10, help="区间取几条 q (默认 10)")
  173. p.add_argument("--top-n", "--top-k", dest="top_n", type=int, default=20,
  174. help="每个 q 合池后取评分前几条 (默认 20; --top-k 为兼容别名)")
  175. p.add_argument("--model", default="claude-sonnet-4-6", help="透传给 run_procedure_dsl")
  176. p.add_argument("--max-turns", type=int, default=300, help="透传给 run_procedure_dsl")
  177. p.add_argument("--max-retries", type=int, default=3, help="透传给 run_procedure_dsl")
  178. p.add_argument("--version", default=None, help="spec 版本, 透传 (e.g. test / backup)")
  179. p.add_argument("--resume", action="store_true", help="透传 --resume")
  180. p.add_argument("--force", action="store_true",
  181. help="目录已存在也强制纳入候选 + 重跑 (会覆盖原 _scratch / workflow.json)")
  182. p.add_argument("--dry-run", action="store_true", help="只列出会跑啥, 不真跑")
  183. args = p.parse_args()
  184. runs_full = Path(args.output_dir).resolve()
  185. if not runs_full.is_dir():
  186. sys.exit(f"❌ {runs_full} 不存在")
  187. if not RUN_DSL.exists():
  188. sys.exit(f"❌ {RUN_DSL} 不存在")
  189. # 选 q 集合
  190. all_q = sorted([d for d in runs_full.iterdir() if d.is_dir() and d.name.startswith("q")],
  191. key=lambda p: _norm_q(p.name))
  192. if args.only_q:
  193. wanted = {f"q{_norm_q(t):04d}" for t in args.only_q.split(",") if t.strip()}
  194. q_dirs = [d for d in all_q if d.name in wanted]
  195. if not q_dirs:
  196. sys.exit(f"❌ --only-q {args.only_q!r} 在 {runs_full} 下没匹配")
  197. else:
  198. q_dirs = all_q[args.start : args.start + args.count]
  199. if not q_dirs:
  200. sys.exit(f"❌ 区间 [{args.start}:{args.start+args.count}] 取不到 q")
  201. # ── 累计目标: top-N 是「workflows/ 里这些 q 最终要有 N 条」, 本次只补差额 ──────
  202. q_names = {d.name for d in q_dirs}
  203. workflows_dir = Path(build_workflows.OUT_DIR)
  204. done_set: Set[str] = set() if args.force else _gather_done(workflows_dir, q_names)
  205. budget = args.top_n if args.force else max(0, args.top_n - len(done_set))
  206. print(f"📋 {len(q_dirs)} 个 q 合池, 累计目标 top-{args.top_n}; "
  207. f"workflows/ 已有 {len(done_set)} 条 → 本次还需 {budget} 条"
  208. f"{' (--force 忽略已完成)' if args.force else ''}")
  209. n_ran = n_skipped = n_failed = 0
  210. if budget == 0:
  211. print("\n✅ 累计目标已达成 (workflows/ 里这些 q 已够 top-N), 无需再跑.")
  212. print(f"\n{'='*60}\n📊 完成: 跑 0 / 已存跳过 0 / 失败 0")
  213. return
  214. # ── 跨 q 合池挑 top-budget ──────────────────────────────────────────────────
  215. # 只排除「已完成」(done_set); 在跑/跑挂的留在池里, 跑时靠原子 mkdir 占位分流到各窗口。
  216. plan = _pick_pool_candidates(q_dirs, done_set, budget)
  217. if not plan:
  218. print("\n⚠️ 这批 q 合池后无可跑候选 (全被丢弃/异常过滤, 或都已完成)")
  219. print(f"\n{'='*60}\n📊 完成: 跑 0 / 已存跳过 0 / 失败 0")
  220. return
  221. print(f"\n▶ 本次挑出 {len(plan)} 条 (补差额, score=归一化 0-1, 括号内原始/满分):")
  222. for qname, fkey, r, norm, raw in plan:
  223. cid_full = r.get("case_id") or ""
  224. title = (r.get("post") or {}).get("title", "")[:30]
  225. print(f" [{qname} {fkey}] score={norm:.3f} ({raw:g}) {cid_full[:24]} {title}")
  226. if len(plan) < budget:
  227. # 池里够格的工序帖不足本次预算 (被丢弃过滤掉 / 已完成)
  228. print(f" ⓘ 可跑候选不足本次预算 {budget} (仅 {len(plan)} 条; 想多跑就加 q 或调大 --top-n)")
  229. # ── 落产物 + 跑; 每跑完一条就立刻把它的合并 json 写进 workflows/ ──────────────
  230. def _emit_workflow(qname: str, folder: str) -> None:
  231. """单条 procedure 跑完 (或已存) → 立刻写出 workflows/{qname}_{folder}.json。
  232. (dry-run 不产 workflow.json 故跳过; build 失败不应中断批量, 故 try 包住)"""
  233. if args.dry_run:
  234. return
  235. try:
  236. if build_workflows.write_one(qname, folder, runs_dir=runs_full):
  237. print(f" 🧩 workflows/: {qname}_{folder}.json 已写")
  238. except Exception as e:
  239. print(f" ⚠️ build_workflows 失败 ({qname}/{folder}): {e}")
  240. for qname, fkey, r, norm, raw in plan:
  241. cid = r.get("case_id") or "unknown"
  242. name = f"{fkey}_{_short_case(cid)}"
  243. out_dir = runs_full / qname / "procedures" / name
  244. # dry-run 绝不碰磁盘 (建目录会污染状态), 只报会不会跑
  245. if args.dry_run:
  246. mark = ("已完成" if _already_done(out_dir)
  247. else "已占(在跑/跑挂)" if out_dir.exists() else "would run")
  248. print(f" [dry-run] {mark}: {qname}/{name}/")
  249. continue
  250. # ── 原子认领: mkdir(exist_ok=False) 内核级唯一, 谁先建谁跑 ──────────────
  251. # 多窗口跑同一指令时, 同一条 procedure 只有一个窗口能建成目录、其余撞 FileExistsError
  252. # → 跳过 (那条已被别窗口领走 / 或是历史跑挂的残目录)。--force 时允许覆盖已存。
  253. try:
  254. out_dir.mkdir(parents=True, exist_ok=args.force)
  255. except FileExistsError:
  256. if _already_done(out_dir):
  257. _emit_workflow(qname, name) # 跑完但没合并的, 顺手补进 workflows/
  258. print(f" ⏭️ {qname}/{name}/ 已完成, 跳过")
  259. else:
  260. print(f" ⏭️ {qname}/{name}/ 已被别窗口认领 / 或历史跑挂, 跳过")
  261. n_skipped += 1
  262. continue
  263. # 认领成功 (此刻 out_dir 归本窗口) → 写 _source.json + _meta.json
  264. src_path = out_dir / "_source.json"
  265. src_path.write_text(
  266. json.dumps(_source_to_dsl_input(r), ensure_ascii=False, indent=2),
  267. encoding="utf-8")
  268. _write_meta(out_dir, case_id=cid, from_q=qname, form=fkey, score=raw, score_norm=norm)
  269. try:
  270. code = run_one(src_path, out_dir, args)
  271. except KeyboardInterrupt:
  272. print(f"\n⚠️ Ctrl-C 中断, {qname}/{name}/ 保留 (含 _meta.json), 下次跑会跳过这条; 想复跑用 --force 或手动删 dir")
  273. raise
  274. if code == 0:
  275. n_ran += 1
  276. print(f" ✓ {qname}/{name} done")
  277. _emit_workflow(qname, name) # 跑完一个就出一个, 不等同 q 其他帖子
  278. else:
  279. n_failed += 1
  280. print(f" ❌ {qname}/{name} exit={code} (目录已占; 想重试 --force 或 手动 rm 该 dir)")
  281. print(f"\n{'='*60}")
  282. print(f"📊 完成: 跑 {n_ran} / 跳过(已完成或别窗口领走) {n_skipped} / 失败 {n_failed} "
  283. f"(累计目标 top-{args.top_n}, 本次预算 {budget}, 选中 {len(plan)})")
  284. if __name__ == "__main__":
  285. main()