| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- """批量化跑 procedure-dsl: 对每个 q 的三 form 选评分最高的"工序"帖子,
- 喂给 procedure-dsl/run_procedure_dsl.py 提取工序, HTML 直接落在
- runs_full/qNNNN/procedures/{form}_{short_case}/ 下。
- 筛选: llm_evaluation.知识类型 含 "工序" (兼容英文 "procedure") 且非 _error
- —— 工序帖才有端到端流程可抽; 能力/工具帖太原子, 抽不出 procedure。
- 评分: 两级均值, 避免子项多的组天然占便宜:
- group_rel = mean(相关性 下所有得分)
- group_qual = mean(质量 下所有得分, 含固定+动态)
- final = mean(group_rel, group_qual)
- 选条: 每 form top-K (默认 1)。两层去重, 撞了顺延下一个候选:
- ① 本 q 跨 form (同帖只跑一次)
- ② 全局: 扫所有 q*/procedures/*/ 目录名 → 抽 short_case → 已存在即视为占用
- (无论跑完没有, 跑挂了也算占了 case; 想强复跑用 --force)
- 产物: HTML/MD/workflow.json/_source.json/_meta.json 全落
- runs_full/qNNNN/procedures/{form}_{short_case}/
- 跳过: 目录已存 → skip (--force 可强覆盖)
- 并发: 多终端开几个就跑几个; 每条 procedure 一启动就 mkdir, 其他终端立刻见到目录视作已占
- 用法:
- # 单 q 试
- python batch_extract_procedures.py --only-q q0036
- # 区间
- python batch_extract_procedures.py --start 0 --count 5
- # top-3 + 强制重跑
- python batch_extract_procedures.py --only-q q0036 --top-k 3 --force
- # 仅列出会跑哪些 (不真跑)
- python batch_extract_procedures.py --only-q q0036 --dry-run
- """
- import argparse
- import json
- import os
- import re
- import subprocess
- import sys
- from datetime import datetime
- from pathlib import Path
- from statistics import mean
- from typing import Any, Dict, List, Optional, Set, Tuple
- import build_workflows # 复用其 write_run: 跑完一个 q 就把合并 json 写进 workflows/
- HERE = Path(__file__).resolve().parent
- RUNS_FULL_DEFAULT = HERE / "runs_full"
- RUN_DSL = HERE / "procedure-dsl" / "run_procedure_dsl.py"
- _KT_PROCEDURE = {"工序", "procedure"} # 工序没改名, 仅兼容英文枚举
- def _norm_q(s: str) -> int:
- m = re.search(r"\d+", s or "")
- if not m:
- sys.exit(f"❌ q 参数不含数字: {s!r}")
- return int(m.group())
- def _has_procedure(ev: Dict[str, Any]) -> bool:
- """llm_evaluation.知识类型 是否声明了 工序 类型 (含英文 procedure)。"""
- kt = ev.get("知识类型") or ev.get("knowledge_type") or []
- if isinstance(kt, str):
- kt = [kt]
- return any(k in _KT_PROCEDURE for k in kt) if isinstance(kt, list) else False
- def _safe_float(x: Any) -> Optional[float]:
- try:
- return float(x)
- except (TypeError, ValueError):
- return None
- def _all_scores(obj: Any) -> List[float]:
- """递归挖一棵嵌套 dict 里所有 '得分' 叶子的数值。"""
- out: List[float] = []
- if isinstance(obj, dict):
- for k, v in obj.items():
- if k == "得分":
- f = _safe_float(v)
- if f is not None:
- out.append(f)
- elif isinstance(v, (dict, list)):
- out.extend(_all_scores(v))
- elif isinstance(obj, list):
- for it in obj:
- out.extend(_all_scores(it))
- return out
def _composite_score(ev: Dict[str, Any]) -> float:
    """Score an evaluation as the mean of its two group means.

    ``相关性`` and ``质量`` are each reduced to the mean of all ``得分`` values
    found beneath them, and the result is the mean of those group means, so a
    group with many sub-items carries no extra weight. When one group has no
    scores, the other group's mean is used alone; when neither has any, the
    result is 0.0.
    """
    group_means: List[float] = []
    for group in ("相关性", "质量"):
        scores = _all_scores(ev.get(group))
        if scores:
            group_means.append(mean(scores))
    if not group_means:
        return 0.0
    return mean(group_means)
