| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841 |
- #!/usr/bin/env python3
- """
- run_procedure_dsl.py — 拿 procedure-dsl/spec.md 当指令本, 让 Claude Agent SDK
- (走 ~/.claude OAuth Max 订阅额度) 对单个 post 跑完三阶段提取, 落 case-N.{md,html}.
- 设计:
- - 复用 Agent 仓 examples/process_pipeline/run_pipeline.py --use-claude-sdk 的 OAuth
- 思路: 子进程 env 把 ANTHROPIC_API_KEY / BASE_URL / AUTH_TOKEN 显式置空, 让
- claude CLI 回落到 OAuth 凭证.
- - 与 agent/llm/claude_code_oauth.py 的 one-shot wrapper 不同: 那个 max_turns=1
- + allowed_tools=[], 本脚本让 Agent 真正自主跑——开 Read/Write/Bash/Edit/Glob/Grep,
- multi-turn, 自己读 spec.md / source / 调 bin/taxonomy-lookup.py / 写产物.
- - 单 post 输入, 多终端 = 多并行 (用户自己开).
- 用法 (本脚本位于 procedure-dsl/, SDK cwd 默认设到 procedure-dsl/):
- # source 是 input/case-N-raw.json, 脚本自动从 image_url_list 拉图作多模态.
- # --out-dir 必填: 是 outputs/ 下的目录名, 产物全落这里; case_id 自动从它的 basename 推.
- python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2
- python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2 \\
- --extra-image /path/to/local-ref.png --model claude-sonnet-4-6
- # 跑实验版 spec-test/ 对比 (产物落 outputs/case-2_test/):
- python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2_test --version test
- # 中断后恢复 (产物保留, agent 重读磁盘接着跑):
- python run_procedure_dsl.py input/case-2-raw.json --out-dir case-2 --resume
- source 文件 schema (procedure-dsl/input/*.json 约定):
- {
- "title": str, "link": str, "body_text": str,
- "image_url_list": [{"image_type": int, "image_url": str}, ...],
- "publish_timestamp": ..., "channel_account_name": str,
- }
- """
- import argparse
- import asyncio
- import base64
- import json
- import logging
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List
# Directory holding this script (procedure-dsl/); the default SDK cwd.
DSL_ROOT = Path(__file__).resolve().parents[0]
def _derive_case_id(out_dir: str) -> str:
    """Derive the case_id from the ``--out-dir`` argument.

    The case_id feeds prompt file names, ``case_data.case_id`` and the
    type_suggestions record. Only the last path component is used, and a
    leading ``"case-"`` prefix is stripped:

        case-5          -> "5"
        case-5-newflow  -> "5-newflow"
        photo-album     -> "photo-album"   (no prefix is fine too)
        outputs/case-11 -> "11"

    An empty result (e.g. ``--out-dir case-``) becomes ``"?"``. Because the
    id follows the directory name, a new ``--out-dir`` never collides with a
    historical case directory.
    """
    prefix = "case-"
    basename = Path(out_dir).name
    case_id = basename[len(prefix):] if basename.startswith(prefix) else basename
    return case_id if case_id else "?"
def _resolve_out_dir(out_dir_arg: str, workdir: Path) -> Path:
    """Map ``--out-dir`` to the real (resolved, absolute) working directory.

    Relative values name a directory under ``<workdir>/outputs/``:

        newdir          -> <workdir>/outputs/newdir
        case-5          -> <workdir>/outputs/case-5
        outputs/newdir  -> <workdir>/outputs/newdir  (an explicit ``outputs/``
                           prefix is tolerated and not nested twice)

    An absolute path is used as-is (escape hatch).
    """
    candidate = Path(out_dir_arg)
    if candidate.is_absolute():
        return candidate.resolve()
    outputs_root = workdir / "outputs"
    segments = list(candidate.parts)
    # Tolerate the historical habit of spelling out the outputs/ prefix.
    if segments[:1] == ["outputs"]:
        del segments[0]
    if not segments:
        return outputs_root.resolve()
    return outputs_root.joinpath(*segments).resolve()
# Public Sonnet 4.6 list prices (USD per 1M tokens), used for the estimated cost.
# In OAuth Max mode the SDK usually reports total_cost_usd as None, so we compute
# "what this would cost on the metered API" as an intuitive proxy for quota use.
PRICE_SONNET_4_6 = {
    "input": 3.00,
    "output": 15.00,
    "cache_creation": 3.75,
    "cache_read": 0.30,
}

# usage-dict key -> PRICE_SONNET_4_6 key, in the order the terms are summed.
_USAGE_PRICE_KEYS = (
    ("input_tokens", "input"),
    ("output_tokens", "output"),
    ("cache_creation_input_tokens", "cache_creation"),
    ("cache_read_input_tokens", "cache_read"),
)


def _estimate_cost_usd(usage: Dict[str, Any]) -> float:
    """Estimate the metered-API cost (USD) of a run from its token usage.

    Args:
        usage: token counters as reported in ``ResultMessage.usage``. Missing
            keys count as 0, and so do keys whose value is ``None``. The
            Messages API documents the cache counters as nullable, and
            ``None / 1_000_000`` would otherwise raise TypeError.

    Returns:
        The estimated cost at Sonnet 4.6 list prices; 0.0 for empty/None usage.
    """
    if not usage:
        return 0.0
    return sum(
        (usage.get(usage_key) or 0) / 1_000_000 * PRICE_SONNET_4_6[price_key]
        for usage_key, price_key in _USAGE_PRICE_KEYS
    )
