"""批量化跑 procedure-dsl: 把这次列出的所有 q 的全部帖子 (每 q 三 form ≈90 帖) 倒进 一个大池, 跨 q 按评分降序取全局 top-N (默认 20) 个未被丢弃的"工序"帖, 喂给 procedure-dsl/run_procedure_dsl.py 提取工序。每条仍落回它自己所属那个 q 的目录: runs_full/qNNNN/procedures/{form}_{short_case}/。 ⚠️ top-N 是"这批 q 加起来总共 N 条", 不是"每 q N 条"。列 5 个 q 也只跑全局最高的 N 条, 可能全集中在某几个 q 上。想多跑就调大 --top-n 或换一批 q。 筛选: ① 非异常 (anomaly) 且 decision == report (即未被丢弃) ② 知识类型 含 "工序" (procedure) —— 工序帖才有端到端流程可抽; 能力/工具帖抽不出 丢弃判定与评分都直接复用 server.evaluate_result (跟前端 /api/data 同一真源), schema 再变也不会两边漂移; 阈值见 server.py (制作相关性/时效/综合均分)。 评分: server.evaluate_result(r)["overall"] —— 相关性均分与质量均分的两级平均。 选条: top-N 是「累计目标」—— workflows/ 里这些 q 最终要有 N 条; 本次预算 = N − 已完成数。 全部 q 合池后按 overall 降序取 budget 条。去重: ① 池内同帖 (case_id 撞, 跨 form / 跨 q) 只留最高分那条, 落在它得分最高的 q 下 ② 选片只排除「已完成」(workflows/ 里有的); 在跑/跑挂的不排, 交给跑时原子 mkdir 分流 产物: HTML/MD/workflow.json/_source.json/_meta.json 全落 runs_full/qNNNN/procedures/{form}_{short_case}/ 并发: 多窗口直接粘贴同一条指令并行 —— 每条 procedure 靠 mkdir(exist_ok=False) 原子认领, 谁先建谁跑、其余撞 FileExistsError 即跳, 自动把 budget 条瓜分到各窗口, 不重不超额。 (预算按 workflows/ 已完成数算, 所以各窗口算出的计划对齐同一 top-N 前沿, 不会越跑越多) --force 不走认领锁 (允许覆盖), 故 --force 不适合多窗口并行。 用法: # 跨多个 q 合池, 全局取 top-20 python batch_extract_procedures.py --only-q q0018,q0036,q2300 # 区间内的 q 合池 python batch_extract_procedures.py --start 0 --count 5 # 全局只取 top-5 + 强制重跑 (--top-k 是 --top-n 的兼容别名) python batch_extract_procedures.py --only-q q0018,q0036 --top-n 5 --force # 仅列出会跑哪些 (不真跑) python batch_extract_procedures.py --only-q q0018,q0036 --dry-run """ import argparse import json import re import subprocess import sys from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Set, Tuple import build_workflows # 复用其 write_run: 跑完一个 q 就把合并 json 写进 workflows/ from server import evaluate_result # 评分 + 丢弃判定的唯一真源 (与前端 /api/data 一致) HERE = Path(__file__).resolve().parent RUNS_FULL_DEFAULT = HERE / "runs_full" RUN_DSL = HERE / "procedure-dsl" / "run_procedure_dsl.py" # 图过多 (>此值) 直接不选这帖, 名额顺延给下一名 —— "最次则跳过" 那一档。 # 上限以内的多图帖由 run_procedure_dsl 侧降分辨率/拼九宫格扛 (见那边 _IMG_MONTAGE_OVER)。 # 72 = 8 个九宫格; 再多拼图也压不住, 放弃更省。 _IMG_SKIP_CAP = 72 def _norm_q(s: str) -> int: m = re.search(r"\d+", s or "") if not m: sys.exit(f"❌ q 参数不含数字: {s!r}") return int(m.group()) def _pick_pool_candidates(q_dirs: List[Path], exclude_ids: Set[str], top_n: int ) -> List[Tuple[str, str, Dict[str, Any], float, float]]: """把列出的所有 q 的全部 form (每 q ≈90 帖) 倒进一个大池, 跨 q 取全局 top_n 个工序帖。 返回 [(q_name, form_key, result, norm, raw), ...] (已按归一化分降序)。 评分: 排名用 *归一化分* norm = overall / 满分 —— 因为数据里有两套评分量纲 (mod 0-10 / old 0-5), 直接比 overall 会让 0-5 的 q 永远被 0-10 压死。norm 统一到 0-1, 跨 q 可比; 同 schema 批次里 norm 与 overall 同序, 不影响。raw=overall 仅供展示/落档。 筛选: 非异常 + decision==report (未丢弃) + 知识类型含 procedure —— 评分与丢弃判定 全部走 server.evaluate_result, 与前端 /api/data 同一真源。 去重: ① 池内同 case_id 撞了只留最高分那条 (同帖可能被多 form / 多 q 同时召回, 最终落在它得分最高的那个 q 目录下) ② 跳过 exclude_ids (磁盘上 procedure 目录已存在的), 键统一 _short_case 形式。""" # short_case -> (norm, raw, q, form, r) pooled: Dict[str, Tuple[float, float, str, str, Dict[str, Any]]] = {} n_img_skip = 0 # 因图过多 (>_IMG_SKIP_CAP) 被放弃的工序帖数, 末尾汇报 for qd in q_dirs: for fp in sorted(qd.glob("form_*.json")): try: d = json.loads(fp.read_text(encoding="utf-8")) except Exception as e: print(f" ⚠️ 读 {qd.name}/{fp.name} 失败: {e}") continue fkey = fp.stem.replace("form_", "") # A/B/C for r in d.get("results", []): ev = evaluate_result(r) if ev["anomaly"] or ev["decision"] != "report": continue # 异常 / 被丢弃 → 不参选 if "procedure" not in ev["knowledge_type"]: continue # 只跑工序帖 n_img = len((r.get("post") or {}).get("images") or []) if n_img > _IMG_SKIP_CAP: n_img_skip += 1 # 图太多, 拼图也压不住 → 放弃, 名额让给下一名 continue raw = ev["overall"] or 0.0 norm = raw / (ev["scale"] or 5) # 归一化到 0-1, 跨 schema 可比 ck = _short_case(r.get("case_id") or "") if ck not in pooled or norm > pooled[ck][0]: pooled[ck] = (norm, raw, qd.name, fkey, r) # 同帖跨 form/q 撞分, 留最高 if n_img_skip: print(f" ⓘ 因图过多 (>{_IMG_SKIP_CAP} 张) 放弃 {n_img_skip} 个工序帖 (名额已顺延)") ranked = sorted(pooled.items(), key=lambda kv: kv[1][0], reverse=True) out: List[Tuple[str, str, Dict[str, Any], float, float]] = [] for ck, (norm, raw, qname, fkey, r) in ranked: if ck in exclude_ids: continue # 磁盘上已占 out.append((qname, fkey, r, norm, raw)) if len(out) >= top_n: break return out # ── 累计目标: 扫 workflows/ 里属于这些 q 的「已完成并合并」帖 → short_case 集 ─────── # top-N 当「累计目标」用: 本次预算 = N − 已完成数 (见 main)。配合跑时原子 mkdir 分流, # 多窗口跑同一条指令也只会瓜分到 N 条、不超额 (不排在跑/跑挂的, 那些交给 mkdir 撞锁)。 def _gather_done(workflows_dir: Path, q_names: Set[str]) -> Set[str]: """workflows/ 里属于 q_names 的已完成帖, 返回其 _short_case 集。 文件名形如 {qname}_{form}_{short}.json (build_workflows.write_one 落盘约定)。""" done: Set[str] = set() if not workflows_dir.is_dir(): return done for qn in q_names: for f in workflows_dir.glob(f"{qn}_*.json"): folder = f.stem[len(qn) + 1:] # 去 '{qname}_' → '{form}_{short}' parts = folder.split("_", 1) if len(parts) == 2: done.add(parts[1]) # short_case return done def _write_meta(out_dir: Path, case_id: str, from_q: str, form: str, score: float, score_norm: float) -> None: """_meta.json 记 full case_id + 出处 + 时间, 供其他终端/回查识别。 score=原始 overall (量纲随 schema), score_norm=归一化 0-1 (跨 q 排名实际用的分)。""" out_dir.mkdir(parents=True, exist_ok=True) (out_dir / "_meta.json").write_text(json.dumps({ "case_id": case_id, "from_q": from_q, "form": form, "score": round(score, 4), "score_norm": round(score_norm, 4), "started_at": datetime.now().isoformat(timespec="seconds"), }, ensure_ascii=False, indent=2), encoding="utf-8") def _short_case(case_id: str) -> str: """xhs_69e223a700000000230267c8 → xhs_69e223a7 (前缀 + 时间戳段, 够辨识)。""" m = re.match(r"^([a-z]+)_([0-9a-f]{8})", case_id or "") return f"{m.group(1)}_{m.group(2)}" if m else (case_id or "unknown")[:20] def _source_to_dsl_input(result: Dict[str, Any]) -> Dict[str, Any]: """form_*.json 的 result 转成 run_procedure_dsl 接受的 source 格式 (新 schema: images 扁平数组)。""" post = result.get("post") or {} return { "title": post.get("title", ""), "link": result.get("source_url") or post.get("link") or "", "body_text": post.get("body_text", "") or "", "images": [u for u in (post.get("images") or []) if isinstance(u, str) and u], "publish_timestamp": post.get("publish_timestamp", ""), "channel_account_name": post.get("channel_account_name") or post.get("channel") or "", } def _already_done(out_dir: Path) -> bool: """目录已存且含 case-*.html 视为已跑完 (跟 run_procedure_dsl 落盘约定一致)。""" return out_dir.is_dir() and any(out_dir.glob("case-*.html")) def run_one(source_json_path: Path, out_dir_abs: Path, args: argparse.Namespace) -> int: """启动 run_procedure_dsl.py 子进程, 返回 exit code。""" cmd = [ sys.executable, str(RUN_DSL), str(source_json_path), "--out-dir", str(out_dir_abs), "--model", args.model, "--max-turns", str(args.max_turns), "--max-retries", str(args.max_retries), ] if args.resume: cmd.append("--resume") if args.version: cmd.extend(["--version", args.version]) print(f" $ {' '.join(cmd)}") return subprocess.call(cmd) def main() -> None: sys.stdout.reconfigure(encoding="utf-8") p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument("--output-dir", default=str(RUNS_FULL_DEFAULT), help=f"runs_full 路径 (默认 {RUNS_FULL_DEFAULT.name})") p.add_argument("--only-q", default=None, help="离散指定 q (如 '36' / 'q0036' / '36,55,720')") p.add_argument("--start", type=int, default=0, help="区间起始 idx (默认 0)") p.add_argument("--count", type=int, default=10, help="区间取几条 q (默认 10)") p.add_argument("--top-n", "--top-k", dest="top_n", type=int, default=20, help="每个 q 合池后取评分前几条 (默认 20; --top-k 为兼容别名)") p.add_argument("--model", default="claude-sonnet-4-6", help="透传给 run_procedure_dsl") p.add_argument("--max-turns", type=int, default=300, help="透传给 run_procedure_dsl") p.add_argument("--max-retries", type=int, default=3, help="透传给 run_procedure_dsl") p.add_argument("--version", default=None, help="spec 版本, 透传 (e.g. test / backup)") p.add_argument("--resume", action="store_true", help="透传 --resume") p.add_argument("--force", action="store_true", help="目录已存在也强制纳入候选 + 重跑 (会覆盖原 _scratch / workflow.json)") p.add_argument("--dry-run", action="store_true", help="只列出会跑啥, 不真跑") args = p.parse_args() runs_full = Path(args.output_dir).resolve() if not runs_full.is_dir(): sys.exit(f"❌ {runs_full} 不存在") if not RUN_DSL.exists(): sys.exit(f"❌ {RUN_DSL} 不存在") # 选 q 集合 all_q = sorted([d for d in runs_full.iterdir() if d.is_dir() and d.name.startswith("q")], key=lambda p: _norm_q(p.name)) if args.only_q: wanted = {f"q{_norm_q(t):04d}" for t in args.only_q.split(",") if t.strip()} q_dirs = [d for d in all_q if d.name in wanted] if not q_dirs: sys.exit(f"❌ --only-q {args.only_q!r} 在 {runs_full} 下没匹配") else: q_dirs = all_q[args.start : args.start + args.count] if not q_dirs: sys.exit(f"❌ 区间 [{args.start}:{args.start+args.count}] 取不到 q") # ── 累计目标: top-N 是「workflows/ 里这些 q 最终要有 N 条」, 本次只补差额 ────── q_names = {d.name for d in q_dirs} workflows_dir = Path(build_workflows.OUT_DIR) done_set: Set[str] = set() if args.force else _gather_done(workflows_dir, q_names) budget = args.top_n if args.force else max(0, args.top_n - len(done_set)) print(f"📋 {len(q_dirs)} 个 q 合池, 累计目标 top-{args.top_n}; " f"workflows/ 已有 {len(done_set)} 条 → 本次还需 {budget} 条" f"{' (--force 忽略已完成)' if args.force else ''}") n_ran = n_skipped = n_failed = 0 if budget == 0: print("\n✅ 累计目标已达成 (workflows/ 里这些 q 已够 top-N), 无需再跑.") print(f"\n{'='*60}\n📊 完成: 跑 0 / 已存跳过 0 / 失败 0") return # ── 跨 q 合池挑 top-budget ────────────────────────────────────────────────── # 只排除「已完成」(done_set); 在跑/跑挂的留在池里, 跑时靠原子 mkdir 占位分流到各窗口。 plan = _pick_pool_candidates(q_dirs, done_set, budget) if not plan: print("\n⚠️ 这批 q 合池后无可跑候选 (全被丢弃/异常过滤, 或都已完成)") print(f"\n{'='*60}\n📊 完成: 跑 0 / 已存跳过 0 / 失败 0") return print(f"\n▶ 本次挑出 {len(plan)} 条 (补差额, score=归一化 0-1, 括号内原始/满分):") for qname, fkey, r, norm, raw in plan: cid_full = r.get("case_id") or "" title = (r.get("post") or {}).get("title", "")[:30] print(f" [{qname} {fkey}] score={norm:.3f} ({raw:g}) {cid_full[:24]} {title}") if len(plan) < budget: # 池里够格的工序帖不足本次预算 (被丢弃过滤掉 / 已完成) print(f" ⓘ 可跑候选不足本次预算 {budget} (仅 {len(plan)} 条; 想多跑就加 q 或调大 --top-n)") # ── 落产物 + 跑; 每跑完一条就立刻把它的合并 json 写进 workflows/ ────────────── def _emit_workflow(qname: str, folder: str) -> None: """单条 procedure 跑完 (或已存) → 立刻写出 workflows/{qname}_{folder}.json。 (dry-run 不产 workflow.json 故跳过; build 失败不应中断批量, 故 try 包住)""" if args.dry_run: return try: if build_workflows.write_one(qname, folder, runs_dir=runs_full): print(f" 🧩 workflows/: {qname}_{folder}.json 已写") except Exception as e: print(f" ⚠️ build_workflows 失败 ({qname}/{folder}): {e}") for qname, fkey, r, norm, raw in plan: cid = r.get("case_id") or "unknown" name = f"{fkey}_{_short_case(cid)}" out_dir = runs_full / qname / "procedures" / name # dry-run 绝不碰磁盘 (建目录会污染状态), 只报会不会跑 if args.dry_run: mark = ("已完成" if _already_done(out_dir) else "已占(在跑/跑挂)" if out_dir.exists() else "would run") print(f" [dry-run] {mark}: {qname}/{name}/") continue # ── 原子认领: mkdir(exist_ok=False) 内核级唯一, 谁先建谁跑 ────────────── # 多窗口跑同一指令时, 同一条 procedure 只有一个窗口能建成目录、其余撞 FileExistsError # → 跳过 (那条已被别窗口领走 / 或是历史跑挂的残目录)。--force 时允许覆盖已存。 try: out_dir.mkdir(parents=True, exist_ok=args.force) except FileExistsError: if _already_done(out_dir): _emit_workflow(qname, name) # 跑完但没合并的, 顺手补进 workflows/ print(f" ⏭️ {qname}/{name}/ 已完成, 跳过") else: print(f" ⏭️ {qname}/{name}/ 已被别窗口认领 / 或历史跑挂, 跳过") n_skipped += 1 continue # 认领成功 (此刻 out_dir 归本窗口) → 写 _source.json + _meta.json src_path = out_dir / "_source.json" src_path.write_text( json.dumps(_source_to_dsl_input(r), ensure_ascii=False, indent=2), encoding="utf-8") _write_meta(out_dir, case_id=cid, from_q=qname, form=fkey, score=raw, score_norm=norm) try: code = run_one(src_path, out_dir, args) except KeyboardInterrupt: print(f"\n⚠️ Ctrl-C 中断, {qname}/{name}/ 保留 (含 _meta.json), 下次跑会跳过这条; 想复跑用 --force 或手动删 dir") raise if code == 0: n_ran += 1 print(f" ✓ {qname}/{name} done") _emit_workflow(qname, name) # 跑完一个就出一个, 不等同 q 其他帖子 else: n_failed += 1 print(f" ❌ {qname}/{name} exit={code} (目录已占; 想重试 --force 或 手动 rm 该 dir)") print(f"\n{'='*60}") print(f"📊 完成: 跑 {n_ran} / 跳过(已完成或别窗口领走) {n_skipped} / 失败 {n_failed} " f"(累计目标 top-{args.top_n}, 本次预算 {budget}, 选中 {len(plan)})") if __name__ == "__main__": main()