#!/usr/bin/env python3
# NOTE: the module docstring below is also shown to users as the argparse
# epilog (`epilog=__doc__` in _parse_args), so it is runtime text and is kept
# verbatim rather than translated.
"""
run_procedure_dsl.py — 拿 procedure-dsl/spec.md 当指令本, 让 Claude Agent SDK
(走 ~/.claude OAuth Max 订阅额度) 对单个 post 跑完三阶段提取, 落 case-N.{md,html}.

设计:
  - 复用 Agent 仓 examples/process_pipeline/run_pipeline.py --use-claude-sdk 的 OAuth 思路:
    子进程 env 把 ANTHROPIC_API_KEY / BASE_URL / AUTH_TOKEN 显式置空, 让 claude CLI 回落到 OAuth 凭证.
  - 与 agent/llm/claude_code_oauth.py 的 one-shot wrapper 不同: 那个 max_turns=1 + allowed_tools=[],
    本脚本让 Agent 真正自主跑——开 Read/Write/Bash/Edit/Glob/Grep, multi-turn,
    自己读 spec.md / source / 调 bin/taxonomy-lookup.py / 写产物.
  - 单 post 输入, 多终端 = 多并行 (用户自己开).

用法 (本脚本位于 procedure-dsl/, SDK cwd 默认设到 procedure-dsl/):
  # source 是 input/case-N-raw.json, 脚本自动从 image_url_list 拉图作多模态.
  # --out-dir 必填: 是 outputs/ 下的目录名, 产物全落这里; case_id 自动从它的 basename 推.
  python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2
  python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2 \\
      --extra-image /path/to/local-ref.png --model claude-sonnet-4-6
  # 跑实验版 spec-test/ 对比 (产物落 outputs/case-2_test/):
  python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2_test --version test
  # 中断后恢复 (产物保留, agent 重读磁盘接着跑):
  python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2 --resume

source 文件 schema (procedure-dsl/input/*.json 约定):
  {
    "title": str,
    "link": str,
    "body_text": str,
    "image_url_list": [{"image_type": int, "image_url": str}, ...],
    "publish_timestamp": ...,
    "channel_account_name": str,
  }
"""

import argparse
import asyncio
import base64
import json
import logging
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

# Directory containing this script (procedure-dsl/); default SDK cwd.
DSL_ROOT = Path(__file__).resolve().parent


def _derive_case_id(out_dir: str) -> str:
    """Derive the case id from the ``--out-dir`` argument.

    The id is the basename of ``out_dir`` with a leading ``"case-"`` removed.
    It is used for prompt file names, the lint/render ``--case-id`` flag and
    the trace banner.

    Examples:
        ``case-5`` -> ``"5"``; ``case-5-newflow`` -> ``"5-newflow"``;
        ``photo-album`` -> ``"photo-album"`` (no prefix is fine too).

    Returns:
        The derived id, or ``"?"`` when nothing is left after stripping.
    """
    name = Path(out_dir).name
    prefix = "case-"
    if name.startswith(prefix):
        name = name[len(prefix):]
    return name or "?"


def _resolve_out_dir(out_dir_arg: str, workdir: Path) -> Path:
    """Resolve ``--out-dir`` to the actual output directory.

    A relative value names a directory under ``<workdir>/outputs``:
        ``newdir``          -> ``<workdir>/outputs/newdir``
        ``outputs/newdir``  -> ``<workdir>/outputs/newdir`` (a leading
                               ``outputs/`` is tolerated, not nested twice)
    An absolute path is used as-is (escape hatch).

    Returns:
        The resolved absolute path (the directory is not created here).
    """
    p = Path(out_dir_arg)
    if p.is_absolute():
        return p.resolve()
    parts = p.parts
    if parts and parts[0] == "outputs":
        # Tolerate the historical habit of passing the outputs/ prefix.
        parts = parts[1:]
    base = workdir / "outputs"
    return (base / Path(*parts)).resolve() if parts else base.resolve()


# Sonnet 4.6 list prices (USD per 1M tokens), used for the estimated cost.
# In OAuth Max mode the SDK's total_cost_usd is usually None, so the script
# computes an "API-equivalent" figure from token counts as a quota proxy.
PRICE_SONNET_4_6 = {
    "input": 3.00,
    "output": 15.00,
    "cache_creation": 3.75,
    "cache_read": 0.30,
}


def _estimate_cost_usd(usage: Dict[str, Any], prices: Any = None) -> float:
    """Estimate the API-equivalent cost in USD of a usage record.

    Args:
        usage: Mapping with any of ``input_tokens``, ``output_tokens``,
            ``cache_creation_input_tokens``, ``cache_read_input_tokens``.
            Missing keys and ``None`` values both count as 0 (previously a
            ``None`` value raised ``TypeError``).
        prices: Optional per-1M-token price table with the keys of
            ``PRICE_SONNET_4_6``; defaults to that table.

    Returns:
        The estimated cost; ``0.0`` for an empty/falsy ``usage``.
    """
    if not usage:
        return 0.0
    table = PRICE_SONNET_4_6 if prices is None else prices
    # (usage key, price key) pairs, in the order the terms are summed.
    fields = (
        ("input_tokens", "input"),
        ("output_tokens", "output"),
        ("cache_creation_input_tokens", "cache_creation"),
        ("cache_read_input_tokens", "cache_read"),
    )
    return sum(
        (usage.get(usage_key) or 0) / 1_000_000 * table[price_key]
        for usage_key, price_key in fields
    )


# ──── Multimodal: image → Anthropic content block ───────────────────────────
_MEDIA_TYPE = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".webp": "image/webp",
}

# Client-side image cache: DSL_ROOT/.image_cache/.
# The same URL is not downloaded again across runs / resumes. Purely local IO;
# it does not affect the SDK or OAuth.
_IMAGE_CACHE_DIR = DSL_ROOT / ".image_cache"

# ── Multi-image compression strategy (keeps posts with dozens of images from
# blowing a single turn's output past the 32K/64K cap) ───────────────────────
# Tiers: few images → only downscale each image; many images (>OVER) → tile
# them into 3×3 montages so the number of image blocks shrinks 9:1, which also
# reduces the model's urge to describe every image and keeps the output small.
# (The tier that skips posts with extremely many images is handled on the
# batch selection side — see _IMG_SKIP_CAP in batch_extract_procedures.py —
# because that side knows the image count and controls the top-N backfill.)
_IMG_MAX_EDGE = 1024  # single-image downscale: max long edge in px
# More images than this → 3×3 montage. Maintained separately from the
# batch-side selection threshold (different semantics).
_IMG_MONTAGE_OVER = 18
# px per montage cell → a 3×3 montage has a 1536px long edge, which the
# original author notes stays under Anthropic's 1568px limit.
_IMG_GRID_CELL = 512

# Browser UA: some image hosts (mmbiz and others) answer the bare Python
# default UA with a 403.
_DOWNLOAD_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)