- def _pick_candidates(form_path: Path, exclude_ids: Set[str], top_k: int
- ) -> List[Tuple[float, Dict[str, Any]]]:
- """读 form_*.json, 取评分最高的 top_k 个工序帖, 跳过 exclude_ids 里的 case_id
- (已被其他 q 跑过 / 正在跑 / 本进程已规划)。exclude_ids 用 _short_case 形式。"""
- try:
- d = json.loads(form_path.read_text(encoding="utf-8"))
- except Exception as e:
- print(f" ⚠️ 读 {form_path.name} 失败: {e}")
- return []
- cands: List[Tuple[float, Dict[str, Any]]] = []
- for r in d.get("results", []):
- ev = r.get("llm_evaluation") or {}
- if ev.get("_error") or not _has_procedure(ev):
- continue
- cands.append((_composite_score(ev), r))
- cands.sort(key=lambda x: x[0], reverse=True)
- out: List[Tuple[float, Dict[str, Any]]] = []
- for s, r in cands:
- # 用 _short_case 化的 case_id 对齐 global 状态 / 目录命名
- if _short_case(r.get("case_id") or "") in exclude_ids:
- continue
- out.append((s, r))
- if len(out) >= top_k:
- break
- return out
- # ── 全局去重: 扫所有 q*/procedures/*/ 目录名 → short_case 占用集 ──────────────────
- def _gather_claimed(runs_full: Path) -> Dict[str, str]:
- """扫所有 q*/procedures/*/ 返回 {short_case: rel_dir} —— 目录存在即视为占用,
- 无论它含 HTML 还是空、是否还在跑。多终端场景下, 一启动就 mkdir, 其他终端立刻见到。
- key 解析顺序: ① _meta.json 里的 full case_id → _short_case 化
- ② 目录名 `{form}_{short_case}` 直接抽 short_case
- """
- claimed: Dict[str, str] = {}
- if not runs_full.is_dir():
- return claimed
- for pd in runs_full.glob("q*/procedures/*"):
- if not pd.is_dir():
- continue
- key = None
- meta_path = pd / "_meta.json"
- if meta_path.exists():
- try:
- full = json.loads(meta_path.read_text(encoding="utf-8")).get("case_id")
- if full:
- key = _short_case(full)
- except Exception:
- pass
- if not key:
- parts = pd.name.split("_", 1)
- key = parts[1] if len(parts) == 2 else pd.name
- claimed[key] = pd.relative_to(runs_full).as_posix()
- return claimed
- def _write_meta(out_dir: Path, case_id: str, from_q: str, form: str, score: float) -> None:
- """_meta.json 记 full case_id + 出处 + 时间, 供其他终端/回查识别。"""
- out_dir.mkdir(parents=True, exist_ok=True)
- (out_dir / "_meta.json").write_text(json.dumps({
- "case_id": case_id, "from_q": from_q, "form": form, "score": round(score, 4),
- "started_at": datetime.now().isoformat(timespec="seconds"),
- }, ensure_ascii=False, indent=2), encoding="utf-8")
- def _short_case(case_id: str) -> str:
- """xhs_69e223a700000000230267c8 → xhs_69e223a7 (前缀 + 时间戳段, 够辨识)。"""
- m = re.match(r"^([a-z]+)_([0-9a-f]{8})", case_id or "")
- return f"{m.group(1)}_{m.group(2)}" if m else (case_id or "unknown")[:20]
- def _source_to_dsl_input(result: Dict[str, Any]) -> Dict[str, Any]:
- """form_*.json 的 result 转成 run_procedure_dsl 接受的 source 格式
- (新 schema: images 扁平数组)。"""
- post = result.get("post") or {}
- return {
- "title": post.get("title", ""),
- "link": result.get("source_url") or post.get("link") or "",
- "body_text": post.get("body_text", "") or "",
- "images": [u for u in (post.get("images") or []) if isinstance(u, str) and u],
- "publish_timestamp": post.get("publish_timestamp", ""),
- "channel_account_name": post.get("channel_account_name") or post.get("channel") or "",
- }
- def _already_done(out_dir: Path) -> bool:
- """目录已存且含 case-*.html 视为已跑完 (跟 run_procedure_dsl 落盘约定一致)。"""
- return out_dir.is_dir() and any(out_dir.glob("case-*.html"))
def run_one(source_json_path: Path, out_dir_abs: Path, args: argparse.Namespace) -> int:
    """Run ``run_procedure_dsl.py`` as a child process and return its exit code.

    The command is built from the source JSON path, the output directory and
    the pass-through options on *args* (``model``, ``max_turns``,
    ``max_retries``, plus ``resume`` / ``version`` when set). It is echoed to
    stdout before being executed with the current interpreter.
    """
    cmd = [sys.executable, str(RUN_DSL), str(source_json_path)]
    cmd += ["--out-dir", str(out_dir_abs)]
    cmd += ["--model", args.model]
    cmd += ["--max-turns", str(args.max_turns)]
    cmd += ["--max-retries", str(args.max_retries)]
    if args.resume:
        cmd += ["--resume"]
    if args.version:
        cmd += ["--version", args.version]
    print(f" $ {' '.join(cmd)}")
    return subprocess.call(cmd)