- # ──── 多模态: 图片 → Anthropic content block ──────────────────────────────────
# File extension -> MIME type used in base64 image content blocks.
_MEDIA_TYPE = dict(
    [
        (".png", "image/png"),
        (".jpg", "image/jpeg"),
        (".jpeg", "image/jpeg"),
        (".gif", "image/gif"),
        (".webp", "image/webp"),
    ]
)
# Client-side image cache: DSL_ROOT/.image_cache/<sha256>.<ext>.
# Re-runs and resumes of the same URL skip the download. Purely local IO; it
# does not involve the SDK or OAuth.
_IMAGE_CACHE_DIR = DSL_ROOT.joinpath(".image_cache")
# Browser User-Agent: mmbiz and other image hosts answer a bare
# python-requests-style default UA with 403.
_DOWNLOAD_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)
def _url_to_cached_path(url: str, timeout: float = 15.0) -> Path:
    """Download ``url`` into ``_IMAGE_CACHE_DIR`` and return the local path.

    A non-empty cached copy is returned without re-downloading. The file name
    is a truncated sha256 of the URL. The extension comes from the URL path:
    ``.png`` when there is none or it is not a known image extension.

    Why client-side: it sidesteps the Anthropic server-side robots.txt check
    on image URLs. Hosts such as mmbiz.qpic.cn disallow crawling, which makes
    the API fail with 400.

    Raises:
        urllib.error.URLError / OSError: network or HTTP failure.
        ValueError: the server answered with an empty body.
    """
    import hashlib
    import os
    from urllib.parse import urlparse
    from urllib.request import Request, urlopen

    _IMAGE_CACHE_DIR.mkdir(exist_ok=True)
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:24]
    ext = Path(urlparse(url).path).suffix.lower()
    if ext not in _MEDIA_TYPE:
        ext = ".png"
    local = _IMAGE_CACHE_DIR / f"{digest}{ext}"
    if local.exists() and local.stat().st_size > 0:
        return local
    req = Request(url, headers={"User-Agent": _DOWNLOAD_UA, "Accept": "image/*,*/*;q=0.8"})
    with urlopen(req, timeout=timeout) as resp:
        data = resp.read()
    if not data:
        # Caching an empty file would yield an empty base64 "image" block.
        raise ValueError(f"empty response body: {url}")
    # Write to a per-process temp file, then rename atomically. Writing the
    # cache file in place lets an interrupted download (Ctrl-C) or a parallel
    # run on the same URL leave a truncated but non-empty file, which the size
    # check above would then serve from the cache forever.
    tmp = local.with_name(f"{local.name}.{os.getpid()}.part")
    try:
        tmp.write_bytes(data)
        os.replace(tmp, local)
    finally:
        tmp.unlink(missing_ok=True)
    return local
def _append_image_blocks(blocks: List[Dict[str, Any]], images: List[str]) -> None:
    """Convert each image ref to a content block and append it to ``blocks``.

    Best-effort: a ref that fails to convert is logged and skipped, so one bad
    image never blocks the batch. The base and variant drivers share this, so
    both apply the same image fault-tolerance policy. A one-line summary is
    printed when ``images`` is non-empty.
    """
    failures = 0
    for ref in images:
        try:
            block = _image_block(ref)
        except Exception as exc:  # deliberate: any failure just skips this image
            failures += 1
            print(f"[image] skip {ref[:80]}... ({type(exc).__name__}: {exc})", flush=True)
            continue
        blocks.append(block)
    if images:
        succeeded = len(images) - failures
        print(f"[image] {succeeded}/{len(images)} 成功下载并 base64 化, {failures} 失败已跳过", flush=True)
def _sniff_image_media_type(data: bytes) -> str:
    """Return the MIME type implied by ``data``'s magic bytes, or "" if unknown.

    Only PNG, JPEG, GIF and WebP are recognised (the formats listed in
    ``_MEDIA_TYPE``).
    """
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if data.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if data.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "image/webp"
    return ""


def _image_block(ref: str) -> Dict[str, Any]:
    """Turn an image ref into an Anthropic base64 image content block.

    - http(s) URL: downloaded client-side via ``_url_to_cached_path``. There is
      no server-side fetch, so the host's robots.txt does not matter.
    - anything else: a local path (``~`` is expanded).

    The media type comes from the bytes' magic number, falling back to the
    file extension and then ``image/png``. Extension-only detection mislabels
    e.g. mmbiz URLs like ``.../640?wx_fmt=jpeg``. Their path has no suffix, so
    the cached file is named ``.png`` while the bytes are JPEG.

    Raises:
        FileNotFoundError: a local path does not exist.
        Exception: any download error from ``_url_to_cached_path``; the caller
            decides whether to skip the image or abort.
    """
    if ref.startswith(("http://", "https://")):
        path = _url_to_cached_path(ref)
    else:
        path = Path(ref).expanduser().resolve()
        if not path.exists():
            raise FileNotFoundError(f"image not found: {ref}")
    raw = path.read_bytes()
    media_type = _sniff_image_media_type(raw) or _MEDIA_TYPE.get(path.suffix.lower(), "image/png")
    return {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": base64.standard_b64encode(raw).decode(),
        },
    }
- # ──── source JSON → 自动拉 image_url_list ────────────────────────────────────
def _images_from_source(source_path: Path) -> List[str]:
    """Extract image URLs from a procedure-dsl/input/*.json source file.

    Two schemas are supported:
      1. new eval_case-*.json: ``images: [url_str, ...]``. It has no cover
         signal, so URLs are returned in original order. This schema is
         checked first, whenever ``images`` is a list.
      2. legacy case-*-raw.json: ``image_url_list: [{image_type, image_url}, ...]``.
         Entries with image_type == 2 (cover) come first; the rest keep their
         order.

    Non-.json sources yield []. A .json file that cannot be read, decoded or
    parsed also yields [], with a warning. An image field that is present and
    non-empty but yields no URL also triggers a warning. A multimodal pipeline
    that silently runs with 0 images is harder to debug than a loud one.
    """
    if source_path.suffix.lower() != ".json":
        return []
    try:
        with source_path.open(encoding="utf-8") as f:
            data = json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError
        # (a non-UTF-8 file); the latter used to escape and crash the run.
        print(f"[image] ⚠️ {source_path.name} 无法读取/解析, 不抽图 "
              f"({type(e).__name__}: {e})", flush=True)
        return []
    if not isinstance(data, dict):
        return []

    # New schema (eval_case-*.json): flat URL array.
    flat = data.get("images")
    if isinstance(flat, list):
        urls = [u for u in flat if isinstance(u, str) and u]
        if not urls and flat:
            print(f"[image] ⚠️ {source_path.name} 有 images 字段但抽不到 URL "
                  f"(首项类型={type(flat[0]).__name__})", flush=True)
        return urls

    # Legacy schema (case-*-raw.json): array of {image_type, image_url} dicts.
    items = data.get("image_url_list")
    if not isinstance(items, list):
        return []
    covers: List[str] = []
    others: List[str] = []
    for it in items:
        if not isinstance(it, dict):
            continue
        url = it.get("image_url")
        if not isinstance(url, str) or not url:
            continue
        (covers if it.get("image_type") == 2 else others).append(url)
    if items and not (covers or others):
        print(f"[image] ⚠️ {source_path.name} 有 image_url_list 字段但抽不到 URL "
              f"(首项类型={type(items[0]).__name__})", flush=True)
    return covers + others
