"""批量化跑 procedure-dsl: 对每个 q 的三 form 选评分最高的"工序"帖子, 喂给 procedure-dsl/run_procedure_dsl.py 提取工序, HTML 直接落在 runs_full/qNNNN/procedures/{form}_{short_case}/ 下。

筛选: llm_evaluation.知识类型 含 "工序" (兼容英文 "procedure") 且非 _error
      —— 工序帖才有端到端流程可抽; 能力/工具帖太原子, 抽不出 procedure。

评分: 两级均值, 避免子项多的组天然占便宜:
    group_rel  = mean(相关性 下所有得分)
    group_qual = mean(质量 下所有得分, 含固定+动态)
    final      = mean(group_rel, group_qual)

选条: 每 form top-K (默认 1)。两层去重, 撞了顺延下一个候选:
    ① 本 q 跨 form (同帖只跑一次)
    ② 全局: 扫所有 q*/procedures/*/ 目录名 → 抽 short_case → 已存在即视为占用
       (无论跑完没有, 跑挂了也算占了 case; 想强复跑用 --force)

产物: HTML/MD/workflow.json/_source.json/_meta.json 全落 runs_full/qNNNN/procedures/{form}_{short_case}/
跳过: 目录已存 → skip (--force 可强覆盖)
并发: 多终端开几个就跑几个; 每条 procedure 一启动就 mkdir, 其他终端立刻见到目录视作已占

用法:
    # 单 q 试
    python batch_extract_procedures.py --only-q q0036
    # 区间
    python batch_extract_procedures.py --start 0 --count 5
    # top-3 + 强制重跑
    python batch_extract_procedures.py --only-q q0036 --top-k 3 --force
    # 仅列出会跑哪些 (不真跑)
    python batch_extract_procedures.py --only-q q0036 --dry-run
"""
# NOTE: the module docstring above doubles as the `--help` text
# (argparse `description=__doc__`), so it is user-facing and intentionally kept as-is.

import argparse
import json
import math
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from statistics import mean
from typing import Any, Dict, List, Optional, Set, Tuple

HERE = Path(__file__).resolve().parent
RUNS_FULL_DEFAULT = HERE / "runs_full"
RUN_DSL = HERE / "procedure-dsl" / "run_procedure_dsl.py"

# "工序" was never renamed; "procedure" is accepted only for English enum values.
_KT_PROCEDURE = {"工序", "procedure"}

# A planned run: (form key such as "A", raw result dict, composite score).
PlanItem = Tuple[str, Dict[str, Any], float]


def _norm_q(s: str) -> int:
    """Return the first integer in a user-supplied q token ('36', 'q0036', ...).

    Exits the process with an error message when the token has no digits,
    since this is only used to validate CLI input.
    """
    m = re.search(r"\d+", s or "")
    if not m:
        sys.exit(f"❌ q 参数不含数字: {s!r}")
    return int(m.group())


def _q_index(name: str) -> Optional[int]:
    """Return the first integer in a directory name, or None if it has no digits.

    Unlike `_norm_q` this never exits, so a stray `q*` directory without digits
    (e.g. `qtmp`) is skipped instead of aborting the whole batch.
    """
    m = re.search(r"\d+", name)
    return int(m.group()) if m else None


def _has_procedure(ev: Dict[str, Any]) -> bool:
    """Return True if llm_evaluation's knowledge type declares 工序 (or English "procedure").

    The field may be a single string or a list of strings. Non-string list items
    are ignored; unhashable ones (e.g. dicts) would otherwise raise on the set lookup.
    """
    kt = ev.get("知识类型") or ev.get("knowledge_type") or []
    if isinstance(kt, str):
        kt = [kt]
    if not isinstance(kt, list):
        return False
    return any(isinstance(k, str) and k.strip() in _KT_PROCEDURE for k in kt)


def _safe_float(x: Any) -> Optional[float]:
    """Convert *x* to a finite float, or return None if it is not a usable number.

    NaN/inf are rejected because they would poison `mean()` and make the
    score-based sort order undefined.
    """
    try:
        f = float(x)
    except (TypeError, ValueError, OverflowError):
        return None
    return f if math.isfinite(f) else None