def main() -> None:
    """CLI entry point: plan and run procedure extraction for the selected q dirs.

    Steps:
      1. Parse arguments and resolve the q directories, either from
         ``--only-q`` or as the ``--start``/``--count`` slice of the q* dirs
         sorted by their number.
      2. For each q, pick up to ``--top-k`` procedure posts per
         ``form_*.json``, skipping cases already claimed on disk (unless
         ``--force``) or already planned earlier in this run.
      3. For each pick, write ``_source.json`` and ``_meta.json`` into
         ``procedures/{form}_{short_case}/`` and launch the extraction
         subprocess via ``run_one``.
      4. After each q, call ``build_workflows.write_run``; a failure there is
         reported but does not stop the batch.

    ``--dry-run`` only prints the plan and writes nothing: any directory it
    created would be counted as a claimed case by every later run.
    """
    # stdout may have been replaced by a stream that has no reconfigure().
    reconfigure = getattr(sys.stdout, "reconfigure", None)
    if callable(reconfigure):
        reconfigure(encoding="utf-8")

    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--output-dir", default=str(RUNS_FULL_DEFAULT),
                   help=f"runs_full 路径 (默认 {RUNS_FULL_DEFAULT.name})")
    p.add_argument("--only-q", default=None,
                   help="离散指定 q (如 '36' / 'q0036' / '36,55,720')")
    p.add_argument("--start", type=int, default=0, help="区间起始 idx (默认 0)")
    p.add_argument("--count", type=int, default=10, help="区间取几条 q (默认 10)")
    p.add_argument("--top-k", type=int, default=1, help="每 form 取几条 (默认 1)")
    p.add_argument("--model", default="claude-sonnet-4-6", help="透传给 run_procedure_dsl")
    p.add_argument("--max-turns", type=int, default=300, help="透传给 run_procedure_dsl")
    p.add_argument("--max-retries", type=int, default=3, help="透传给 run_procedure_dsl")
    p.add_argument("--version", default=None, help="spec 版本, 透传 (e.g. test / backup)")
    p.add_argument("--resume", action="store_true", help="透传 --resume")
    p.add_argument("--force", action="store_true",
                   help="目录已存在也强制纳入候选 + 重跑 (会覆盖原 _scratch / workflow.json)")
    p.add_argument("--dry-run", action="store_true", help="只列出会跑啥, 不真跑")
    args = p.parse_args()
    if args.top_k < 1:
        p.error("--top-k 必须 >= 1")

    runs_full = Path(args.output_dir).resolve()
    if not runs_full.is_dir():
        sys.exit(f"❌ {runs_full} 不存在")
    if not RUN_DSL.exists():
        sys.exit(f"❌ {RUN_DSL} 不存在")

    # Resolve the set of q directories. Names without any digit are ignored:
    # _norm_q would otherwise abort the whole run on such a stray directory.
    all_q = sorted(
        (d for d in runs_full.iterdir()
         if d.is_dir() and d.name.startswith("q") and re.search(r"\d", d.name)),
        key=lambda d: _norm_q(d.name),
    )
    if args.only_q:
        wanted = {f"q{_norm_q(t):04d}" for t in args.only_q.split(",") if t.strip()}
        q_dirs = [d for d in all_q if d.name in wanted]
        if not q_dirs:
            sys.exit(f"❌ --only-q {args.only_q!r} 在 {runs_full} 下没匹配")
    else:
        q_dirs = all_q[args.start : args.start + args.count]
        if not q_dirs:
            sys.exit(f"❌ 区间 [{args.start}:{args.start+args.count}] 取不到 q")

    # Before starting: scan every q*/procedures/*/ directory for claimed cases.
    global_claimed = {} if args.force else _gather_claimed(runs_full)
    print(f"📋 {len(q_dirs)} 个 q, top-{args.top_k}/form, "
          f"目标 ≤{len(q_dirs) * 3 * args.top_k} 条工序 (去重后会少)")
    print(f"📦 全局占用 case 数 = {len(global_claimed)}"
          f"{' (--force 已忽略)' if args.force else ''}")

    n_ran = n_skipped = n_failed = n_no_cap = n_global_skip = 0
    # Cases planned by this process count as claimed even before they run,
    # so a later q in the same batch cannot select the same post again.
    local_claimed: Set[str] = set()

    for qd in q_dirs:
        forms = sorted(qd.glob("form_*.json"))
        if not forms:
            print(f"\n⏭️ {qd.name}: 无 form_*.json")
            continue
        print(f"\n▶ {qd.name}")

        # Excluded = claimed on disk + planned by this process; every key is
        # in _short_case form, matching the directory naming convention.
        excluded: Set[str] = set(global_claimed) | local_claimed
        plan: List[Tuple[str, Dict[str, Any], float]] = []
        for fp in forms:
            fkey = fp.stem.replace("form_", "")  # A/B/C
            picks = _pick_candidates(fp, excluded, args.top_k)
            for score, r in picks:
                # case_id / title come from data files and may be null or
                # non-string, so normalise before slicing.
                cid_full = str(r.get("case_id") or "")
                cid_key = _short_case(cid_full)
                plan.append((fkey, r, score))
                excluded.add(cid_key)       # other forms of this q skip it
                local_claimed.add(cid_key)  # later q's skip it as well
                title = str((r.get("post") or {}).get("title") or "")[:30]
                print(f" [{fkey}] score={score:.2f} {cid_full[:24]} {title}")
            if len(picks) < args.top_k:
                # Fewer picks than requested: report the shortfall.
                n_global_skip += args.top_k - len(picks)
                print(f" [{fkey}] 候选不足 top-{args.top_k} (其他 q 已占了同帖)")
        if not plan:
            print(f" ⚠️ {qd.name} 无 工序 类候选 (无新 case 或都被 全局去重 顶了)")
            n_no_cap += 1
            continue

        # Produce the artefacts. A dry run must leave the tree untouched.
        proc_root = qd / "procedures"
        if not args.dry_run:
            proc_root.mkdir(exist_ok=True)
        for fkey, r, score in plan:
            cid = str(r.get("case_id") or "unknown")
            name = f"{fkey}_{_short_case(cid)}"
            out_dir = proc_root / name
            if _already_done(out_dir) and not args.force:
                print(f" ⏭️ {name}/ 已存在 case-*.html (用 --force 覆盖)")
                n_skipped += 1
                continue
            src_path = out_dir / "_source.json"
            if args.dry_run:
                # Checked before any mkdir/write: a directory left behind here
                # would be seen as a claimed case by every subsequent run.
                print(f" [dry-run] would run on {src_path.name} → {out_dir.name}/")
                continue
            # _source.json is the DSL input; _meta.json indexes the case id.
            out_dir.mkdir(parents=True, exist_ok=True)
            src_path.write_text(
                json.dumps(_source_to_dsl_input(r), ensure_ascii=False, indent=2),
                encoding="utf-8")
            _write_meta(out_dir, case_id=cid, from_q=qd.name, form=fkey, score=score)
            try:
                code = run_one(src_path, out_dir, args)
            except KeyboardInterrupt:
                print(f"\n⚠️ Ctrl-C 中断, {name}/ 保留 (含 _meta.json), 下次跑会跳过这条; 想复跑用 --force 或手动删 dir")
                raise
            if code == 0:
                n_ran += 1
                global_claimed[_short_case(cid)] = out_dir.relative_to(runs_full).as_posix()
                print(f" ✓ {name} done")
            else:
                n_failed += 1
                print(f" ❌ {name} exit={code} (目录已占; 想重试 --force 或 手动 rm 该 dir)")

        # This q is finished: write its merged json into workflows/.
        # Skipped on a dry run (nothing was produced); a failure here must not
        # interrupt the batch, hence the broad catch-and-report.
        if not args.dry_run:
            try:
                n_wf = build_workflows.write_run(qd.name, runs_dir=runs_full)
                print(f" 🧩 workflows/: {qd.name} 写出 {n_wf} 个合并 json")
            except Exception as e:
                print(f" ⚠️ build_workflows 失败 ({qd.name}): {e}")

    print(f"\n{'='*60}")
    print(f"📊 完成: 跑 {n_ran} / 本次已存目录跳过 {n_skipped} / 全局占用顶替 {n_global_skip} / "
          f"失败 {n_failed} / 无候选 q {n_no_cap}")


if __name__ == "__main__":
    main()
|