- # ──── 初始 user 消息 (text + 图片 blocks) ─────────────────────────────────────
def _build_initial_blocks(
    source_ref: str, case_id: str, out_dir_arg: str, images: List[str], workdir: Path,
    spec_name: str = "spec",
) -> List[Dict[str, Any]]:
    """Build the first user message: the instruction text plus image blocks.

    Args:
        source_ref: source file as passed on the CLI (workdir-relative or
            absolute). It is used verbatim rather than re-derived from case_id.
            Otherwise an ``--out-dir`` that does not match the source file name
            would point the agent at a non-existent input path, wasting 10+
            turns of self-exploration.
        case_id: id derived from ``--out-dir``; used in output file names and
            in the lint/render commands.
        out_dir_arg: raw ``--out-dir`` value, resolved via ``_resolve_out_dir``.
        images: image refs (URLs or local paths) appended as content blocks;
            refs that fail to convert are skipped by ``_append_image_blocks``.
        workdir: SDK cwd; every path handed to the agent is made absolute.
        spec_name: spec directory chosen by ``--version`` (default "spec").

    Returns:
        ``[text_block, *image_blocks]``.
    """
    # Hand the agent absolute paths only (it is trained to expect them), with
    # forward slashes so they agree with the "/..." joins in the prompt below.
    spec_dir = (workdir / spec_name).as_posix()
    spec_readme = (workdir / spec_name / "README.md").as_posix()
    spec_tools = (workdir / spec_name / "tools.md").as_posix()
    # --out-dir names a directory under outputs/; absolute paths are kept as-is.
    out_dir = _resolve_out_dir(out_dir_arg, workdir)
    case_dir = out_dir.as_posix()
    scratch_dir = (out_dir / "_scratch").as_posix()
    src = Path(source_ref)
    case_input = src.as_posix() if src.is_absolute() else (workdir / src).resolve().as_posix()
    # Notes on the prompt text:
    # - wf-patch.py is referenced via {spec_dir} like every other tool, so a
    #   --version run uses the selected spec's copy instead of spec/'s.
    # - The bash continuations are written "\\": a bare "\" before a newline
    #   is an in-string line continuation in Python and would silently merge
    #   the render-case command onto one line.
    text = f"""请按 {spec_name}/ 目录里的 SKILL 处理这个 post.

## 起手指令 (路径已给绝对值, 直接照搬, 不要改, 不要先 find 探查)
1. `Read(file_path="{spec_readme}")` — self-driven skill 的入口, 含完整 phase 加载指南 (累积式) + 自查清单 + 工具调用规则. 读完不要再 Read 它.
2. `Read(file_path="{spec_tools}")` — 外部脚本接口手册. 读完不要重读.
3. 按 README phase 加载指南**累积式**前进, 中间产物是**单个 `workflow.json` 文件**, 各 phase 都 in-place Edit 它 (不写多个中间快照):
- Phase 1.1 心智模型 → `{case_dir}/understanding.md` (含 procedure 数量判断)
- Phase 1.2 骨架 → **Write** `{case_dir}/workflow.json` (procedures 数组骨架)
- Phase 1.3 IO 闭合 → **Edit** workflow.json 加 anchor
- Phase 2A/2B/2C 归一化 → **Edit** workflow.json 加 effect/action/type/sub/form + procedures[i].type_registry
- Phase 3 lint + 渲染 → 调 render-case.py + lint-case.py (不写 case_data.json, renderer 内存组装)
- Phase 3 .md → Write `{case_dir}/case-{case_id}-<slug>.md`

## ❌ 重复读取禁令 (CRITICAL: ZERO REPEATED READS)
你拥有完美的长期记忆(由于 Context 累积,你读取过的所有文件内容会永远保留在你的 Context 中)。
请**绝对不要**重复读取任何文件!任何重复的 `Read` 动作都是对 Token 和回合数 Budget 的极大浪费。
- **禁忌 1**:不要因为看到 spec 文档中写了 `详见 [tools.md §2]` 就去重新 `Read(file_path="{spec_tools}")`。你已经在 Turn 3 读过它了,直接检索你的记忆!
- **禁忌 2**:不要因为进入了 Phase 2B,就去重新 `Read` 任何 Phase 2 的 spec 文件(如 `phase2-normalize.md`)。你已经在 Phase 2A 开始时读过它了,它就在你的记忆中,直接使用它!
- 在发出任何 `Read` 指令前,必须自我核对:“我之前读过这个文件吗?”。如果读过,绝对不要再次 Read!

## 输入
- case 原文: `{case_input}`
schema (两种都可能):
· 老格式 (case-*-raw.json): {{title, link, body_text, image_url_list:[{{image_type,image_url}}], publish_timestamp, channel_account_name}}
· 新格式 (eval_case-*.json): {{title, link, body_text, images:[url_str], videos, channel, content_type, like_count, publish_timestamp, channel_content_id, ...}}
正文 (body_text) 和元数据从此文件 Read; 两种 schema 的 body_text 含义一致.
- 配图: 本消息附了 {len(images)} 张图作多模态内容 (URL 抽自老格式的 image_url_list 或新格式的 images; 老格式 image_type=2 封面排最前, 新格式按原序).

## 输出目录
`{case_dir}/` (这一 case 的所有产物都放这里, 不要污染其他目录)

## 读取范围 (硬约束)
你**只能** Read 以下三处 (用绝对路径 + 子路径):
1. `{spec_dir}/` 及其所有子目录 (skill 全部内容: README.md / tools.md / 各 phase 规格文件 / output/ / taxonomy/ / templates/ / tools/ — 但 tools/*.py **不要 Read 源码**, 只通过 Bash 调用; 具体子目录名以 {spec_name}/README.md 的章节地图为准, **不要脑补 part1-/part2- 这类对称目录**)
2. `{case_input}` (当前 case 的原文, 只此一个)
3. `{case_dir}/` 目录 (你自己的工作产物, 含其下 `_scratch/`)

## 长输出 dump 区 (sanctioned scratch)
如果你需要 dump 大 Bash 输出做后续分析 (e.g. `taxonomy-lookup --subtree` 输出过大想分段读, 或 `find` 结果想保留), **只写到 `{scratch_dir}/`** (runner 已预创建):
✅ `python {spec_dir}/tools/taxonomy-lookup.py --dim 实质 --subtree /表象/视觉 > {scratch_dir}/subst_visual.txt`
✅ 然后 `Read(file_path="{scratch_dir}/subst_visual.txt")` 提取需要的段
✅ 一次性小探查 / smoke test (验证某条 tool 命令、看某段数据长啥样) 也可以放 `{scratch_dir}/`, 用完即弃
❌ **不要用 Python 脚本生成 / 批量改写 `workflow.json`** — 它由你**直接 Write 骨架 + 逐字段填** 演化 (见各 phase 规格). 写 build_workflow / add_anchors / normalize 这类脚本去拼 JSON, 反而踩转义 / 控制字符坑 (LLM 拼长 JSON 极易坏), 还把本该逐 step 的语义判断埋进一次性脚本里. 几十处字段要批量改 → 用 **`{spec_dir}/tools/wf-patch.py`** (path=value 清单, 工具负责安全写 JSON + 写入即校验; 见 {spec_name}/tools.md §2), 零星单处用 Edit. `_scratch/` 的 Python **只用于「dump 大 tool 输出」和「smoke test」, 不是 workflow.json 的生产线**.
❌ **不要写到项目根 `scratch/`** (那是 dev/repo 维护用的临时区, 禁区)
❌ **不要 ls / Read 项目根 `scratch/`** (别人的 dev 临时文件, 跟你无关; 你的 scratch 在上面的 `_scratch/`)
**禁读区** (不要 Read 也不要 ls 这些位置): 项目根的 spec.md / design.md (旧文件), examples/ 全目录, input/ 下其他 case-*-raw.json, outputs/case-OTHER/ (其他 case 目录), bin/ 全目录 (已废弃), **项目根 `scratch/`** (跟你的 `_scratch/` 不是一回事 — 上面 ✅ 那个 `_scratch/` 作用类似 outputs 目录下的 `_scratch/`).

## 阶段三流程 (脚本组装, 不写 case_data.json)
新工作流: Phase 2 已经把所有结构化数据 in-place Edit 进 `{case_dir}/workflow.json`. Phase 3 直接调 2 个脚本命令, **不需要写 case_data.json** (renderer 在内存把 workflow + source-input + page_title + case_id 组装成 case_data 喂给 build_html).

```bash
# Step A: lint + 自动 record 新 type 到 type_suggestions.md
python {spec_dir}/tools/lint-case.py --workflow {case_dir}/workflow.json --case-id {case_id}
# stdout 报 "type 完整性: N 个提示" 时, 回 Phase 2 Edit workflow.json 补 procedures[i].type_registry, 重跑 lint
# stdout 报 "已 record M 条新 type" 时, 表示 {spec_name}/taxonomy/type_suggestions.md 已自动同步

# Step B: 渲染 HTML (--source-input 必带, --page-title --case-id 必带)
python {spec_dir}/tools/render-case.py \\
  --workflow {case_dir}/workflow.json \\
  --source-input {case_input} \\
  --page-title "Case {case_id} · <主题>" \\
  --case-id {case_id} \\
  --out {case_dir}/case-{case_id}-<slug>.html

# Step C: Write {case_dir}/case-{case_id}-<slug>.md (DSL 文本版, 按 {spec_name}/output/md-structure.md §11 结构)
```

**`--source-input` 行为**: renderer 自动从 raw 抽 body_text + 封面图 + 图集兜底填到 case_data.source (内存里). 微信公众号长文走 inline 图 + 封面; 小红书短文走 body + "--- 附图 ---" 末尾追加非封面图. 你**不必手工**复制 raw.body_text — 完全交给 renderer.
`workflow.json` 的契约见 [{spec_name}/output/case-data.schema.json]({spec_name}/output/case-data.schema.json) 里 Procedure 的 definition (顶层 `{{procedures: [{{id, name, purpose, declarations, type_registry?, steps, return_row?}}]}}`). **不要参考其他 `outputs/case-*/` 下任何文件**.

## 其他约束
- 推断补全用 `inferred: true` + `inferred_reason` 标在 IO item 上, 不要静默插入.
- 完成后用一段话总结: 工序梗概 + 输出文件路径 + 你对 DSL 的关键发现.
"""
    blocks: List[Dict[str, Any]] = [{"type": "text", "text": text}]
    _append_image_blocks(blocks, images)
    return blocks