def _all_scores(obj: Any) -> List[float]:
    """Recursively collect the numeric value of every '得分' (score) leaf in nested dicts/lists.

    A '得分' value is kept only if `_safe_float` accepts it. Dict/list values
    under any other key are searched recursively.
    """
    out: List[float] = []
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == "得分":
                f = _safe_float(v)
                if f is not None:
                    out.append(f)
            elif isinstance(v, (dict, list)):
                out.extend(_all_scores(v))
    elif isinstance(obj, list):
        for it in obj:
            out.extend(_all_scores(it))
    return out


def _composite_score(ev: Dict[str, Any]) -> float:
    """Two-level mean, so a group with many sub-items does not dominate.

        group_rel  = mean(all scores under 相关性)
        group_qual = mean(all scores under 质量, fixed + dynamic items)
        final      = mean(group_rel, group_qual)

    If one group has no scores, the other group's mean is used alone; if both
    are missing the score is 0.0.
    """
    rel_scores = _all_scores(ev.get("相关性"))
    qual_scores = _all_scores(ev.get("质量"))
    rel_mean = mean(rel_scores) if rel_scores else None
    qual_mean = mean(qual_scores) if qual_scores else None
    parts = [m for m in (rel_mean, qual_mean) if m is not None]
    return mean(parts) if parts else 0.0


