| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- """批量化跑 procedure-dsl: 把这次列出的所有 q 的全部帖子 (每 q 三 form ≈90 帖) 倒进
- 一个大池, 跨 q 按评分降序取全局 top-N (默认 20) 个未被丢弃的"工序"帖, 喂给
- procedure-dsl/run_procedure_dsl.py 提取工序。每条仍落回它自己所属那个 q 的目录:
- runs_full/qNNNN/procedures/{form}_{short_case}/。
- ⚠️ top-N 是"这批 q 加起来总共 N 条", 不是"每 q N 条"。列 5 个 q 也只跑全局最高的 N 条,
- 可能全集中在某几个 q 上。想多跑就调大 --top-n 或换一批 q。
- 筛选: ① 非异常 (anomaly) 且 decision == report (即未被丢弃)
- ② 知识类型 含 "工序" (procedure) —— 工序帖才有端到端流程可抽; 能力/工具帖抽不出
- 丢弃判定与评分都直接复用 server.evaluate_result (跟前端 /api/data 同一真源),
- schema 再变也不会两边漂移; 阈值见 server.py (制作相关性/时效/综合均分)。
- 评分: server.evaluate_result(r)["overall"] —— 相关性均分与质量均分的两级平均。
- 选条: top-N 是「累计目标」—— workflows/ 里这些 q 最终要有 N 条; 本次预算 = N − 已完成数。
- 全部 q 合池后按 overall 降序取 budget 条。去重:
- ① 池内同帖 (case_id 撞, 跨 form / 跨 q) 只留最高分那条, 落在它得分最高的 q 下
- ② 选片只排除「已完成」(workflows/ 里有的); 在跑/跑挂的不排, 交给跑时原子 mkdir 分流
- 产物: HTML/MD/workflow.json/_source.json/_meta.json 全落
- runs_full/qNNNN/procedures/{form}_{short_case}/
- 并发: 多窗口直接粘贴同一条指令并行 —— 每条 procedure 靠 mkdir(exist_ok=False) 原子认领,
- 谁先建谁跑、其余撞 FileExistsError 即跳, 自动把 budget 条瓜分到各窗口, 不重不超额。
- (预算按 workflows/ 已完成数算, 所以各窗口算出的计划对齐同一 top-N 前沿, 不会越跑越多)
- --force 不走认领锁 (允许覆盖), 故 --force 不适合多窗口并行。
- 用法:
- # 跨多个 q 合池, 全局取 top-20
- python batch_extract_procedures.py --only-q q0018,q0036,q2300
- # 区间内的 q 合池
- python batch_extract_procedures.py --start 0 --count 5
- # 全局只取 top-5 + 强制重跑 (--top-k 是 --top-n 的兼容别名)
- python batch_extract_procedures.py --only-q q0018,q0036 --top-n 5 --force
- # 仅列出会跑哪些 (不真跑)
- python batch_extract_procedures.py --only-q q0018,q0036 --dry-run
- """
- import argparse
- import json
- import re
- import subprocess
- import sys
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Set, Tuple
- import build_workflows # 复用其 write_run: 跑完一个 q 就把合并 json 写进 workflows/
- from server import evaluate_result # 评分 + 丢弃判定的唯一真源 (与前端 /api/data 一致)
- HERE = Path(__file__).resolve().parent
- RUNS_FULL_DEFAULT = HERE / "runs_full"
- RUN_DSL = HERE / "procedure-dsl" / "run_procedure_dsl.py"
- # 图过多 (>此值) 直接不选这帖, 名额顺延给下一名 —— "最次则跳过" 那一档。
- # 上限以内的多图帖由 run_procedure_dsl 侧降分辨率/拼九宫格扛 (见那边 _IMG_MONTAGE_OVER)。
- # 72 = 8 个九宫格; 再多拼图也压不住, 放弃更省。
- _IMG_SKIP_CAP = 72
- def _norm_q(s: str) -> int:
- m = re.search(r"\d+", s or "")
- if not m:
- sys.exit(f"❌ q 参数不含数字: {s!r}")
- return int(m.group())
- def _pick_pool_candidates(q_dirs: List[Path], exclude_ids: Set[str], top_n: int
- ) -> List[Tuple[str, str, Dict[str, Any], float, float]]:
- """把列出的所有 q 的全部 form (每 q ≈90 帖) 倒进一个大池, 跨 q 取全局 top_n 个工序帖。
- 返回 [(q_name, form_key, result, norm, raw), ...] (已按归一化分降序)。
- 评分: 排名用 *归一化分* norm = overall / 满分 —— 因为数据里有两套评分量纲 (mod 0-10 /
- old 0-5), 直接比 overall 会让 0-5 的 q 永远被 0-10 压死。norm 统一到 0-1, 跨 q
- 可比; 同 schema 批次里 norm 与 overall 同序, 不影响。raw=overall 仅供展示/落档。
- 筛选: 非异常 + decision==report (未丢弃) + 知识类型含 procedure —— 评分与丢弃判定
- 全部走 server.evaluate_result, 与前端 /api/data 同一真源。
- 去重: ① 池内同 case_id 撞了只留最高分那条 (同帖可能被多 form / 多 q 同时召回,
- 最终落在它得分最高的那个 q 目录下)
- ② 跳过 exclude_ids (磁盘上 procedure 目录已存在的), 键统一 _short_case 形式。"""
- # short_case -> (norm, raw, q, form, r)
- pooled: Dict[str, Tuple[float, float, str, str, Dict[str, Any]]] = {}
- n_img_skip = 0 # 因图过多 (>_IMG_SKIP_CAP) 被放弃的工序帖数, 末尾汇报
- for qd in q_dirs:
- for fp in sorted(qd.glob("form_*.json")):
- try:
- d = json.loads(fp.read_text(encoding="utf-8"))
- except Exception as e:
- print(f" ⚠️ 读 {qd.name}/{fp.name} 失败: {e}")
- continue
- fkey = fp.stem.replace("form_", "") # A/B/C
- for r in d.get("results", []):
- ev = evaluate_result(r)
- if ev["anomaly"] or ev["decision"] != "report":
- continue # 异常 / 被丢弃 → 不参选
- if "procedure" not in ev["knowledge_type"]:
- continue # 只跑工序帖
- n_img = len((r.get("post") or {}).get("images") or [])
- if n_img > _IMG_SKIP_CAP:
- n_img_skip += 1 # 图太多, 拼图也压不住 → 放弃, 名额让给下一名
- continue
- raw = ev["overall"] or 0.0
- norm = raw / (ev["scale"] or 5) # 归一化到 0-1, 跨 schema 可比
- ck = _short_case(r.get("case_id") or "")
- if ck not in pooled or norm > pooled[ck][0]:
- pooled[ck] = (norm, raw, qd.name, fkey, r) # 同帖跨 form/q 撞分, 留最高
- if n_img_skip:
- print(f" ⓘ 因图过多 (>{_IMG_SKIP_CAP} 张) 放弃 {n_img_skip} 个工序帖 (名额已顺延)")
- ranked = sorted(pooled.items(), key=lambda kv: kv[1][0], reverse=True)
- out: List[Tuple[str, str, Dict[str, Any], float, float]] = []
- for ck, (norm, raw, qname, fkey, r) in ranked:
- if ck in exclude_ids:
- continue # 磁盘上已占
- out.append((qname, fkey, r, norm, raw))
- if len(out) >= top_n:
- break
- return out
- # ── 累计目标: 扫 workflows/ 里属于这些 q 的「已完成并合并」帖 → short_case 集 ───────
- # top-N 当「累计目标」用: 本次预算 = N − 已完成数 (见 main)。配合跑时原子 mkdir 分流,
- # 多窗口跑同一条指令也只会瓜分到 N 条、不超额 (不排在跑/跑挂的, 那些交给 mkdir 撞锁)。
- def _gather_done(workflows_dir: Path, q_names: Set[str]) -> Set[str]:
- """workflows/ 里属于 q_names 的已完成帖, 返回其 _short_case 集。
- 文件名形如 {qname}_{form}_{short}.json (build_workflows.write_one 落盘约定)。"""
- done: Set[str] = set()
- if not workflows_dir.is_dir():
- return done
- for qn in q_names:
- for f in workflows_dir.glob(f"{qn}_*.json"):
- folder = f.stem[len(qn) + 1:] # 去 '{qname}_' → '{form}_{short}'
- parts = folder.split("_", 1)
- if len(parts) == 2:
- done.add(parts[1]) # short_case
- return done
- def _write_meta(out_dir: Path, case_id: str, from_q: str, form: str,
- score: float, score_norm: float) -> None:
- """_meta.json 记 full case_id + 出处 + 时间, 供其他终端/回查识别。
- score=原始 overall (量纲随 schema), score_norm=归一化 0-1 (跨 q 排名实际用的分)。"""
- out_dir.mkdir(parents=True, exist_ok=True)
- (out_dir / "_meta.json").write_text(json.dumps({
- "case_id": case_id, "from_q": from_q, "form": form,
- "score": round(score, 4), "score_norm": round(score_norm, 4),
- "started_at": datetime.now().isoformat(timespec="seconds"),
- }, ensure_ascii=False, indent=2), encoding="utf-8")
- def _short_case(case_id: str) -> str:
- """xhs_69e223a700000000230267c8 → xhs_69e223a7 (前缀 + 时间戳段, 够辨识)。"""
- m = re.match(r"^([a-z]+)_([0-9a-f]{8})", case_id or "")
- return f"{m.group(1)}_{m.group(2)}" if m else (case_id or "unknown")[:20]
- def _source_to_dsl_input(result: Dict[str, Any]) -> Dict[str, Any]:
- """form_*.json 的 result 转成 run_procedure_dsl 接受的 source 格式
- (新 schema: images 扁平数组)。"""
- post = result.get("post") or {}
- return {
- "title": post.get("title", ""),
- "link": result.get("source_url") or post.get("link") or "",
- "body_text": post.get("body_text", "") or "",
- "images": [u for u in (post.get("images") or []) if isinstance(u, str) and u],
- "publish_timestamp": post.get("publish_timestamp", ""),
- "channel_account_name": post.get("channel_account_name") or post.get("channel") or "",
- }
- def _already_done(out_dir: Path) -> bool:
- """目录已存且含 case-*.html 视为已跑完 (跟 run_procedure_dsl 落盘约定一致)。"""
- return out_dir.is_dir() and any(out_dir.glob("case-*.html"))
- def run_one(source_json_path: Path, out_dir_abs: Path, args: argparse.Namespace) -> int:
- """启动 run_procedure_dsl.py 子进程, 返回 exit code。"""
- cmd = [
- sys.executable, str(RUN_DSL),
- str(source_json_path),
- "--out-dir", str(out_dir_abs),
- "--model", args.model,
- "--max-turns", str(args.max_turns),
- "--max-retries", str(args.max_retries),
- ]
- if args.resume:
- cmd.append("--resume")
- if args.version:
- cmd.extend(["--version", args.version])
- print(f" $ {' '.join(cmd)}")
- return subprocess.call(cmd)
- def main() -> None:
- sys.stdout.reconfigure(encoding="utf-8")
- p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
- p.add_argument("--output-dir", default=str(RUNS_FULL_DEFAULT),
- help=f"runs_full 路径 (默认 {RUNS_FULL_DEFAULT.name})")
- p.add_argument("--only-q", default=None,
- help="离散指定 q (如 '36' / 'q0036' / '36,55,720')")
- p.add_argument("--start", type=int, default=0, help="区间起始 idx (默认 0)")
- p.add_argument("--count", type=int, default=10, help="区间取几条 q (默认 10)")
- p.add_argument("--top-n", "--top-k", dest="top_n", type=int, default=20,
- help="每个 q 合池后取评分前几条 (默认 20; --top-k 为兼容别名)")
- p.add_argument("--model", default="claude-sonnet-4-6", help="透传给 run_procedure_dsl")
- p.add_argument("--max-turns", type=int, default=300, help="透传给 run_procedure_dsl")
- p.add_argument("--max-retries", type=int, default=3, help="透传给 run_procedure_dsl")
- p.add_argument("--version", default=None, help="spec 版本, 透传 (e.g. test / backup)")
- p.add_argument("--resume", action="store_true", help="透传 --resume")
- p.add_argument("--force", action="store_true",
- help="目录已存在也强制纳入候选 + 重跑 (会覆盖原 _scratch / workflow.json)")
- p.add_argument("--dry-run", action="store_true", help="只列出会跑啥, 不真跑")
- args = p.parse_args()
- runs_full = Path(args.output_dir).resolve()
- if not runs_full.is_dir():
- sys.exit(f"❌ {runs_full} 不存在")
- if not RUN_DSL.exists():
- sys.exit(f"❌ {RUN_DSL} 不存在")
- # 选 q 集合
- all_q = sorted([d for d in runs_full.iterdir() if d.is_dir() and d.name.startswith("q")],
- key=lambda p: _norm_q(p.name))
- if args.only_q:
- wanted = {f"q{_norm_q(t):04d}" for t in args.only_q.split(",") if t.strip()}
- q_dirs = [d for d in all_q if d.name in wanted]
- if not q_dirs:
- sys.exit(f"❌ --only-q {args.only_q!r} 在 {runs_full} 下没匹配")
- else:
- q_dirs = all_q[args.start : args.start + args.count]
- if not q_dirs:
- sys.exit(f"❌ 区间 [{args.start}:{args.start+args.count}] 取不到 q")
- # ── 累计目标: top-N 是「workflows/ 里这些 q 最终要有 N 条」, 本次只补差额 ──────
- q_names = {d.name for d in q_dirs}
- workflows_dir = Path(build_workflows.OUT_DIR)
- done_set: Set[str] = set() if args.force else _gather_done(workflows_dir, q_names)
- budget = args.top_n if args.force else max(0, args.top_n - len(done_set))
- print(f"📋 {len(q_dirs)} 个 q 合池, 累计目标 top-{args.top_n}; "
- f"workflows/ 已有 {len(done_set)} 条 → 本次还需 {budget} 条"
- f"{' (--force 忽略已完成)' if args.force else ''}")
- n_ran = n_skipped = n_failed = 0
- if budget == 0:
- print("\n✅ 累计目标已达成 (workflows/ 里这些 q 已够 top-N), 无需再跑.")
- print(f"\n{'='*60}\n📊 完成: 跑 0 / 已存跳过 0 / 失败 0")
- return
- # ── 跨 q 合池挑 top-budget ──────────────────────────────────────────────────
- # 只排除「已完成」(done_set); 在跑/跑挂的留在池里, 跑时靠原子 mkdir 占位分流到各窗口。
- plan = _pick_pool_candidates(q_dirs, done_set, budget)
- if not plan:
- print("\n⚠️ 这批 q 合池后无可跑候选 (全被丢弃/异常过滤, 或都已完成)")
- print(f"\n{'='*60}\n📊 完成: 跑 0 / 已存跳过 0 / 失败 0")
- return
- print(f"\n▶ 本次挑出 {len(plan)} 条 (补差额, score=归一化 0-1, 括号内原始/满分):")
- for qname, fkey, r, norm, raw in plan:
- cid_full = r.get("case_id") or ""
- title = (r.get("post") or {}).get("title", "")[:30]
- print(f" [{qname} {fkey}] score={norm:.3f} ({raw:g}) {cid_full[:24]} {title}")
- if len(plan) < budget:
- # 池里够格的工序帖不足本次预算 (被丢弃过滤掉 / 已完成)
- print(f" ⓘ 可跑候选不足本次预算 {budget} (仅 {len(plan)} 条; 想多跑就加 q 或调大 --top-n)")
- # ── 落产物 + 跑; 每跑完一条就立刻把它的合并 json 写进 workflows/ ──────────────
- def _emit_workflow(qname: str, folder: str) -> None:
- """单条 procedure 跑完 (或已存) → 立刻写出 workflows/{qname}_{folder}.json。
- (dry-run 不产 workflow.json 故跳过; build 失败不应中断批量, 故 try 包住)"""
- if args.dry_run:
- return
- try:
- if build_workflows.write_one(qname, folder, runs_dir=runs_full):
- print(f" 🧩 workflows/: {qname}_{folder}.json 已写")
- except Exception as e:
- print(f" ⚠️ build_workflows 失败 ({qname}/{folder}): {e}")
- for qname, fkey, r, norm, raw in plan:
- cid = r.get("case_id") or "unknown"
- name = f"{fkey}_{_short_case(cid)}"
- out_dir = runs_full / qname / "procedures" / name
- # dry-run 绝不碰磁盘 (建目录会污染状态), 只报会不会跑
- if args.dry_run:
- mark = ("已完成" if _already_done(out_dir)
- else "已占(在跑/跑挂)" if out_dir.exists() else "would run")
- print(f" [dry-run] {mark}: {qname}/{name}/")
- continue
- # ── 原子认领: mkdir(exist_ok=False) 内核级唯一, 谁先建谁跑 ──────────────
- # 多窗口跑同一指令时, 同一条 procedure 只有一个窗口能建成目录、其余撞 FileExistsError
- # → 跳过 (那条已被别窗口领走 / 或是历史跑挂的残目录)。--force 时允许覆盖已存。
- try:
- out_dir.mkdir(parents=True, exist_ok=args.force)
- except FileExistsError:
- if _already_done(out_dir):
- _emit_workflow(qname, name) # 跑完但没合并的, 顺手补进 workflows/
- print(f" ⏭️ {qname}/{name}/ 已完成, 跳过")
- else:
- print(f" ⏭️ {qname}/{name}/ 已被别窗口认领 / 或历史跑挂, 跳过")
- n_skipped += 1
- continue
- # 认领成功 (此刻 out_dir 归本窗口) → 写 _source.json + _meta.json
- src_path = out_dir / "_source.json"
- src_path.write_text(
- json.dumps(_source_to_dsl_input(r), ensure_ascii=False, indent=2),
- encoding="utf-8")
- _write_meta(out_dir, case_id=cid, from_q=qname, form=fkey, score=raw, score_norm=norm)
- try:
- code = run_one(src_path, out_dir, args)
- except KeyboardInterrupt:
- print(f"\n⚠️ Ctrl-C 中断, {qname}/{name}/ 保留 (含 _meta.json), 下次跑会跳过这条; 想复跑用 --force 或手动删 dir")
- raise
- if code == 0:
- n_ran += 1
- print(f" ✓ {qname}/{name} done")
- _emit_workflow(qname, name) # 跑完一个就出一个, 不等同 q 其他帖子
- else:
- n_failed += 1
- print(f" ❌ {qname}/{name} exit={code} (目录已占; 想重试 --force 或 手动 rm 该 dir)")
- print(f"\n{'='*60}")
- print(f"📊 完成: 跑 {n_ran} / 跳过(已完成或别窗口领走) {n_skipped} / 失败 {n_failed} "
- f"(累计目标 top-{args.top_n}, 本次预算 {budget}, 选中 {len(plan)})")
- if __name__ == "__main__":
- main()
|