- # ──── Trace 写盘 (outputs/case-N/_trace.md) ───────────────────────────────────
- # 每次 run 把 turn / tool_use / text / result 写到一份 markdown 流水, 实时 append.
- # Ctrl-C 后还能离线 review Agent 走过哪些 turn / 读了哪些文件 / 调了哪些 Bash.
def _fmt_tool_input(name: str, inp: Any) -> str:
    """Summarise a tool_use input as one human-readable markdown line.

    Known tools (Read/Write/Edit/Bash/Grep/Glob/Task/Agent) show only their
    salient fields. Any other tool falls back to ``str(inp)``, clipped to 200
    characters plus ``...``. A non-dict input is shown as ``repr(inp)``
    truncated to 200 characters (no ellipsis).
    """
    if not isinstance(inp, dict):
        return f"`{repr(inp)[:200]}`"

    def clip(text: str) -> str:
        return text if len(text) <= 200 else text[:200] + "..."

    if name == "Read":
        ranged = inp.get("offset") or inp.get("limit")
        window = (
            f" (offset={inp.get('offset', 0)}, limit={inp.get('limit', '-')})"
            if ranged else ""
        )
        return f"`{inp.get('file_path', '')}`{window}"
    if name == "Write":
        return f"`{inp.get('file_path', '')}` ({len(inp.get('content', '')):,} chars)"
    if name == "Edit":
        return f"`{inp.get('file_path', '')}`"
    if name == "Bash":
        return f"`{clip(inp.get('command', ''))}`"
    if name == "Grep":
        return f"pattern=`{inp.get('pattern', '')}` path=`{inp.get('path', '')}`"
    if name == "Glob":
        return f"pattern=`{inp.get('pattern', '')}`"
    if name in ("Task", "Agent"):
        return f"`{inp.get('description', '')}` [subagent={inp.get('subagent_type', '?')}]"
    return f"`{clip(str(inp))}`"
def _trace_banner(trace_path: Path, args, resume_sid, source_path, n_images: int) -> None:
    """Append a session-start banner to the trace file.

    The file is opened in append mode, so every run (fresh or resumed)
    accumulates in the same _trace.md. The parent directory is created if
    missing. A resume banner shows only the short session id and case; a
    fresh banner lists case, source, spec, image count, model and max_turns.
    """
    trace_path.parent.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if resume_sid:
        lines = [
            f"\n\n---\n\n## ▶ Resume @ {stamp}\n\n",
            f"恢复 session `{resume_sid[:8]}` (case-{args.case_id})\n",
        ]
    else:
        lines = [
            f"\n\n---\n\n## ▶ Fresh @ {stamp}\n\n",
            f"- case: `{args.case_id}`\n",
            f"- source: `{source_path}`\n",
            f"- spec: `{getattr(args, 'spec_name', 'spec')}`\n",
            f"- images: `{n_images}`\n",
            f"- model: `{args.model}`\n",
            f"- max_turns: `{args.max_turns}`\n",
        ]
    with trace_path.open("a", encoding="utf-8") as fh:
        fh.write("".join(lines))