def _rank_candidates(form_path: Path) -> List[Tuple[float, Dict[str, Any]]]:
    """Load a form_*.json and return its procedure posts sorted by composite score, best first.

    Results whose evaluation carries `_error`, or that do not declare the
    procedure knowledge type, are dropped. An unreadable or malformed file is
    reported and yields [] (best-effort: one bad form must not stop the batch).
    Ties keep file order, because the sort is stable.
    """
    try:
        data = json.loads(form_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:  # ValueError covers JSON and Unicode decode errors
        print(f"  ⚠️ 读 {form_path.name} 失败: {e}")
        return []
    results = data.get("results") if isinstance(data, dict) else None
    ranked: List[Tuple[float, Dict[str, Any]]] = []
    for r in results or []:
        if not isinstance(r, dict):
            continue
        ev = r.get("llm_evaluation") or {}
        if not isinstance(ev, dict) or ev.get("_error") or not _has_procedure(ev):
            continue
        ranked.append((_composite_score(ev), r))
    ranked.sort(key=lambda x: x[0], reverse=True)
    return ranked


def _take_top(ranked: List[Tuple[float, Dict[str, Any]]], exclude_ids: Set[str],
              top_k: int) -> List[Tuple[float, Dict[str, Any]]]:
    """Take up to *top_k* entries from *ranked*, skipping excluded or already-picked cases.

    Cases are compared in `_short_case` form, matching the global claim set and
    the directory naming. A post that appears twice in the same form is picked
    only once, so two plan items never map to the same output directory.
    """
    out: List[Tuple[float, Dict[str, Any]]] = []
    seen: Set[str] = set()
    for score, r in ranked:
        if len(out) >= top_k:
            break
        key = _short_case(r.get("case_id") or "")
        if key in exclude_ids or key in seen:
            continue
        seen.add(key)
        out.append((score, r))
    return out


def _pick_candidates(form_path: Path, exclude_ids: Set[str], top_k: int
                     ) -> List[Tuple[float, Dict[str, Any]]]:
    """Return the top_k highest-scoring procedure posts in *form_path*.

    Case ids in *exclude_ids* are skipped; they are `_short_case` keys that are
    already claimed globally, planned by this process, or picked by another form.
    """
    return _take_top(_rank_candidates(form_path), exclude_ids, top_k)


# ── Global dedup: scan all q*/procedures/*/ directories → set of claimed short_case keys ──
def _dir_key(pd: Path) -> str:
    """Return the short_case key a procedure directory claims.

    The full case_id recorded in _meta.json is preferred. If it is missing or
    unreadable (e.g. another terminal is mid-write), fall back to parsing the
    `{form}_{short_case}` directory name.
    """
    meta_path = pd / "_meta.json"
    if meta_path.exists():
        try:
            full = json.loads(meta_path.read_text(encoding="utf-8")).get("case_id")
        except (OSError, ValueError, AttributeError):
            full = None
        if isinstance(full, str) and full:
            return _short_case(full)
    _form, sep, rest = pd.name.partition("_")
    return rest if sep else pd.name


def _gather_claimed(runs_full: Path) -> Dict[str, str]:
    """Scan every q*/procedures/*/ directory and return {short_case: dir path relative to runs_full}.

    A directory's mere existence counts as a claim, whether it holds finished
    HTML, is empty, or is still being filled by another terminal.
    """
    claimed: Dict[str, str] = {}
    if not runs_full.is_dir():
        return claimed
    for pd in runs_full.glob("q*/procedures/*"):
        if pd.is_dir():
            claimed[_dir_key(pd)] = pd.relative_to(runs_full).as_posix()
    return claimed


def _claimed_elsewhere(runs_full: Path, key: str, out_dir: Path) -> Optional[str]:
    """Fresh scan: return the relative path of a procedure dir other than *out_dir* that claims *key*.

    Returns None if nothing else claims it. This catches claims made by
    concurrent terminals after this process took its startup snapshot.
    """
    if not runs_full.is_dir():
        return None
    target = out_dir.resolve()
    for pd in runs_full.glob("q*/procedures/*"):
        if pd.is_dir() and pd.resolve() != target and _dir_key(pd) == key:
            return pd.relative_to(runs_full).as_posix()
    return None


def _write_meta(out_dir: Path, case_id: str, from_q: str, form: str, score: float) -> None:
    """Write _meta.json (full case_id, origin q/form, score, start time) into *out_dir*.

    Other terminals read it to identify the claimed case; it is also useful when tracing runs back.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "_meta.json").write_text(json.dumps({
        "case_id": case_id,
        "from_q": from_q,
        "form": form,
        "score": round(score, 4),
        "started_at": datetime.now().isoformat(timespec="seconds"),
    }, ensure_ascii=False, indent=2), encoding="utf-8")


def _short_case(case_id: str) -> str:
    """Shorten a case id to `{prefix}_{first 8 hex chars}`, e.g. xhs_69e223a700000000230267c8 → xhs_69e223a7.

    Ids not matching that shape fall back to their first 20 chars ("unknown" if empty).
    """
    m = re.match(r"^([a-z]+)_([0-9a-f]{8})", case_id or "")
    return f"{m.group(1)}_{m.group(2)}" if m else (case_id or "unknown")[:20]


def _source_to_dsl_input(result: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a form_*.json result into the source dict run_procedure_dsl expects (flat `images` list)."""
    post = result.get("post") or {}
    return {
        "title": post.get("title", ""),
        "link": result.get("source_url") or post.get("link") or "",
        "body_text": post.get("body_text", "") or "",
        "images": [u for u in (post.get("images") or []) if isinstance(u, str) and u],
        "publish_timestamp": post.get("publish_timestamp", ""),
        "channel_account_name": post.get("channel_account_name") or post.get("channel") or "",
    }


def _already_done(out_dir: Path) -> bool:
    """Return True if *out_dir* exists and holds a case-*.html file (the file naming run_procedure_dsl output uses)."""
    return out_dir.is_dir() and any(out_dir.glob("case-*.html"))


def run_one(source_json_path: Path, out_dir_abs: Path, args: argparse.Namespace) -> int:
    """Run run_procedure_dsl.py as a child process on one source file and return its exit code."""
    cmd = [
        sys.executable, str(RUN_DSL), str(source_json_path),
        "--out-dir", str(out_dir_abs),
        "--model", args.model,
        "--max-turns", str(args.max_turns),
        "--max-retries", str(args.max_retries),
    ]
    if args.resume:
        cmd.append("--resume")
    if args.version:
        cmd.extend(["--version", args.version])
    # Flush so our buffered log lines land before the child's output when stdout is redirected.
    print(f"  $ {' '.join(cmd)}", flush=True)
    return subprocess.call(cmd)


def _select_q_dirs(runs_full: Path, args: argparse.Namespace) -> List[Path]:
    """Return the q directories to process, sorted by numeric index; exit if the selection is empty.

    `--only-q` takes a comma-separated list of q tokens; otherwise the slice
    [--start, --start + --count) is taken.
    """
    indexed: List[Tuple[int, Path]] = []
    for d in runs_full.iterdir():
        if d.is_dir() and d.name.startswith("q"):
            idx = _q_index(d.name)
            if idx is not None:
                indexed.append((idx, d))
    all_q = [d for _, d in sorted(indexed, key=lambda t: t[0])]

    if args.only_q:
        wanted = {f"q{_norm_q(t):04d}" for t in args.only_q.split(",") if t.strip()}
        q_dirs = [d for d in all_q if d.name in wanted]
        if not q_dirs:
            sys.exit(f"❌ --only-q {args.only_q!r} 在 {runs_full} 下没匹配")
    else:
        q_dirs = all_q[args.start : args.start + args.count]
        if not q_dirs:
            sys.exit(f"❌ 区间 [{args.start}:{args.start+args.count}] 取不到 q")
    return q_dirs


def _plan_q(forms: List[Path], excluded: Set[str], top_k: int) -> Tuple[List[PlanItem], int]:
    """Pick up to *top_k* posts per form for one q, printing the picks.

    *excluded* is mutated: each picked case is added, so later forms of the
    same q skip it. Returns (plan, n_displaced). n_displaced counts only the
    slots lost to already-claimed cases, not those missing because a form has
    too few procedure posts.
    """
    plan: List[PlanItem] = []
    n_displaced = 0
    for fp in forms:
        fkey = fp.stem.replace("form_", "")  # A/B/C
        ranked = _rank_candidates(fp)
        picks = _take_top(ranked, excluded, top_k)
        for score, r in picks:
            cid_full = r.get("case_id") or ""
            plan.append((fkey, r, score))
            excluded.add(_short_case(cid_full))
            title = str((r.get("post") or {}).get("title") or "")[:30]  # title may be null
            print(f"  [{fkey}] score={score:.2f}  {cid_full[:24]}  {title}")
        if len(picks) < top_k:
            n_distinct = len({_short_case(r.get("case_id") or "") for _, r in ranked})
            displaced = min(top_k, n_distinct) - len(picks)
            if displaced > 0:
                n_displaced += displaced
                print(f"  [{fkey}] 候选不足 top-{top_k} (其他 q 已占了同帖)")
            else:
                print(f"  [{fkey}] 候选不足 top-{top_k} (该 form 工序帖仅 {n_distinct} 条)")
    return plan, n_displaced


def main() -> None:
    """CLI entry point: select q dirs, plan the top-k procedure posts per form, and run the DSL on each."""
    # Some hosts replace sys.stdout with an object that has no reconfigure(); only re-encode when possible.
    reconfigure = getattr(sys.stdout, "reconfigure", None)
    if reconfigure is not None:
        reconfigure(encoding="utf-8")

    p = argparse.ArgumentParser(description=__doc__,
                                formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--output-dir", default=str(RUNS_FULL_DEFAULT),
                   help=f"runs_full 路径 (默认 {RUNS_FULL_DEFAULT.name})")
    p.add_argument("--only-q", default=None,
                   help="离散指定 q (如 '36' / 'q0036' / '36,55,720')")
    p.add_argument("--start", type=int, default=0, help="区间起始 idx (默认 0)")
    p.add_argument("--count", type=int, default=10, help="区间取几条 q (默认 10)")
    p.add_argument("--top-k", type=int, default=1, help="每 form 取几条 (默认 1)")
    p.add_argument("--model", default="claude-sonnet-4-6", help="透传给 run_procedure_dsl")
    p.add_argument("--max-turns", type=int, default=300, help="透传给 run_procedure_dsl")
    p.add_argument("--max-retries", type=int, default=3, help="透传给 run_procedure_dsl")
    p.add_argument("--version", default=None, help="spec 版本, 透传 (e.g. test / backup)")
    p.add_argument("--resume", action="store_true", help="透传 --resume")
    p.add_argument("--force", action="store_true",
                   help="目录已存在也强制纳入候选 + 重跑 (会覆盖原 _scratch / workflow.json)")
    p.add_argument("--dry-run", action="store_true", help="只列出会跑啥, 不真跑")
    args = p.parse_args()

    runs_full = Path(args.output_dir).resolve()
    if not runs_full.is_dir():
        sys.exit(f"❌ {runs_full} 不存在")
    if not RUN_DSL.exists():
        sys.exit(f"❌ {RUN_DSL} 不存在")

    q_dirs = _select_q_dirs(runs_full, args)

    # Startup snapshot of claimed cases (ignored under --force). Claims made later by other
    # terminals are caught by the fresh per-run check in the loop below.
    global_claimed: Dict[str, str] = {} if args.force else _gather_claimed(runs_full)
    print(f"📋 {len(q_dirs)} 个 q, top-{args.top_k}/form, "
          f"目标 ≤{len(q_dirs) * 3 * args.top_k} 条工序 (去重后会少)")
    print(f"📦 全局占用 case 数 = {len(global_claimed)}"
          f"{' (--force 已忽略)' if args.force else ''}")

    n_ran = n_skipped = n_failed = n_no_cap = n_global_skip = 0
    # Cases planned by this process, even before they start, so later q's never re-pick them.
    local_claimed: Set[str] = set()

    for qd in q_dirs:
        forms = sorted(qd.glob("form_*.json"))
        if not forms:
            print(f"\n⏭️ {qd.name}: 无 form_*.json")
            continue
        print(f"\n▶ {qd.name}")

        # All keys are in _short_case form, matching the directory naming.
        excluded: Set[str] = set(global_claimed) | local_claimed
        plan, displaced = _plan_q(forms, excluded, args.top_k)
        n_global_skip += displaced
        local_claimed.update(_short_case(r.get("case_id") or "") for _, r, _ in plan)

        if not plan:
            print(f"  ⚠️ {qd.name} 无 工序 类候选 (无新 case 或都被 全局去重 顶了)")
            n_no_cap += 1
            continue

        proc_root = qd / "procedures"
        for fkey, r, score in plan:
            cid = r.get("case_id") or "unknown"
            key = _short_case(cid)
            name = f"{fkey}_{key}"
            out_dir = proc_root / name

            # Documented contract: an existing dir is a claim (finished, failed, or
            # being filled by another terminal); only --force overrides it.
            if not args.force and out_dir.exists():
                if _already_done(out_dir):
                    print(f"  ⏭️ {name}/ 已存在 case-*.html (用 --force 覆盖)")
                else:
                    print(f"  ⏭️ {name}/ 目录已存在 (其他终端在跑或曾失败; 用 --force 覆盖)")
                n_skipped += 1
                continue

            if args.dry_run:
                # No writes: any dir created here would be read as a claim by the next real run.
                print(f"  [dry-run] would run on _source.json → {out_dir.name}/")
                continue

            if not args.force:
                other = _claimed_elsewhere(runs_full, key, out_dir)
                if other:
                    global_claimed[key] = other  # later q's exclude it too
                    print(f"  ⏭️ {name}: case 已被 {other}/ 占用 (用 --force 覆盖)")
                    n_skipped += 1
                    continue

            # mkdir + _meta.json form the claim marker other terminals see; then write the DSL input.
            _write_meta(out_dir, case_id=cid, from_q=qd.name, form=fkey, score=score)
            src_path = out_dir / "_source.json"
            src_path.write_text(
                json.dumps(_source_to_dsl_input(r), ensure_ascii=False, indent=2),
                encoding="utf-8")

            try:
                code = run_one(src_path, out_dir, args)
            except KeyboardInterrupt:
                print(f"\n⚠️ Ctrl-C 中断, {name}/ 保留 (含 _meta.json), 下次跑会跳过这条; 想复跑用 --force 或手动删 dir")
                raise
            if code == 0:
                n_ran += 1
                global_claimed[key] = out_dir.relative_to(runs_full).as_posix()
                print(f"  ✓ {name} done")
            else:
                n_failed += 1
                print(f"  ❌ {name} exit={code} (目录已占; 想重试 --force 或 手动 rm 该 dir)")

    print(f"\n{'='*60}")
    print(f"📊 完成: 跑 {n_ran} / 本次已存目录跳过 {n_skipped} / 全局占用顶替 {n_global_skip} / "
          f"失败 {n_failed} / 无候选 q {n_no_cap}")


if __name__ == "__main__":
    main()