def _url_to_cached_path(url: str, timeout: float = 15.0) -> Path:
    """Download ``url`` into ``_IMAGE_CACHE_DIR`` and return the local path.

    The file name is the first 24 hex chars of the URL's SHA-256; the
    extension comes from the URL path (``.png`` when unknown). A non-empty
    cached file is returned without downloading again.

    Images are sent as local bytes instead of URLs to bypass the server-side
    robots.txt check on image URLs (per the original author, hosts such as
    mmbiz.qpic.cn disallow robots and make the API return 400).

    The download is written to a per-process temp file and moved into place
    with ``os.replace`` so an interrupted or concurrent run can never leave a
    truncated file that later passes for a valid cache hit.

    Raises:
        ValueError: the server returned an empty body.
        OSError / urllib.error.URLError: network or filesystem failure.
    """
    import hashlib
    import os
    from urllib.parse import urlparse
    from urllib.request import Request, urlopen

    _IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:24]
    ext = Path(urlparse(url).path).suffix.lower()
    if ext not in _MEDIA_TYPE:
        ext = ".png"
    local = _IMAGE_CACHE_DIR / f"{digest}{ext}"
    if local.exists() and local.stat().st_size > 0:
        return local

    req = Request(url, headers={"User-Agent": _DOWNLOAD_UA,
                                "Accept": "image/*,*/*;q=0.8"})
    with urlopen(req, timeout=timeout) as resp:
        data = resp.read()
    if not data:
        raise ValueError(f"empty response body: {url[:80]}")

    tmp = local.with_name(f"{local.name}.{os.getpid()}.tmp")
    try:
        tmp.write_bytes(data)
        os.replace(tmp, local)  # atomic within the same directory
    finally:
        # Only still present when the write/replace failed.
        if tmp.exists():
            tmp.unlink()
    return local


def _resolve_local(ref: str) -> Path:
    """Turn an image reference into a local path.

    http(s) URLs are downloaded into the cache first; local paths are
    expanded/resolved and must exist.

    Raises:
        FileNotFoundError: a local ``ref`` does not exist.
    """
    if ref.startswith(("http://", "https://")):
        return _url_to_cached_path(ref)
    p = Path(ref).expanduser().resolve()
    if not p.exists():
        raise FileNotFoundError(f"image not found: {ref}")
    return p


def _pil_to_block(im: Any) -> Dict[str, Any]:
    """Encode a PIL image as an Anthropic base64 image block (JPEG, q=90)."""
    import io

    buf = io.BytesIO()
    im.save(buf, format="JPEG", quality=90)
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": "image/jpeg",
                   "data": base64.standard_b64encode(buf.getvalue()).decode()},
    }


# Lazily created RapidOCR engine, and a flag set once the import has failed so
# later calls do not retry it.
_OCR_ENGINE: Any = None
_OCR_UNAVAILABLE = False


def _ocr_image(im: Any) -> str:
    """Run OCR on one PIL image at its current resolution.

    Montage cells are only ``_IMG_GRID_CELL`` px, which blurs small text, so
    callers OCR before scaling and ship the text through the text channel.

    Returns:
        The recognised segments joined by spaces; ``""`` on failure, when no
        text is found, or when RapidOCR/numpy cannot be imported (in which
        case OCR is disabled for the rest of the process).
    """
    global _OCR_ENGINE, _OCR_UNAVAILABLE
    if _OCR_UNAVAILABLE:
        return ""
    try:
        if _OCR_ENGINE is None:
            from rapidocr_onnxruntime import RapidOCR
            _OCR_ENGINE = RapidOCR()
        import numpy as np
        result, _ = _OCR_ENGINE(np.array(im))
        if not result:
            return ""
        # result: [[box, text, score], ...] in detection order → join as-is.
        return " ".join(
            seg[1] for seg in result if seg and len(seg) >= 2 and seg[1]
        ).strip()
    except ImportError:
        _OCR_UNAVAILABLE = True
        print("[ocr] ⚠️ rapidocr 未装, 本次禁用 OCR (九宫格仅拼图无文字旁挂)", flush=True)
        return ""
    except Exception as e:
        # Best effort: one unreadable image must not abort the batch.
        print(f"[ocr] skip 1 张 ({type(e).__name__}: {e})", flush=True)
        return ""


def _format_ocr_sidecar(ocr_texts: List[str]) -> str:
    """Format per-image OCR text as one text block labelled by montage/cell.

    Index ``i`` (0-based) maps to montage ``i // 9 + 1`` and cell
    ``i % 9 + 1`` (row by row, left to right), matching ``_montage``.

    Returns:
        The sidecar text, or ``""`` when every entry is empty.
    """
    lines = [
        f"■ 拼图{i // 9 + 1}·格{i % 9 + 1}: {t}"
        for i, t in enumerate(ocr_texts)
        if t
    ]
    if not lines:
        return ""
    return ("【配图 OCR 文字】图已压成九宫格, 小字可能糊; 以下是各格**全分辨率** OCR 原文, "
            "与拼图位置一一对应 (格号即拼图内 从左到右、从上到下 的序), 用它补全图里看不清的"
            "菜单/参数/prompt 文字:\n" + "\n".join(lines))