def _trace_init(trace_path: Path, sid: str, data: Dict[str, Any]) -> None:
    """Append the SDK init info (short session id, model, apiKeySource) to the trace."""
    model = data.get("model")
    key_source = data.get("apiKeySource")
    entry = f"\n_session={sid[:8]} · model={model!r} · apiKeySource={key_source!r}_\n"
    with trace_path.open("a", encoding="utf-8") as fh:
        fh.write(entry)
def _trace_turn(trace_path: Path, turn: int, msg: Any) -> None:
    """Append one AssistantMessage to the trace.

    Non-empty text blocks are rendered as a markdown blockquote, followed by a
    blank line. Each tool_use block then becomes a bullet summarised by
    ``_fmt_tool_input``. Thinking blocks and any other block kinds are skipped.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    text_chunks: List[str] = []
    tool_calls: List[tuple] = []
    for item in msg.content:
        if hasattr(item, "text"):
            body = item.text.strip()
            if body:
                text_chunks.append(body)
        elif hasattr(item, "name") and hasattr(item, "input"):
            tool_calls.append((item.name, item.input))
    out = [f"\n\n### Turn {turn} · {stamp}\n"]
    for body in text_chunks:
        out.extend(f"> {ln}\n" if ln else ">\n" for ln in body.split("\n"))
    if text_chunks:
        out.append("\n")
    out.extend(f"- `{tool}` — {_fmt_tool_input(tool, args)}\n" for tool, args in tool_calls)
    with trace_path.open("a", encoding="utf-8") as fh:
        fh.write("".join(out))
def _trace_result(trace_path: Path, msg: Any, elapsed: float,
                  usage: Dict[str, Any], est_cost: float) -> None:
    """Append the final ResultMessage summary to the trace.

    The summary covers status, timing, token counts and cost. Token counters
    that are missing or ``None`` are shown as 0. The Messages API marks the
    cache counters as nullable, and formatting ``None`` with ``:,`` raises
    TypeError, which would lose the summary at the very end of a long run.
    """
    def tokens(key: str) -> int:
        return usage.get(key) or 0

    now = datetime.now().strftime("%H:%M:%S")
    sdk_cost = msg.total_cost_usd
    sdk_cost_str = f"${sdk_cost:.4f}" if sdk_cost is not None else "None (OAuth Max)"
    chunk = (
        f"\n\n### ◀ Result · {now}\n\n"
        f"- subtype: `{msg.subtype}` · is_error: `{msg.is_error}`\n"
        f"- num_turns: `{msg.num_turns}` · duration: `{msg.duration_ms}ms` · wall: `{elapsed:.1f}s`\n"
        f"- tokens: in={tokens('input_tokens'):,} "
        f"out={tokens('output_tokens'):,} "
        f"cache_w={tokens('cache_creation_input_tokens'):,} "
        f"cache_r={tokens('cache_read_input_tokens'):,}\n"
        f"- cost: sdk={sdk_cost_str}, est_if_api=${est_cost:.4f}\n"
    )
    with trace_path.open("a", encoding="utf-8") as f:
        f.write(chunk)
- # ──── 主流程: 跑 ClaudeSDKClient ──────────────────────────────────────────────
def _resume_blocks(case_id: str, rel_out: str, spec_name: str = "spec") -> List[Dict[str, Any]]:
    """Build the short "continue" user message sent when resuming a session.

    Images and the original instructions are not re-sent (they are already in
    the session history). The message tells the agent to re-read on-disk
    artifacts, since the user may have edited them while the run was stopped.
    Paths use ``spec_name``, so a ``--version`` run points at its own spec.
    """
    text = (
        f"上次中断了, 接续做 case-{case_id} 的提取流程.\n\n"
        f"先 ls {rel_out}/ 看当前已落盘哪些产物;\n"
        f"用户可能在中断期间编辑过任何中间产物 (understanding.md / workflow.json) "
        f"或改过 {spec_name}/ 内任何文件 — 务必 Read 这些**当前磁盘版本**, "
        f"不要凭之前记忆继续. 如发现明显人工修订痕迹, 沿用用户改过的版本.\n\n"
        f"⚠️【重要禁令与强制要求】:如果流程进行到 Phase 2(归一化与分类匹配),主 Agent **绝对禁止**手动调用 taxonomy-lookup.py 查询或手动决策!你必须强制阅读最新的 `{spec_name}/extraction/phase2-normalize.md` 规范,通过运行 `prepare-subtask.py` 生成物理任务切片,然后调用 `Agent`(或 `Task`)工具将任务分别分发给预定义好的 `phase-2a-normalizer` 和 `phase-2b-matcher` 子 Agent 并行协作执行!"
    )
    return [{"type": "text", "text": text}]


def _subagent_specs(spec_name: str = "spec") -> Dict[str, Dict[str, Any]]:
    """Return AgentDefinition keyword arguments for the Phase 2 sub-agents.

    The taxonomy/tool paths in the prompts use ``spec_name`` (they used to be
    hard-coded to ``spec/``, so ``--version`` runs consulted the wrong spec).
    Literal JSON braces in the examples are doubled for the f-string.
    """
    spec = spec_name
    return {
        "phase-2a-normalizer": {
            "description": "Expert in Phase 2A (effect/action/type normalization). Use this agent to read task_2a.json, normalize action/effect/type against spec trees, manage procedure-level type_registry, and generate outputs/case-N/_scratch/patch_2a.json.",
            "prompt": f"""You are a dedicated Phase 2A normalization sub-agent.
Your goal is to process the inputs and outputs of a workflow for effect, action, and type normalization:
1. Read the outputs/case-N/_scratch/task_2a.json file to get the steps and IO variables.
2. Normalize every step's `effect` and `action` against the taxonomy specs in `{spec}/taxonomy/effect.json` and `{spec}/taxonomy/action.json`.
3. Normalize every IO variable's `type` against `{spec}/taxonomy/type.json`. If a custom type is used, register it in the procedure's `type_registry` with extends and description.
4. Output a standard `patch_2a.json` JSON file under outputs/case-N/_scratch/.
IMPORTANT: The format of `patch_2a.json` MUST be a flat JSON array of objects, where each object has a "path" and a "value" key (exactly conforming to the `wf-patch.py` tool contract).
Example format:
[
{{"path": "p1.s1.effect", "value": "预处理"}},
{{"path": "p1.s1.action", "value": "提取/化学提取/反推"}},
{{"path": "p1.s1.inputs[0].type", "value": "工具选型标准"}},
{{"path": "p1.type_registry.工具配置.extends", "value": "评语"}},
{{"path": "p1.type_registry.工具配置.desc", "value": "工具选型依据..."}}
]
Do not output raw dictionary structure or any other nesting. Do not touch or modify other files.""",
            "tools": ["Read", "Grep", "Glob", "Write", "Edit", "Bash"],
            "model": "sonnet",
        },
        "phase-2b-matcher": {
            "description": "Expert in Phase 2B (substance/form taxonomy matching). Use this agent to read task_2b.json, invoke taxonomy-lookup.py to query substance and form paths for each variable, and generate outputs/case-N/_scratch/patch_2b.json.",
            "prompt": f"""You are a dedicated Phase 2B taxonomy matching sub-agent.