def _montage(pil_imgs: List[Any], cell: int) -> List[Any]:
    """Tile images into 3×3 montages, nine per canvas.

    Each image is scaled to fit a ``cell`` × ``cell`` square and centred on a
    white background; unused cells of the last montage stay white. Reading
    order is row by row, left to right (first image = top-left of montage 1).

    Returns:
        The list of montage images (empty for empty input).
    """
    from PIL import Image

    grids: List[Any] = []
    for start in range(0, len(pil_imgs), 9):
        canvas = Image.new("RGB", (cell * 3, cell * 3), (255, 255, 255))
        for j, im in enumerate(pil_imgs[start:start + 9]):
            t = im.copy()
            t.thumbnail((cell, cell))  # fit into the cell, keep aspect ratio
            row, col = divmod(j, 3)
            canvas.paste(t, (col * cell + (cell - t.width) // 2,
                             row * cell + (cell - t.height) // 2))
        grids.append(canvas)
    return grids


def _append_image_blocks_raw(blocks: List[Dict[str, Any]], images: List[str]) -> int:
    """Fallback when PIL is unavailable: append each image as raw base64.

    No downscaling and no montage. A failing image is logged and skipped.

    Returns:
        The number of images appended (callers may ignore it).
    """
    n_ok = n_fail = 0
    for ref in images:
        try:
            local = _resolve_local(ref)
            blocks.append({
                "type": "image",
                "source": {"type": "base64",
                           "media_type": _MEDIA_TYPE.get(local.suffix.lower(), "image/png"),
                           "data": base64.standard_b64encode(local.read_bytes()).decode()},
            })
            n_ok += 1
        except Exception as e:
            n_fail += 1
            print(f"[image] skip {ref[:80]}... ({type(e).__name__}: {e})", flush=True)
    if images:
        print(f"[image] (无 PIL) {n_ok}/{len(images)} 原样 base64, {n_fail} 失败跳过", flush=True)
    return n_ok


def _append_image_blocks(blocks: List[Dict[str, Any]], images: List[str]):
    """Convert ``images`` to content blocks and append them to ``blocks``.

    Tiers (see the ``_IMG_*`` constants): up to ``_IMG_MONTAGE_OVER`` loaded
    images are sent one by one, downscaled; more are tiled into 3×3 montages
    followed by an OCR sidecar text block. A single download/decode failure
    never blocks the batch. Without PIL the images are appended raw.
    The docstring of the original notes this helper is shared by the base and
    variant drivers so the image fault-tolerance policy stays identical.

    Returns:
        ``(n_sent, montaged)``: how many source images were actually
        delivered and whether they were tiled. Earlier versions returned
        ``None``; callers that ignore the result are unaffected.
    """
    if not images:
        return 0, False
    try:
        from PIL import Image
    except Exception:
        return _append_image_blocks_raw(blocks, images), False

    pil_imgs: List[Any] = []
    n_fail = 0
    for ref in images:
        try:
            # convert() fully decodes into a new image, so the source file
            # handle can be closed right away instead of leaking until GC.
            with Image.open(_resolve_local(ref)) as src:
                pil_imgs.append(src.convert("RGB"))
        except Exception as e:
            n_fail += 1
            print(f"[image] skip {ref[:80]}... ({type(e).__name__}: {e})", flush=True)

    n = len(pil_imgs)
    if n == 0:
        print(f"[image] 0/{len(images)} 可用 ({n_fail} 失败)", flush=True)
        return 0, False

    if n > _IMG_MONTAGE_OVER:
        # OCR at full resolution before scaling so small text survives.
        ocr_texts = [_ocr_image(im) for im in pil_imgs]
        grids = _montage(pil_imgs, _IMG_GRID_CELL)
        for g in grids:
            blocks.append(_pil_to_block(g))
        sidecar = _format_ocr_sidecar(ocr_texts)
        if sidecar:
            blocks.append({"type": "text", "text": sidecar})
        n_ocr = sum(1 for t in ocr_texts if t)
        print(f"[image] {n} 张 > {_IMG_MONTAGE_OVER} → 拼九宫格 {len(grids)} 张 "
              f"(每张≤9图·格{_IMG_GRID_CELL}px) + {n_ocr} 格 OCR 文字旁挂, "
              f"{n_fail} 失败跳过", flush=True)
        return n, True

    for im in pil_imgs:
        im.thumbnail((_IMG_MAX_EDGE, _IMG_MAX_EDGE))  # never upscales
        blocks.append(_pil_to_block(im))
    print(f"[image] {n} 张单图 (降分辨率≤{_IMG_MAX_EDGE}px), {n_fail} 失败跳过", flush=True)
    return n, False


def _image_prompt_note(n_images: int, montaged: Any = None) -> str:
    """Build the prompt sentence describing the attached images.

    It must agree with what ``_append_image_blocks`` actually sent: when the
    images were tiled the model has to be told it received montages, otherwise
    it assumes N independent images and mis-references them.

    Args:
        n_images: Number of source images delivered.
        montaged: ``True``/``False`` to state the mode explicitly; ``None``
            (default, the historical behaviour) infers it from
            ``n_images > _IMG_MONTAGE_OVER``.
    """
    if n_images == 0:
        return "本消息未附图."
    if montaged is None:
        montaged = n_images > _IMG_MONTAGE_OVER
    if montaged:
        import math
        n_grid = math.ceil(n_images / 9)
        return (f"本消息附了 {n_grid} 张【九宫格拼图】—— 由原始 {n_images} 张配图按 3×3 网格拼接 "
                f"(每张最多含 9 个子图, 已统一降分辨率)。原图过多, 拼图是为压 token; "
                f"分析时把**每个格子当一张独立配图**看, 阅读顺序: 每张拼图内 从左到右、从上到下逐行; "
                f"封面图在第一张拼图的左上格 (老格式 image_type=2 / 新格式原序第一张)。"
                f"图后另附一个【配图 OCR 文字】文本块 (各格全分辨率 OCR), 拼图里小字糊掉时**以 OCR 文字为准**.")
    return (f"本消息附了 {n_images} 张图作多模态内容 (已降分辨率; URL 抽自老格式的 image_url_list "
            f"或新格式的 images; 老格式 image_type=2 封面排最前, 新格式按原序).")


# ──── source JSON → image URL list ──────────────────────────────────────────

def _images_from_source(source_path: Path) -> List[str]:
    """Extract image URLs from a source JSON file, supporting two schemas.

    1. New (``eval_case-*.json``): ``images: [url_str, ...]`` — returned in
       the original order (no cover signal).
    2. Legacy (``case-*-raw.json``): ``image_url_list: [{image_type,
       image_url}, ...]`` — ``image_type == 2`` (cover) first, the rest in
       original order.

    When ``images`` exists but yields no URL, a warning is printed (only if
    the list is non-empty) and the legacy field is tried instead of silently
    returning zero images.

    Returns:
        The URL list; ``[]`` for non-JSON files, unreadable/invalid JSON or a
        non-object top level.
    """
    if source_path.suffix.lower() != ".json":
        return []
    try:
        with source_path.open(encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return []
    if not isinstance(data, dict):
        return []

    flat = data.get("images")
    if isinstance(flat, list):
        urls = [u for u in flat if isinstance(u, str) and u]
        if urls:
            return urls
        if flat:
            print(f"[image] ⚠️ {source_path.name} 有 images 字段但抽不到 URL "
                  f"(首项类型={type(flat[0]).__name__})", flush=True)
        # Fall through: the legacy field may still carry the images.

    items = data.get("image_url_list")
    if not isinstance(items, list):
        return []
    covers: List[str] = []
    others: List[str] = []
    for it in items:
        if not isinstance(it, dict):
            continue
        url = it.get("image_url")
        if not isinstance(url, str) or not url:
            continue
        (covers if it.get("image_type") == 2 else others).append(url)
    return covers + others


# ──── Initial user message (text + image blocks) ────────────────────────────

def _build_initial_blocks(
    source_ref: str,
    case_id: str,
    out_dir_arg: str,
    images: List[str],
    workdir: Path,
    spec_name: str = "spec",
) -> List[Dict[str, Any]]:
    """Build the first user message: the instruction text plus image blocks.

    Args:
        source_ref: Source file as passed on the CLI (relative to ``workdir``
            or absolute). Used directly rather than derived from ``case_id``,
            so a mismatching ``--out-dir`` cannot produce a non-existent path.
        case_id: Case id used in file names and ``--case-id`` flags.
        out_dir_arg: Raw ``--out-dir`` value (resolved via ``_resolve_out_dir``).
        images: Image references (URLs or local paths).
        workdir: Agent working directory; all paths are given to the agent as
            absolute POSIX-style paths.
        spec_name: Spec directory name selected by ``--version``.

    Returns:
        ``[text_block, *image_blocks]``.
    """
    spec_dir = (workdir / spec_name).as_posix()
    spec_readme = (workdir / spec_name / "README.md").as_posix()
    spec_tools = (workdir / spec_name / "tools.md").as_posix()
    _out_dir = _resolve_out_dir(out_dir_arg, workdir)
    case_dir = _out_dir.as_posix()
    scratch_dir = (_out_dir / "_scratch").as_posix()
    _src = Path(source_ref)
    case_input = _src.as_posix() if _src.is_absolute() else (workdir / _src).resolve().as_posix()

    # Load the images first so the prompt describes what was really attached
    # (failed downloads change the count and possibly the montage tier).
    image_blocks: List[Dict[str, Any]] = []
    n_sent, montaged = _append_image_blocks(image_blocks, images)
    image_note = _image_prompt_note(n_sent, montaged)

    text = f"""请按 {spec_name}/ 目录里的 SKILL 处理这个 post.

## 起手指令 (路径已给绝对值, 直接照搬, 不要改, 不要先 find 探查)

1. `Read(file_path="{spec_readme}")` — self-driven skill 的入口, 含完整 phase 加载指南 (累积式) + 自查清单 + 工具调用规则. 读完不要再 Read 它.
2. `Read(file_path="{spec_tools}")` — 外部脚本接口手册. 读完不要重读.
3. 按 README phase 加载指南**累积式**前进, 中间产物是**单个 `workflow.json` 文件**, 各 phase 都 in-place Edit 它 (不写多个中间快照):
   - Phase 1.1 心智模型 → `{case_dir}/understanding.md` (含 procedure 数量判断)
   - Phase 1.2 骨架 → **Write** `{case_dir}/workflow.json` (procedures 数组骨架)
   - Phase 1.3 IO 闭合 → **Edit** workflow.json 加 anchor
   - Phase 2A/2B/2C 归一化 → **Edit** workflow.json 加 effect/action/type/sub/form + procedures[i].type_registry
   - Phase 3 lint + 渲染 → 调 render-case.py + lint-case.py (不写 case_data.json, renderer 内存组装)
   - Phase 3 .md → Write `{case_dir}/case-{case_id}-.md`

## ❌ 重复读取禁令 (CRITICAL: ZERO REPEATED READS)

你拥有完美的长期记忆(由于 Context 累积,你读取过的所有文件内容会永远保留在你的 Context 中)。
请**绝对不要**重复读取任何文件!任何重复的 `Read` 动作都是对 Token 和回合数 Budget 的极大浪费。
- **禁忌 1**:不要因为看到 spec 文档中写了 `详见 [tools.md §2]` 就去重新 `Read(file_path="{spec_tools}")`。你已经在 Turn 3 读过它了,直接检索你的记忆!
- **禁忌 2**:不要因为进入了 Phase 2B,就去重新 `Read` 任何 Phase 2 的 spec 文件(如 `phase2-normalize.md`)。你已经在 Phase 2A 开始时读过它了,它就在你的记忆中,直接使用它!
- 在发出任何 `Read` 指令前,必须自我核对:“我之前读过这个文件吗?”。如果读过,绝对不要再次 Read!

## 输入

- case 原文: `{case_input}`
  schema (两种都可能):
  · 老格式 (case-*-raw.json): {{title, link, body_text, image_url_list:[{{image_type,image_url}}], publish_timestamp, channel_account_name}}
  · 新格式 (eval_case-*.json): {{title, link, body_text, images:[url_str], videos, channel, content_type, like_count, publish_timestamp, channel_content_id, ...}}
  正文 (body_text) 和元数据从此文件 Read; 两种 schema 的 body_text 含义一致.
- 配图: {image_note}

## 输出目录

`{case_dir}/` (这一 case 的所有产物都放这里, 不要污染其他目录)

## 读取范围 (硬约束)

你**只能** Read 以下三处 (用绝对路径 + 子路径):
1. `{spec_dir}/` 及其所有子目录 (skill 全部内容: README.md / tools.md / 各 phase 规格文件 / output/ / taxonomy/ / templates/ / tools/ — 但 tools/*.py **不要 Read 源码**, 只通过 Bash 调用; 具体子目录名以 {spec_name}/README.md 的章节地图为准, **不要脑补 part1-/part2- 这类对称目录**)
2. `{case_input}` (当前 case 的原文, 只此一个)
3. `{case_dir}/` 目录 (你自己的工作产物, 含其下 `_scratch/`)

## 长输出 dump 区 (sanctioned scratch)

如果你需要 dump 大 Bash 输出做后续分析 (e.g. `taxonomy-lookup --subtree` 输出过大想分段读, 或 `find` 结果想保留), **只写到 `{scratch_dir}/`** (runner 已预创建):
  ✅ `python {spec_dir}/tools/taxonomy-lookup.py --dim 实质 --subtree /表象/视觉 > {scratch_dir}/subst_visual.txt`
  ✅ 然后 `Read(file_path="{scratch_dir}/subst_visual.txt")` 提取需要的段
  ✅ 一次性小探查 / smoke test (验证某条 tool 命令、看某段数据长啥样) 也可以放 `{scratch_dir}/`, 用完即弃
  ❌ **不要用 Python 脚本生成 / 批量改写 `workflow.json`** — 它由你**直接 Write 骨架 + 逐字段填** 演化 (见各 phase 规格). 写 build_workflow / add_anchors / normalize 这类脚本去拼 JSON, 反而踩转义 / 控制字符坑 (LLM 拼长 JSON 极易坏), 还把本该逐 step 的语义判断埋进一次性脚本里. 几十处字段要批量改 → 用 **`{spec_name}/tools/wf-patch.py`** (path=value 清单, 工具负责安全写 JSON + 写入即校验; 见 {spec_name}/tools.md §2), 零星单处用 Edit. `_scratch/` 的 Python **只用于「dump 大 tool 输出」和「smoke test」, 不是 workflow.json 的生产线**.
  ❌ **不要写到项目根 `scratch/`** (那是 dev/repo 维护用的临时区, 禁区)
  ❌ **不要 ls / Read 项目根 `scratch/`** (别人的 dev 临时文件, 跟你无关; 你的 scratch 在上面的 `_scratch/`)

**禁读区** (不要 Read 也不要 ls 这些位置): 项目根的 spec.md / design.md (旧文件), examples/ 全目录, input/ 下其他 case-*-raw.json, outputs/case-OTHER/ (其他 case 目录), bin/ 全目录 (已废弃), **项目根 `scratch/`** (跟你的 `_scratch/` 不是一回事 — 上面 ✅ 那个 `_scratch/` 作用类似 outputs 目录下的 `_scratch/`).

## 阶段三流程 (脚本组装, 不写 case_data.json)

新工作流: Phase 2 已经把所有结构化数据 in-place Edit 进 `{case_dir}/workflow.json`. Phase 3 直接调 2 个脚本命令, **不需要写 case_data.json** (renderer 在内存把 workflow + source-input + page_title + case_id 组装成 case_data 喂给 build_html).

```bash
# Step A: lint + 自动 record 新 type 到 type_suggestions.md
python {spec_dir}/tools/lint-case.py --workflow {case_dir}/workflow.json --case-id {case_id}
# stdout 报 "type 完整性: N 个提示" 时, 回 Phase 2 Edit workflow.json 补 procedures[i].type_registry, 重跑 lint
# stdout 报 "已 record M 条新 type" 时, 表示 {spec_name}/taxonomy/type_suggestions.md 已自动同步

# Step B: 渲染 HTML (--source-input 必带, --page-title --case-id 必带)
python {spec_dir}/tools/render-case.py \
    --workflow {case_dir}/workflow.json \
    --source-input {case_input} \
    --page-title "Case {case_id} · <主题>" \
    --case-id {case_id} \
    --out {case_dir}/case-{case_id}-.html

# Step C: Write {case_dir}/case-{case_id}-.md (DSL 文本版, 按 {spec_name}/output/md-structure.md §11 结构)
```

**`--source-input` 行为**: renderer 自动从 raw 抽 body_text + 封面图 + 图集兜底填到 case_data.source (内存里). 微信公众号长文走 inline 图 + 封面; 小红书短文走 body + "--- 附图 ---" 末尾追加非封面图. 你**不必手工**复制 raw.body_text — 完全交给 renderer.

`workflow.json` 的契约见 [{spec_name}/output/case-data.schema.json]({spec_name}/output/case-data.schema.json) 里 Procedure 的 definition (顶层 `{{procedures: [{{id, name, purpose, declarations, type_registry?, steps, return_row?}}]}}`). **不要参考其他 `outputs/case-*/` 下任何文件**.

## 其他约束

- 推断补全用 `inferred: true` + `inferred_reason` 标在 IO item 上, 不要静默插入.
- 完成后用一段话总结: 工序梗概 + 输出文件路径 + 你对 DSL 的关键发现.
"""
    blocks: List[Dict[str, Any]] = [{"type": "text", "text": text}]
    blocks.extend(image_blocks)
    return blocks


# ──── Trace file (outputs/case-N/_trace.md) ─────────────────────────────────
# Every run appends its turns / tool_use / text / result to one markdown log
# in real time, so after Ctrl-C the agent's turns, file reads and Bash calls
# can still be reviewed offline.
def _fmt_tool_input(name: str, inp: Any) -> str:
    """Summarise a tool_use input as one human-readable line for the trace.

    Args:
        name: Tool name (``Read``, ``Write``, ``Edit``, ``Bash``, ``Grep``,
            ``Glob``, ``Task``/``Agent``; anything else gets a generic preview).
        inp: The tool input; non-dict values are shown via ``repr``.

    Returns:
        A markdown snippet, truncated to about 200 characters where the
        content can be long.
    """
    if not isinstance(inp, dict):
        return f"`{repr(inp)[:200]}`"
    if name == "Read":
        suffix = ""
        if inp.get("offset") or inp.get("limit"):
            suffix = f" (offset={inp.get('offset', 0)}, limit={inp.get('limit', '-')})"
        return f"`{inp.get('file_path', '')}`{suffix}"
    if name == "Write":
        # `or ""` guards against an explicit None content (len(None) raised).
        return f"`{inp.get('file_path', '')}` ({len(inp.get('content') or ''):,} chars)"
    if name == "Edit":
        return f"`{inp.get('file_path', '')}`"
    if name == "Bash":
        cmd = inp.get("command", "")
        if len(cmd) > 200:
            cmd = cmd[:200] + "..."
        return f"`{cmd}`"
    if name == "Grep":
        return f"pattern=`{inp.get('pattern', '')}` path=`{inp.get('path', '')}`"
    if name == "Glob":
        return f"pattern=`{inp.get('pattern', '')}`"
    if name in ("Task", "Agent"):
        return f"`{inp.get('description', '')}` [subagent={inp.get('subagent_type', '?')}]"
    preview = str(inp)
    return f"`{preview[:200] + '...' if len(preview) > 200 else preview}`"


def _trace_banner(trace_path: Path, args, resume_sid, source_path, n_images: int) -> None:
    """Append the session banner to the trace file.

    The file is opened in append mode, so successive runs accumulate in the
    same ``_trace.md``. A resume banner is written when ``resume_sid`` is
    truthy, otherwise a fresh-run banner with the run parameters.
    """
    trace_path.parent.mkdir(parents=True, exist_ok=True)
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if resume_sid:
        chunk = (
            f"\n\n---\n\n## ▶ Resume @ {now}\n\n"
            f"恢复 session `{resume_sid[:8]}` (case-{args.case_id})\n"
        )
    else:
        chunk = (
            f"\n\n---\n\n## ▶ Fresh @ {now}\n\n"
            f"- case: `{args.case_id}`\n"
            f"- source: `{source_path}`\n"
            f"- spec: `{getattr(args, 'spec_name', 'spec')}`\n"
            f"- images: `{n_images}`\n"
            f"- model: `{args.model}`\n"
            f"- max_turns: `{args.max_turns}`\n"
        )
    with trace_path.open("a", encoding="utf-8") as f:
        f.write(chunk)


def _trace_init(trace_path: Path, sid: str, data: Dict[str, Any]) -> None:
    """Append the SDK init info (short session id, model, apiKeySource)."""
    line = (
        f"\n_session={sid[:8]} · model={data.get('model')!r} "
        f"· apiKeySource={data.get('apiKeySource')!r}_\n"
    )
    with trace_path.open("a", encoding="utf-8") as f:
        f.write(line)


def _trace_turn(trace_path: Path, turn: int, msg: Any) -> None:
    """Append one assistant message (quoted text + tool_use summaries).

    Blocks are classified by duck typing: anything with ``.text`` is text,
    anything with ``.name`` and ``.input`` is a tool call; other blocks
    (e.g. thinking) are skipped.
    """
    now = datetime.now().strftime("%H:%M:%S")
    parts = [f"\n\n### Turn {turn} · {now}\n"]
    texts: List[str] = []
    tools: List[tuple] = []
    for block in msg.content:
        if hasattr(block, "text"):
            t = block.text.strip()
            if t:
                texts.append(t)
        elif hasattr(block, "name") and hasattr(block, "input"):
            tools.append((block.name, block.input))
    for t in texts:
        for line in t.split("\n"):
            parts.append(f"> {line}\n" if line else ">\n")
        parts.append("\n")
    for name, inp in tools:
        parts.append(f"- `{name}` — {_fmt_tool_input(name, inp)}\n")
    with trace_path.open("a", encoding="utf-8") as f:
        f.write("".join(parts))


def _trace_result(trace_path: Path, msg: Any, elapsed: float,
                  usage: Dict[str, Any], est_cost: float) -> None:
    """Append the final summary of a ResultMessage to the trace."""
    now = datetime.now().strftime("%H:%M:%S")
    sdk_cost = msg.total_cost_usd
    sdk_cost_str = f"${sdk_cost:.4f}" if sdk_cost is not None else "None (OAuth Max)"
    chunk = (
        f"\n\n### ◀ Result · {now}\n\n"
        f"- subtype: `{msg.subtype}` · is_error: `{msg.is_error}`\n"
        f"- num_turns: `{msg.num_turns}` · duration: `{msg.duration_ms}ms` · wall: `{elapsed:.1f}s`\n"
        f"- tokens: in={usage.get('input_tokens', 0):,} "
        f"out={usage.get('output_tokens', 0):,} "
        f"cache_w={usage.get('cache_creation_input_tokens', 0):,} "
        f"cache_r={usage.get('cache_read_input_tokens', 0):,}\n"
        f"- cost: sdk={sdk_cost_str}, est_if_api=${est_cost:.4f}\n"
    )
    with trace_path.open("a", encoding="utf-8") as f:
        f.write(chunk)


# ──── Main flow: drive ClaudeSDKClient ──────────────────────────────────────

def _subagent_specs(spec_name: str, scratch_dir: str) -> Dict[str, Dict[str, Any]]:
    """Return keyword arguments for the two Phase 2 sub-agent definitions.

    The prompt templates are written against the placeholders ``spec/`` and
    ``outputs/case-N/_scratch``; they are replaced with the spec directory
    selected by ``--version`` and the real scratch directory of this run, so
    sub-agents are not pointed at paths that do not exist.

    Args:
        spec_name: Spec directory name, e.g. ``"spec"`` or ``"spec-test"``.
        scratch_dir: Scratch directory as the agent should address it.

    Returns:
        ``{agent_name: {"description", "prompt", "tools", "model"}}``.
    """
    def _fill(template: str) -> str:
        # Replace spec/ first: scratch_dir itself may contain "spec/".
        return (template.replace("spec/", f"{spec_name}/")
                .replace("outputs/case-N/_scratch", scratch_dir))

    tools = ["Read", "Grep", "Glob", "Write", "Edit", "Bash"]
    return {
        "phase-2a-normalizer": {
            "description": _fill("Expert in Phase 2A (effect/action/type normalization). Use this agent to read task_2a.json, normalize action/effect/type against spec trees, manage procedure-level type_registry, and generate outputs/case-N/_scratch/patch_2a.json."),
            "prompt": _fill("""You are a dedicated Phase 2A normalization sub-agent. Your goal is to process the inputs and outputs of a workflow for effect, action, and type normalization:
1. Read the outputs/case-N/_scratch/task_2a.json file to get the steps and IO variables.
2. Normalize every step's `effect` and `action` against the taxonomy specs in `spec/taxonomy/effect.json` and `spec/taxonomy/action.json`.
3. Normalize every IO variable's `type` against `spec/taxonomy/type.json`. If a custom type is used, register it in the procedure's `type_registry` with extends and description.
4. Output a standard `patch_2a.json` JSON file under outputs/case-N/_scratch/.

IMPORTANT: The format of `patch_2a.json` MUST be a flat JSON array of objects, where each object has a "path" and a "value" key (exactly conforming to the `wf-patch.py` tool contract).
Example format:
[
  {"path": "p1.s1.effect", "value": "预处理"},
  {"path": "p1.s1.action", "value": "提取/化学提取/反推"},
  {"path": "p1.s1.inputs[0].type", "value": "工具选型标准"},
  {"path": "p1.type_registry.工具配置.extends", "value": "评语"},
  {"path": "p1.type_registry.工具配置.desc", "value": "工具选型依据..."}
]
Do not output raw dictionary structure or any other nesting. Do not touch or modify other files."""),
            "tools": list(tools),
            "model": "sonnet",
        },
        "phase-2b-matcher": {
            "description": _fill("Expert in Phase 2B (substance/form taxonomy matching). Use this agent to read task_2b.json, invoke taxonomy-lookup.py to query substance and form paths for each variable, and generate outputs/case-N/_scratch/patch_2b.json."),
            "prompt": _fill("""You are a dedicated Phase 2B taxonomy matching sub-agent. Your goal is to query and match the substance and form for each workflow IO variable:
1. Read the outputs/case-N/_scratch/task_2b.json file to get the variables to match.
2. For each variable, run `python spec/tools/taxonomy-lookup.py --dim 实质 --match "..."` and `--dim 形式 --match "..."` to search for the most precise taxonomy paths matching the variable's value, name, and related_images.
3. Output a standard `patch_2b.json` JSON file under outputs/case-N/_scratch/.

IMPORTANT: The format of `patch_2b.json` MUST be a flat JSON array of objects, where each object has a "path" and a "value" key (exactly conforming to the `wf-patch.py` tool contract).
Substance and form values can be single string paths, multiple paths separated by ' + ', or JSON arrays of strings for multi-path matching.
Example format:
[
  {"path": "p1.s1.inputs[0].substance", "value": "/理念/知识/思想/概念范畴/性质属性/功能效用"},
  {"path": "p1.s1.inputs[0].form", "value": "/呈现/视觉/视觉制作/构图编排/版面设计/版面结构"},
  {"path": "p1.s2.inputs[0].substance", "value": ["/理念/知识/商业/前沿技术/AI智能/AI应用", "/理念/知识/思想"]}
]
Do not output raw dictionary structure or any other nesting. Do not touch or modify other files."""),
            "tools": list(tools),
            "model": "sonnet",
        },
    }


def _resume_blocks(case_id: str, rel_out: str, spec_name: str) -> List[Dict[str, Any]]:
    """Build the incremental "continue" message used when resuming.

    Images and the original instructions are not re-sent (they are in the
    session history). The text reminds the agent that intermediate artifacts
    or spec files may have been edited by the user and must be re-read from
    disk. Spec paths use ``spec_name`` so ``--version`` runs resume against
    the right directory.
    """
    return [{"type": "text", "text": (
        f"上次中断了, 接续做 case-{case_id} 的提取流程.\n\n"
        f"先 ls {rel_out}/ 看当前已落盘哪些产物;\n"
        f"用户可能在中断期间编辑过任何中间产物 (understanding.md / workflow.json) "
        f"或改过 {spec_name}/ 内任何文件 — 务必 Read 这些**当前磁盘版本**, "
        f"不要凭之前记忆继续. 如发现明显人工修订痕迹, 沿用用户改过的版本.\n\n"
        f"⚠️【重要禁令与强制要求】:如果流程进行到 Phase 2(归一化与分类匹配),主 Agent **绝对禁止**手动调用 taxonomy-lookup.py 查询或手动决策!你必须强制阅读最新的 `{spec_name}/extraction/phase2-normalize.md` 规范,通过运行 `prepare-subtask.py` 生成物理任务切片,然后调用 `Agent`(或 `Task`)工具将任务分别分发给预定义好的 `phase-2a-normalizer` 和 `phase-2b-matcher` 子 Agent 并行协作执行!"
    )}]


async def run(args: argparse.Namespace) -> None:
    """Run (or resume) one extraction session through the Claude Agent SDK.

    Reads ``args.source/out_dir/workdir/model/max_turns/resume/extra_image/
    version`` plus the derived ``args.case_id`` and ``args.spec_name``.
    Streams assistant output to stdout, appends to ``<out_dir>/_trace.md`` and
    stores the session id in ``<out_dir>/.session_id`` as soon as it is known.

    Exits via ``SystemExit``: a message string for setup errors, ``1`` for
    ``ClaudeSDKError``, ``2`` when the result has ``is_error``, ``3`` when the
    rate limit blocks the run.
    """
    # Imported lazily so the module can be imported without the SDK.
    from claude_agent_sdk import (
        AssistantMessage,
        ClaudeAgentOptions,
        ClaudeSDKClient,
        ClaudeSDKError,
        RateLimitEvent,
        ResultMessage,
        TextBlock,
        AgentDefinition,
    )

    workdir = Path(args.workdir or DSL_ROOT).resolve()
    if not workdir.exists():
        sys.exit(f"❌ workdir not found: {workdir}")

    source_path = Path(args.source).expanduser().resolve()
    if not source_path.exists():
        sys.exit(f"❌ source not found: {source_path}")

    auto_imgs = _images_from_source(source_path)
    extra_imgs = args.extra_image or []
    images = auto_imgs + extra_imgs

    # Prefer a workdir-relative path for the agent (its cwd is workdir); fall
    # back to the absolute path when the source lives elsewhere.
    try:
        source_for_agent = source_path.relative_to(workdir).as_posix()
    except ValueError:
        source_for_agent = str(source_path)

    out_dir = _resolve_out_dir(args.out_dir, workdir)
    out_dir.mkdir(parents=True, exist_ok=True)
    sid_file = out_dir / ".session_id"
    # Pre-create the sanctioned scratch directory so the agent finds it.
    (out_dir / "_scratch").mkdir(exist_ok=True)

    resume_sid = None
    if args.resume:
        if not sid_file.exists():
            sys.exit(f"❌ --resume 但未找到 {sid_file}; 先正常跑一次建立 session")
        resume_sid = sid_file.read_text(encoding="utf-8").strip() or None
        if not resume_sid:
            sys.exit(f"❌ {sid_file} 为空, 无法 resume")

    try:
        rel_out = out_dir.relative_to(workdir).as_posix()
    except ValueError:
        rel_out = out_dir.as_posix()

    if resume_sid:
        blocks = _resume_blocks(args.case_id, rel_out, args.spec_name)
    else:
        blocks = _build_initial_blocks(source_for_agent, args.case_id, args.out_dir,
                                       images, workdir, args.spec_name)

    print(f"[setup] workdir      = {workdir}")
    print(f"[setup] version      = {args.version if args.version else '(default)'}")
    print(f"[setup] spec dir     = {args.spec_name}/")
    print(f"[setup] source       = {source_path}")
    print(f"[setup]   → as agent = {source_for_agent}")
    print(f"[setup] case_id      = {args.case_id}")
    print(f"[setup] output dir   = {out_dir}")
    print(f"[setup] auto images  = {len(auto_imgs)} (from image_url_list)" if not resume_sid
          else f"[setup] auto images  = -- (resume 跳过)")
    print(f"[setup] extra images = {len(extra_imgs)} {extra_imgs if extra_imgs else ''}" if not resume_sid
          else f"[setup] extra images = -- (resume 跳过)")
    print(f"[setup] model        = {args.model}")
    print(f"[setup] max_turns    = {args.max_turns}")
    if resume_sid:
        print(f"[setup] resume       = {resume_sid[:8]} (跳过初始 prompt + 图)")
    else:
        print(f"[setup] resume       = no (fresh start)")
    print(flush=True)

    # Banner first; every later turn / result is appended to the same file.
    trace_path = out_dir / "_trace.md"
    _trace_banner(trace_path, args, resume_sid, source_path, len(images))
    print(f"[trace] -> {trace_path}", flush=True)

    stderr_buf: List[str] = []

    def _capture_stderr(line: str) -> None:
        if line:
            stderr_buf.append(line)
            print(f"[stderr] {line}", flush=True)

    agents = {
        agent_name: AgentDefinition(**kwargs)
        for agent_name, kwargs in _subagent_specs(
            args.spec_name, f"{rel_out}/_scratch").items()
    }

    options = ClaudeAgentOptions(
        model=args.model,
        cwd=str(workdir),
        resume=resume_sid,
        # Both the legacy "Task" and the newer "Agent" identifiers are allowed
        # so sub-agents can be activated across Claude Code / SDK versions.
        allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep", "Task", "Agent"],
        agents=agents,
        max_turns=args.max_turns,
        permission_mode="bypassPermissions",
        setting_sources=[],
        env={
            # Blank these so the claude CLI falls back to OAuth credentials.
            "ANTHROPIC_API_KEY": "",
            "ANTHROPIC_BASE_URL": "",
            "ANTHROPIC_AUTH_TOKEN": "",
            # Posts with dozens of images can exceed the default 32000 output
            # token cap, so it is raised to 64000. This value overrides an
            # identically named variable set outside.
            "CLAUDE_CODE_MAX_OUTPUT_TOKENS": "64000",
        },
        stderr=_capture_stderr,
    )

    turn = 0
    usage: Dict[str, Any] = {}
    t0 = time.time()
    try:
        async with ClaudeSDKClient(options=options) as client:
            async def _input_stream():
                yield {
                    "type": "user",
                    "message": {"role": "user", "content": blocks},
                    "parent_tool_use_id": None,
                    "session_id": "default",
                }

            await client.query(_input_stream())

            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    turn += 1
                    for block in msg.content:
                        if isinstance(block, TextBlock):
                            print(f"\n[turn {turn} · text]\n{block.text}\n", flush=True)
                        elif hasattr(block, "name") and hasattr(block, "input"):
                            preview = str(block.input)
                            if len(preview) > 200:
                                preview = preview[:200] + "..."
                            print(f"[turn {turn} · tool_use] {block.name}({preview})", flush=True)
                        elif hasattr(block, "thinking"):
                            pass  # thinking blocks are too long to echo
                        else:
                            print(f"[turn {turn} · {type(block).__name__}] {block!r}", flush=True)
                    _trace_turn(trace_path, turn, msg)
                elif isinstance(msg, ResultMessage):
                    if msg.usage:
                        usage = dict(msg.usage)
                    elapsed = time.time() - t0
                    sdk_cost = msg.total_cost_usd
                    sdk_cost_str = (
                        f"${sdk_cost:.4f}" if sdk_cost is not None
                        else "None (OAuth Max — not metered as $)"
                    )
                    est_cost = _estimate_cost_usd(usage)
                    print(
                        f"\n[result] subtype={msg.subtype} is_error={msg.is_error} "
                        f"turns={msg.num_turns} duration={msg.duration_ms}ms wall={elapsed:.1f}s\n"
                        f"         tokens: in={usage.get('input_tokens', 0):,} "
                        f"out={usage.get('output_tokens', 0):,} "
                        f"cache_w={usage.get('cache_creation_input_tokens', 0):,} "
                        f"cache_r={usage.get('cache_read_input_tokens', 0):,}\n"
                        f"         cost  : sdk={sdk_cost_str}, est_if_api=${est_cost:.4f} "
                        f"(Sonnet 4.6 价目)",
                        flush=True,
                    )
                    _trace_result(trace_path, msg, elapsed, usage, est_cost)
                    if msg.is_error:
                        print(f"❌ result is_error=True", file=sys.stderr)
                        sys.exit(2)
                elif isinstance(msg, RateLimitEvent):
                    info = getattr(msg, "rate_limit_info", None)
                    info_status = getattr(info, "status", None) if info else None
                    if info_status == "allowed_warning":
                        print(f"⚠️  [rate_limit_warning] Max 订阅 5h 窗口余额较少, `claude /status` 看余量",
                              file=sys.stderr)
                    elif info_status and info_status not in ("allowed", "allowed_warning"):
                        print(f"❌ [rate_limit_blocked] {info_status!r} — Max 订阅 5h 窗口已耗尽, `claude /status` 看余量",
                              file=sys.stderr)
                        sys.exit(3)
                else:
                    name = type(msg).__name__
                    if name == "SystemMessage":
                        data = getattr(msg, "data", {}) or {}
                        subtype = getattr(msg, "subtype", "?")
                        if subtype == "init":
                            sid = data.get('session_id', '') or ''
                            if sid:
                                # Persist immediately so even an early Ctrl-C
                                # leaves a resumable session id on disk.
                                sid_file.write_text(sid, encoding="utf-8")
                                _trace_init(trace_path, sid, data)
                            print(
                                f"[init] model={data.get('model')!r} "
                                f"apiKeySource={data.get('apiKeySource')!r} "
                                f"session={sid[:8]}",
                                flush=True,
                            )
    except ClaudeSDKError as e:
        tail = "\n".join(stderr_buf[-20:])
        print(
            f"❌ SDK error: {type(e).__name__}: {e}\n"
            f"--- CLI stderr (last 20 lines) ---\n{tail}",
            file=sys.stderr,
        )
        sys.exit(1)


# ──── CLI ───────────────────────────────────────────────────────────────────

def _parse_args() -> argparse.Namespace:
    """Parse the command line; the module docstring is shown as the epilog."""
    p = argparse.ArgumentParser(
        description="跑 procedure-dsl 提取流程 (Claude Agent SDK, OAuth Max)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    p.add_argument("source", help="原始 post 文件 (input/case-N-raw.json; 也接受 .md/.txt)")
    p.add_argument("--extra-image", action="append", default=[],
                   help="额外配图: 本地路径 or http(s) URL. 加在 image_url_list "
                        "自动抽出的图后面. 可多次传.")
    p.add_argument("--out-dir", type=str, required=True,
                   help="输出工作目录名, 落在 outputs/ 下. e.g. --out-dir case-5-newflow → 实际 outputs/case-5-newflow/ . "
                        "Agent 所有产物 (understanding.md / workflow.json / case-X.html / .md) 都落在这. "
                        "case_id 自动从 basename 推 (e.g. --out-dir case-5-newflow → case_id='5-newflow'). "
                        "已带 outputs/ 前缀会被容忍 (不重复嵌套); 传绝对路径则原样用.")
    p.add_argument("--model", default="claude-sonnet-4-6",
                   help="Claude 模型名 (default: claude-sonnet-4-6)")
    p.add_argument("--workdir", default=None,
                   help=f"SDK cwd. 默认 {DSL_ROOT} (即本脚本所在 procedure-dsl/)")
    p.add_argument("--version", default=None, metavar="SUFFIX",
                   help="spec 版本: 跑 workdir 下的 spec-<SUFFIX>/ 目录. "
                        "e.g. --version backup → spec-backup/, --version test → spec-test/. "
                        "不传则默认 spec/.")
    p.add_argument("--max-turns", type=int, default=300,
                   help="Agent 最大回合数 (default: 300, 跑完整三阶段 + lint 留足余量)")
    p.add_argument("--resume", action="store_true",
                   help="恢复中断的 session: 从 outputs/case-N/.session_id 读 sid, "
                        "Agent 拿上次完整历史接着跑. 中断期间可手改任何中间产物, "
                        "Agent 会重新 Read 磁盘版本而不是凭记忆.")
    p.add_argument("--max-retries", type=int, default=3,
                   help="Stream idle / 临时错误自动 --resume 重试次数 (default: 3). "
                        "退避 10s/20s/40s, 不重试 RateLimit (exit 3) 和 Ctrl-C.")
    return p.parse_args()


def main() -> None:
    """CLI entry point: validate inputs, then run with automatic retries.

    Exit codes:
        0   success
        1   ClaudeSDKError or setup error (not retried)
        2   ResultMessage with is_error (e.g. stream idle timeout; retried)
        3   rate limit exhausted (not retried)
        130 KeyboardInterrupt (not retried)
    Retries back off 10s → 20s → 40s and switch to ``--resume`` when a
    session id is already on disk.
    """
    # Windows consoles default to cp1252/gbk and fail on the first Chinese or
    # emoji character printed, so both streams are switched to UTF-8 up front.
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, "reconfigure"):
            stream.reconfigure(encoding="utf-8", errors="replace")

    logging.basicConfig(level=logging.WARNING)
    args = _parse_args()
    # Derived attributes used by the prompt, the banner and the lint record.
    args.case_id = _derive_case_id(args.out_dir)
    args.spec_name = "spec" if not args.version else f"spec-{args.version}"

    # A negative value would skip the run entirely; treat it as "no retries".
    max_retries = max(0, args.max_retries)
    workdir_check = Path(args.workdir or DSL_ROOT).resolve()
    # Must be resolved exactly like run() does (relative names live under
    # outputs/); otherwise the session file is never found and every retry
    # restarts from scratch instead of resuming.
    sid_file_check = _resolve_out_dir(args.out_dir, workdir_check) / ".session_id"

    # Fail fast when the spec directory chosen by --version is missing.
    spec_dir_check = workdir_check / args.spec_name
    if not spec_dir_check.is_dir():
        avail = sorted(p.name for p in workdir_check.glob("spec*") if p.is_dir())
        print(
            f"❌ spec dir not found: {spec_dir_check}\n"
            f"   (--version {args.version!r} → 期望目录 '{args.spec_name}/')\n"
            f"   可用版本: {avail}",
            file=sys.stderr,
        )
        sys.exit(1)

    for attempt in range(max_retries + 1):
        try:
            asyncio.run(run(args))
            return
        except KeyboardInterrupt:
            # The session id was written at init and artifacts stay on disk,
            # so restarting with --resume continues the run.
            print(
                f"\n⚠️  中断 (Ctrl-C). {args.out_dir}/ 产物已保留.\n"
                f"   恢复: python run_procedure_dsl.py {args.source} "
                f"--out-dir {args.out_dir} --resume",
                file=sys.stderr,
            )
            sys.exit(130)
        except SystemExit as e:
            if isinstance(e.code, str):
                # run() uses sys.exit("message") for setup errors; catching
                # SystemExit here would otherwise swallow that message.
                print(e.code, file=sys.stderr)
            code = e.code if isinstance(e.code, int) else (1 if e.code else 0)
            if code == 2 and attempt < max_retries:
                wait_s = 10 * (2 ** attempt)  # 10s / 20s / 40s
                print(
                    f"\n⚠️  attempt {attempt+1}/{max_retries+1} exit={code} "
                    f"(可能 stream idle timeout). 等 {wait_s}s 后 --resume 重试...",
                    file=sys.stderr,
                )
                time.sleep(wait_s)
                if sid_file_check.exists():
                    args.resume = True
                else:
                    print(f"⚠️  {sid_file_check} 不存在, fresh restart 而非 resume",
                          file=sys.stderr)
                continue
            # Not retried: 0 / 1 / 3, or retries exhausted.
            sys.exit(code)

    # Defensive: every loop iteration returns, exits or continues.
    print(f"\n❌ 用尽 {max_retries} 次重试.", file=sys.stderr)
    sys.exit(2)


if __name__ == "__main__":
    main()