Your goal is to query and match the substance and form for each workflow IO variable:
1. Read the outputs/case-N/_scratch/task_2b.json file to get the variables to match.
2. For each variable, run `python {spec}/tools/taxonomy-lookup.py --dim 实质 --match "..."` and `--dim 形式 --match "..."` to search for the most precise taxonomy paths matching the variable's value, name, and related_images.
3. Output a standard `patch_2b.json` JSON file under outputs/case-N/_scratch/.
IMPORTANT: The format of `patch_2b.json` MUST be a flat JSON array of objects, where each object has a "path" and a "value" key (exactly conforming to the `wf-patch.py` tool contract). Substance and form values can be single string paths, multiple paths separated by ' + ', or JSON arrays of strings for multi-path matching.
Example format:
[
{{"path": "p1.s1.inputs[0].substance", "value": "/理念/知识/思想/概念范畴/性质属性/功能效用"}},
{{"path": "p1.s1.inputs[0].form", "value": "/呈现/视觉/视觉制作/构图编排/版面设计/版面结构"}},
{{"path": "p1.s2.inputs[0].substance", "value": ["/理念/知识/商业/前沿技术/AI智能/AI应用", "/理念/知识/思想"]}}
]
Do not output raw dictionary structure or any other nesting. Do not touch or modify other files.""",
            "tools": ["Read", "Grep", "Glob", "Write", "Edit", "Bash"],
            "model": "sonnet",
        },
    }


async def run(args: argparse.Namespace) -> None:
    """Run one extraction session for ``args.source`` through ClaudeSDKClient.

    A fresh run sends the full instruction prompt plus the post's images. With
    ``--resume`` (also set by main's automatic retry), only a short "continue"
    message is sent and the session id stored in ``<out_dir>/.session_id`` is
    resumed. Every turn and the final result are echoed to stdout and appended
    to ``<out_dir>/_trace.md``.

    Exits via ``sys.exit``:
        1 -- bad workdir/source, a missing/empty .session_id with --resume, or
             a ClaudeSDKError (the CLI stderr tail is printed);
        2 -- the ResultMessage reports ``is_error``;
        3 -- a RateLimitEvent reports a blocked status.
    """
    from claude_agent_sdk import (
        AssistantMessage,
        ClaudeAgentOptions,
        ClaudeSDKClient,
        ClaudeSDKError,
        RateLimitEvent,
        ResultMessage,
        TextBlock,
        AgentDefinition,
    )

    workdir = Path(args.workdir or DSL_ROOT).resolve()
    if not workdir.exists():
        sys.exit(f"❌ workdir not found: {workdir}")
    source_path = Path(args.source).expanduser().resolve()
    if not source_path.exists():
        sys.exit(f"❌ source not found: {source_path}")
    auto_imgs = _images_from_source(source_path)
    extra_imgs = args.extra_image or []
    images = auto_imgs + extra_imgs
    # Prefer a workdir-relative source path for the agent (its cwd is workdir);
    # fall back to the absolute path when the source lives outside workdir.
    try:
        source_for_agent = source_path.relative_to(workdir).as_posix()
    except ValueError:
        source_for_agent = str(source_path)

    # --out-dir names a directory under outputs/; absolute paths are kept as-is.
    out_dir = _resolve_out_dir(args.out_dir, workdir)
    out_dir.mkdir(parents=True, exist_ok=True)
    sid_file = out_dir / ".session_id"
    # Pre-create the sanctioned scratch dir so the agent never hesitates to use it.
    (out_dir / "_scratch").mkdir(exist_ok=True)

    resume_sid = None
    if args.resume:
        if not sid_file.exists():
            sys.exit(f"❌ --resume 但未找到 {sid_file}; 先正常跑一次建立 session")
        resume_sid = sid_file.read_text(encoding="utf-8").strip() or None
        if not resume_sid:
            sys.exit(f"❌ {sid_file} 为空, 无法 resume")

    try:
        rel_out = out_dir.relative_to(workdir).as_posix()
    except ValueError:
        rel_out = out_dir.as_posix()
    if resume_sid:
        # History already holds the images and original instructions.
        blocks = _resume_blocks(args.case_id, rel_out, args.spec_name)
    else:
        blocks = _build_initial_blocks(source_for_agent, args.case_id, args.out_dir, images, workdir, args.spec_name)

    print(f"[setup] workdir = {workdir}")
    print(f"[setup] version = {args.version if args.version else '(default)'}")
    print(f"[setup] spec dir = {args.spec_name}/")
    print(f"[setup] source = {source_path}")
    print(f"[setup] → as agent = {source_for_agent}")
    print(f"[setup] case_id = {args.case_id}")
    print(f"[setup] output dir = {out_dir}")
    print(f"[setup] auto images = {len(auto_imgs)} (from image_url_list)"
          if not resume_sid else f"[setup] auto images = -- (resume 跳过)")
    print(f"[setup] extra images = {len(extra_imgs)} {extra_imgs if extra_imgs else ''}"
          if not resume_sid else f"[setup] extra images = -- (resume 跳过)")
    print(f"[setup] model = {args.model}")
    print(f"[setup] max_turns = {args.max_turns}")
    if resume_sid:
        print(f"[setup] resume = {resume_sid[:8]} (跳过初始 prompt + 图)")
    else:
        print("[setup] resume = no (fresh start)")
    print(flush=True)

    # Trace banner first; every turn / result is appended to this _trace.md.
    trace_path = out_dir / "_trace.md"
    _trace_banner(trace_path, args, resume_sid, source_path, len(images))
    print(f"[trace] -> {trace_path}", flush=True)

    stderr_buf: List[str] = []

    def _capture_stderr(line: str) -> None:
        if line:
            stderr_buf.append(line)
            print(f"[stderr] {line}", flush=True)

    agents = {
        name: AgentDefinition(**spec_kwargs)
        for name, spec_kwargs in _subagent_specs(args.spec_name).items()
    }

    options = ClaudeAgentOptions(
        model=args.model,
        cwd=str(workdir),
        resume=resume_sid,
        # Both the legacy "Task" and the newer "Agent" tool names are allowed so
        # sub-agents can be dispatched on any Claude Code / SDK version.
        allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep", "Task", "Agent"],
        agents=agents,
        max_turns=args.max_turns,
        permission_mode="bypassPermissions",
        setting_sources=[],
        # Blank API credentials so the claude CLI falls back to OAuth (Max plan).
        env={
            "ANTHROPIC_API_KEY": "",
            "ANTHROPIC_BASE_URL": "",
            "ANTHROPIC_AUTH_TOKEN": "",
        },
        stderr=_capture_stderr,
    )

    turn = 0
    usage: Dict[str, Any] = {}
    t0 = time.time()
    try:
        async with ClaudeSDKClient(options=options) as client:
            async def _input_stream():
                yield {
                    "type": "user",
                    "message": {"role": "user", "content": blocks},
                    "parent_tool_use_id": None,
                    "session_id": "default",
                }

            await client.query(_input_stream())
            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    turn += 1
                    for block in msg.content:
                        if isinstance(block, TextBlock):
                            print(f"\n[turn {turn} · text]\n{block.text}\n", flush=True)
                        elif hasattr(block, "name") and hasattr(block, "input"):
                            preview = str(block.input)
                            if len(preview) > 200:
                                preview = preview[:200] + "..."
                            print(f"[turn {turn} · tool_use] {block.name}({preview})", flush=True)
                        elif hasattr(block, "thinking"):
                            pass  # thinking blocks are too long to echo
                        else:
                            print(f"[turn {turn} · {type(block).__name__}] {block!r}", flush=True)
                    _trace_turn(trace_path, turn, msg)
                elif isinstance(msg, ResultMessage):
                    if msg.usage:
                        usage = dict(msg.usage)
                    elapsed = time.time() - t0
                    sdk_cost = msg.total_cost_usd
                    sdk_cost_str = (
                        f"${sdk_cost:.4f}" if sdk_cost is not None
                        else "None (OAuth Max — not metered as $)"
                    )
                    # Nullable counters (None) count as 0 rather than crashing
                    # the summary at the very end of a run.
                    tok = {
                        key: usage.get(key) or 0
                        for key in (
                            "input_tokens",
                            "output_tokens",
                            "cache_creation_input_tokens",
                            "cache_read_input_tokens",
                        )
                    }
                    est_cost = _estimate_cost_usd(tok)
                    print(
                        f"\n[result] subtype={msg.subtype} is_error={msg.is_error} "
                        f"turns={msg.num_turns} duration={msg.duration_ms}ms wall={elapsed:.1f}s\n"
                        f" tokens: in={tok['input_tokens']:,} "
                        f"out={tok['output_tokens']:,} "
                        f"cache_w={tok['cache_creation_input_tokens']:,} "
                        f"cache_r={tok['cache_read_input_tokens']:,}\n"
                        f" cost : sdk={sdk_cost_str}, est_if_api=${est_cost:.4f} "
                        f"(Sonnet 4.6 价目)",
                        flush=True,
                    )
                    _trace_result(trace_path, msg, elapsed, tok, est_cost)
                    if msg.is_error:
                        print("❌ result is_error=True", file=sys.stderr)
                        sys.exit(2)
                elif isinstance(msg, RateLimitEvent):
                    info = getattr(msg, "rate_limit_info", None)
                    info_status = getattr(info, "status", None) if info else None
                    if info_status == "allowed_warning":
                        print("⚠️ [rate_limit_warning] Max 订阅 5h 窗口余额较少, `claude /status` 看余量", file=sys.stderr)
                    elif info_status and info_status not in ("allowed", "allowed_warning"):
                        print(f"❌ [rate_limit_blocked] {info_status!r} — Max 订阅 5h 窗口已耗尽, `claude /status` 看余量", file=sys.stderr)
                        sys.exit(3)
                else:
                    name = type(msg).__name__
                    if name == "SystemMessage":
                        data = getattr(msg, "data", {}) or {}
                        subtype = getattr(msg, "subtype", "?")
                        if subtype == "init":
                            sid = data.get('session_id', '') or ''
                            if sid:
                                # Persist the sid as soon as it is known so even
                                # a very early Ctrl-C can be resumed.
                                sid_file.write_text(sid, encoding="utf-8")
                            _trace_init(trace_path, sid, data)
                            print(
                                f"[init] model={data.get('model')!r} "
                                f"apiKeySource={data.get('apiKeySource')!r} "
                                f"session={sid[:8]}",
                                flush=True,
                            )
    except ClaudeSDKError as e:
        tail = "\n".join(stderr_buf[-20:])
        print(
            f"❌ SDK error: {type(e).__name__}: {e}\n"
            f"--- CLI stderr (last 20 lines) ---\n{tail}",
            file=sys.stderr,
        )
        sys.exit(1)
- # ──── CLI ─────────────────────────────────────────────────────────────────────
def _non_negative_int(text: str) -> int:
    """argparse ``type=`` converter accepting integers >= 0.

    Raises:
        argparse.ArgumentTypeError: ``text`` is not an integer or is negative
            (argparse turns this into a usage error).
    """
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"需要非负整数 (got {text!r})") from None
    if value < 0:
        raise argparse.ArgumentTypeError(f"需要非负整数 (got {value})")
    return value


def _parse_args() -> argparse.Namespace:
    """Parse the command line; the module docstring doubles as the help epilog.

    ``--max-retries`` is validated as non-negative: a negative value made the
    retry loop in main() run zero times, so the script exited without ever
    running the agent.
    """
    p = argparse.ArgumentParser(
        description="跑 procedure-dsl 提取流程 (Claude Agent SDK, OAuth Max)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    p.add_argument("source", help="原始 post 文件 (input/case-N-raw.json; 也接受 .md/.txt)")
    p.add_argument("--extra-image", action="append", default=[],
                   help="额外配图: 本地路径 or http(s) URL. 加在 image_url_list "
                        "自动抽出的图后面. 可多次传.")
    p.add_argument("--out-dir", type=str, required=True,
                   help="输出工作目录名, 落在 outputs/ 下. e.g. --out-dir case-5-newflow → 实际 outputs/case-5-newflow/ . "
                        "Agent 所有产物 (understanding.md / workflow.json / case-X.html / .md) 都落在这. "
                        "case_id 自动从 basename 推 (e.g. --out-dir case-5-newflow → case_id='5-newflow'). "
                        "已带 outputs/ 前缀会被容忍 (不重复嵌套); 传绝对路径则原样用.")
    p.add_argument("--model", default="claude-sonnet-4-6",
                   help="Claude 模型名 (default: claude-sonnet-4-6)")
    p.add_argument("--workdir", default=None,
                   help=f"SDK cwd. 默认 {DSL_ROOT} (即本脚本所在 procedure-dsl/)")
    p.add_argument("--version", default=None, metavar="SUFFIX",
                   help="spec 版本: 跑 workdir 下的 spec-<SUFFIX>/ 目录. "
                        "e.g. --version backup → spec-backup/, --version test → spec-test/. "
                        "不传则默认 spec/.")
    p.add_argument("--max-turns", type=int, default=300,
                   help="Agent 最大回合数 (default: 300, 跑完整三阶段 + lint 留足余量)")
    p.add_argument("--resume", action="store_true",
                   help="恢复中断的 session: 从 outputs/case-N/.session_id 读 sid, "
                        "Agent 拿上次完整历史接着跑. 中断期间可手改任何中间产物, "
                        "Agent 会重新 Read 磁盘版本而不是凭记忆.")
    p.add_argument("--max-retries", type=_non_negative_int, default=3,
                   help="Stream idle / 临时错误自动 --resume 重试次数 (default: 3). "
                        "退避 10s/20s/40s, 不重试 RateLimit (exit 3) 和 Ctrl-C.")
    return p.parse_args()
def main() -> None:
    """CLI entry point: parse args, validate the spec dir, then run with auto-retry.

    Exit codes:
        0   success
        1   ClaudeSDKError or setup error (usually not transient; not retried)
        2   ResultMessage is_error=True (e.g. stream idle timeout). This is
            retried with --resume up to --max-retries times, backing off
            10s/20s/40s..., which stays within the 5-minute prompt-cache TTL
            for the first retries.
        3   RateLimitEvent: quota exhausted (not retried; wait for the 5h window)
        130 KeyboardInterrupt (user abort; not retried)
    """
    # Windows consoles default to cp1252/gbk, which cannot encode the Chinese
    # text and emoji in prompts / agent output (the first print would raise
    # UnicodeEncodeError), so switch both streams to UTF-8 up front.
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, "reconfigure"):
            stream.reconfigure(encoding="utf-8", errors="replace")
    logging.basicConfig(level=logging.WARNING)
    args = _parse_args()
    # Derived attribute: case_id from the --out-dir basename ("case-" stripped).
    # Used downstream by the prompt, the trace banner and type_suggestions.
    args.case_id = _derive_case_id(args.out_dir)
    # Spec directory: --version backup -> "spec-backup"; omitted -> "spec".
    args.spec_name = "spec" if not args.version else f"spec-{args.version}"

    max_retries = args.max_retries
    workdir_check = Path(args.workdir or DSL_ROOT).resolve()
    # Must be the same path run() writes: --out-dir lives under <workdir>/outputs/.
    # Resolving it directly against workdir skipped the outputs/ level, so this
    # check never found the file and every retry restarted from scratch.
    sid_file_check = _resolve_out_dir(args.out_dir, workdir_check) / ".session_id"

    # Fail fast if the spec directory selected by --version is missing, before
    # entering the retry loop.
    spec_dir_check = workdir_check / args.spec_name
    if not spec_dir_check.is_dir():
        avail = sorted(p.name for p in workdir_check.glob("spec*") if p.is_dir())
        print(
            f"❌ spec dir not found: {spec_dir_check}\n"
            f" (--version {args.version!r} → 期望目录 '{args.spec_name}/')\n"
            f" 可用版本: {avail}",
            file=sys.stderr,
        )
        sys.exit(1)

    def _report_interrupt() -> None:
        # The session id is persisted at init and written artifacts stay on
        # disk, so rerunning with --resume continues where this run stopped.
        print(
            f"\n⚠️ 中断 (Ctrl-C). {args.out_dir}/ 产物已保留.\n"
            f" 恢复: python run_procedure_dsl.py {args.source} "
            f"--out-dir {args.out_dir} --resume",
            file=sys.stderr,
        )

    for attempt in range(max_retries + 1):
        try:
            asyncio.run(run(args))
            return  # finished successfully
        except KeyboardInterrupt:
            _report_interrupt()
            sys.exit(130)
        except SystemExit as e:
            if isinstance(e.code, str):
                # sys.exit("msg") in run() relies on the interpreter to print
                # msg, but re-raising with an int code below would drop it.
                print(e.code, file=sys.stderr)
            code = e.code if isinstance(e.code, int) else (1 if e.code else 0)
            # exit 2 = is_error (stream timeout and similar transient failures): retry.
            if code == 2 and attempt < max_retries:
                wait_s = 10 * (2 ** attempt)  # 10s / 20s / 40s ...
                print(
                    f"\n⚠️ attempt {attempt+1}/{max_retries+1} exit={code} "
                    f"(可能 stream idle timeout). 等 {wait_s}s 后 --resume 重试...",
                    file=sys.stderr,
                )
                try:
                    time.sleep(wait_s)
                except KeyboardInterrupt:
                    # Ctrl-C while backing off: exit cleanly instead of a traceback.
                    _report_interrupt()
                    sys.exit(130)
                # Force --resume when init already persisted a session id.
                if sid_file_check.exists():
                    args.resume = True
                else:
                    print(f"⚠️ {sid_file_check} 不存在, fresh restart 而非 resume",
                          file=sys.stderr)
                continue
            # Not retried: 0 (success) / 1 (SDK error) / 3 (rate limit) / retries used up.
            sys.exit(code)
    # Safety net only: for max_retries >= 0 every iteration returns, exits or
    # continues, and the last attempt can never continue.
    print(f"\n❌ 用尽 {max_retries} 次重试.", file=sys.stderr)
    sys.exit(2)


if __name__ == "__main__":
    main()
|