| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899 |
- #!/usr/bin/env python3
- """
- run_cyber.py — run_procedure_dsl.py 的 Cyber Agent 移植版 (POC)。
- 与 run_procedure_dsl.py 的唯一区别是**执行引擎**:
- - run_procedure_dsl.py: Claude Agent SDK (ClaudeSDKClient) → 走 ~/.claude OAuth Max,
- 只能跑 Anthropic 协议端点。
- - run_cyber.py: 本仓库自研 AgentRunner (agent/core/runner.py) → 多 Provider。
- 本 POC 默认走 OpenRouter (create_openrouter_llm_call), 一个 provider 通打
- GPT / Gemini / Qwen / DeepSeek / Claude 全家, 换模型只改 --model 字符串。
- 复用 run_procedure_dsl.py 的:
- - 起手 prompt 全文 (_build_initial_blocks 的 text 块) —— 三阶段指令一字不改。
- - 图片抽取 (_images_from_source) + 客户端下载缓存 (_url_to_cached_path)。
- 图片块从 Anthropic base64 格式转成 OpenRouter 要的 OpenAI `image_url` data-URL 格式。
- 单 Agent 全程跑 (与 spec 对齐, 见 spec/tools.md §7): Phase 2 (归类标注) 由主 Agent
- 自己一趟做完, 不再分发 phase-2a/2b 子 Agent。presets_cyber.json 只保留 main preset;
- exclude_tools 关掉 agent/evaluate 两个分发工具, 防弱模型自作主张去 delegate。
- 用法 (与 run_procedure_dsl.py 对齐):
- python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber
- python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber \
- --model openai/gpt-4o
- # 中断后续跑 (从 outputs/<out-dir>/.trace_id 读 trace 接着跑):
- python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber --resume
- ⚠️ POC 已知缺口 (非 Claude 模型上需逐步调):
- - 起手 prompt 与 spec/ 里写的是 Claude 工具名 (Read/Write/Bash)。Cyber 实际工具是
- read_file/write_file/edit_file/bash_command。下方 _CYBER_RUNTIME_NOTE 给了映射, 但
- spec 文档内 `详见 Read(...)` 这类示例仍是 Claude 名 —— 弱模型可能被带偏, 需观察 trace。
- - edit_file 在非 Claude 模型上的 exact-match 命中率不如 Claude, workflow.json 反复 Edit
- 可能卡壳。先拿单 case smoke test, 别直接上量。
- """
- import argparse
- import asyncio
- import base64
- import httpx
- import importlib.util
- import json
- import logging
- import os
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List
- # run_cyber.py → procedure-dsl/
- DSL_ROOT = Path(__file__).resolve().parent
- def _find_repo_root(start: Path) -> Path:
- """向上找含 pyproject.toml 的目录 (cyber-agent 仓库根), 用于 sys.path 兜底。"""
- for p in [start, *start.parents]:
- if (p / "pyproject.toml").exists():
- return p
- return start
- REPO_ROOT = _find_repo_root(DSL_ROOT)
- for _p in (str(REPO_ROOT), str(DSL_ROOT)):
- if _p not in sys.path:
- sys.path.insert(0, _p)
- # 技能本地「计划」内置工具 (plan_procedures): import 即把它注册进全局工具表 (groups=["core"]),
- # 主 Agent 因 tool_groups 含 core 而能看到它。Phase 1 第一步让 LLM 调用它做 understanding +
- # 自动生成 workflow.json 骨架。run_cyber 仅做注册 + 注入原文上下文, 业务逻辑全在 plan_tool.py。
- import plan_tool # noqa: E402 (必须在 sys.path 设好之后)
- # 阶段文件 → 阶段标识 (监听 read_file 调用判断 agent 有没有读对应阶段规格)
- _PHASE_FILES = {"phase1-skeleton": "phase1", "phase2-normalize": "phase2", "phase3-finalize": "phase3"}
- def _phase_read_gaps(out_dir: Path, read_phase: set) -> List[str]:
- """检查 agent 有没有「做了某阶段的活却没读那个阶段的规格文件」(弱模型惯犯, 导致格式/规则全靠瞎猜)。
- 判「做了某阶段」: Phase 2 = 填了 effect/action/intent; Phase 3 = 出了 HTML。
- 做了却没 read_file 对应文件 → 报出来, 续跑时逼它先读再修。
- """
- gaps: List[str] = []
- wf = out_dir / "workflow.json"
- if not wf.exists():
- return gaps
- try:
- d = json.loads(wf.read_text(encoding="utf-8"))
- except Exception:
- return gaps
- did_p1 = any(p.get("steps") for p in d.get("procedures", []))
- did_p2 = any((s.get("effect") or s.get("action") or s.get("intent"))
- for p in d.get("procedures", []) for s in (p.get("steps") or []) if isinstance(s, dict))
- did_p3 = bool(list(out_dir.glob("*.html")))
- if did_p1 and "phase1" not in read_phase:
- gaps.append("你建了 workflow(Phase 1)却**没 read_file** spec/extraction/phase1-skeleton.md —— "
- "先读它(字段填法、@quote 用法、IO 校验要求), 再对照检查你的 Phase 1 输出。")
- if did_p2 and "phase2" not in read_phase:
- gaps.append("你做了 Phase 2(填了 effect/action/intent)却**没 read_file** "
- "spec/extraction/phase2-normalize.md —— 先读它(尤其 §5 intent 的 {in-type:}/{out-type:} "
- "标记格式、词表归类规则), 再对照修正你的 Phase 2 输出。")
- if did_p3 and "phase3" not in read_phase:
- gaps.append("你做了 Phase 3(出了 HTML)却**没 read_file** "
- "spec/extraction/phase3-finalize.md —— 先读它的收尾检查清单再确认。")
- return gaps
- def _completion_gaps(out_dir: Path) -> List[str]:
- """跑完后查 workflow 是否真做完了 (防弱模型吐空消息提前自停)。返回未完成项清单, 空=完成。
- 判据: ① 每步填了 effect+action(Phase 2 归类完); ② intent 用了 {in-type:}/{out-type:} 标记格式
- (Phase 2 §5, 渲染成彩色类型胶囊); ③ 出了 HTML(Phase 3 渲染完)。
- """
- gaps: List[str] = []
- wf = out_dir / "workflow.json"
- if not wf.exists():
- return ["workflow.json 还没建 (Phase 1 没做完)"]
- try:
- d = json.loads(wf.read_text(encoding="utf-8"))
- except Exception:
- return ["workflow.json 不是合法 JSON (修好再继续)"]
- _MARKERS = ("in-type:", "out-type:", "act:", "via:", "effect:")
- missing, intent_bad = [], []
- for p in d.get("procedures", []):
- for s in p.get("steps", []):
- if not isinstance(s, dict) or s.get("kind") == "block":
- continue # 控制块不要求 effect/action
- sid = f"{p.get('id')}.{s.get('id')}"
- if not (s.get("effect") or "").strip() or not (s.get("action") or "").strip():
- missing.append(sid)
- # intent 标记格式: 有 IO 的步, intent 必须带 {in-type:}/{out-type:} 等标记
- has_io = bool(s.get("inputs") or s.get("outputs"))
- intent = (s.get("intent") or "").strip()
- if has_io and (not intent or "{" not in intent or not any(m in intent for m in _MARKERS)):
- intent_bad.append(sid)
- if missing:
- gaps.append(f"{len(missing)} 个步骤缺 effect/action (Phase 2 没做完): "
- f"{', '.join(missing[:8])}{' …' if len(missing) > 8 else ''}")
- if intent_bad:
- gaps.append(f"{len(intent_bad)} 个步骤的 intent 没用标记格式 (Phase 2 §5: 写成带 "
- f"{{in-type:X}}/{{out-type:Y}}/{{act:Z}} 的句子, 如 `{{act:元素生成}}从{{in-type:提示词}}得到{{out-type:场景图}}`): "
- f"{', '.join(intent_bad[:8])}{' …' if len(intent_bad) > 8 else ''}")
- if not list(out_dir.glob("*.html")):
- gaps.append("还没渲染出 HTML (Phase 3 没做: 跑 render-case.py)")
- return gaps
- def _load_env() -> None:
- """加载仓库根 .env 到环境变量。
- 各 provider 的 create_*_llm_call 直接 os.getenv 读 key / base_url
- (OPEN_ROUTER_API_KEY / QWEN_API_KEY / QWEN_BASE_URL 等), 但本脚本绕过
- agent.client (那里才 load_dotenv), 故在此显式加载, 否则 .env 里的配置读不到。
- override=False: 已在 shell 里 export 的值优先, 不被 .env 覆盖。
- """
- try:
- from dotenv import load_dotenv
- except ImportError:
- return
- env_file = REPO_ROOT / ".env"
- if env_file.exists():
- load_dotenv(env_file, override=False)
- def _load_sibling_module(name: str, path: Path):
- """按文件路径 import 同目录脚本 (run_procedure_dsl.py 不是包, 用 spec 加载)。"""
- spec = importlib.util.spec_from_file_location(name, path)
- mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- return mod
- # 复用 run_procedure_dsl.py 的纯函数 (它 main() 有 __main__ 守卫, import 无副作用)。
- _rpd = _load_sibling_module("run_procedure_dsl", DSL_ROOT / "run_procedure_dsl.py")
- _build_initial_blocks = _rpd._build_initial_blocks
- _images_from_source = _rpd._images_from_source
- _url_to_cached_path = _rpd._url_to_cached_path
- _derive_case_id = _rpd._derive_case_id
- _resolve_out_dir = _rpd._resolve_out_dir
- _MEDIA_TYPE = _rpd._MEDIA_TYPE
- # 极简引导: 只告诉主 Agent「你是 Cyber Agent + 工具名 + 去读 README 的运行时约定节」。
- # 其余所有运行时规则都搬进了 spec/README.md 的「🛠 运行时约定」节(agent 本来就先读 README)。
- _CYBER_RUNTIME_NOTE = """
- ⚠️ 你的执行引擎是 **Cyber Agent**(不是 Claude Code):可用工具是 `read_file` / `write_file` / `edit_file` / `bash_command` / `glob_files` / `grep_content` / `read_images`(**不是** Read/Write/Edit/Bash/Glob/Grep)。
- **第一步就 read_file `spec/README.md`**(在你 cwd `procedure-dsl/` 下),然后**严格照它的「🛠 运行时约定」节操作** —— 尤其『仅 Cyber Agent 引擎』那部分(`bash_command` 是 cmd.exe、`goal` 用纯数字 focus、spec 相对链接补 `spec/` 前缀、read_file 会截断长文本)。「怎么建 workflow.json / 怎么用 @quote 填 value / patch 路径怎么写」全以 README 那节 + 各阶段文件为准, 不要自行发挥。
- """
- def _downscale_image(raw: bytes, max_dim: int, quality: int) -> tuple:
- """把图片下采样 + 重压缩成 JPEG, 返回 (bytes, mime)。
- 为什么必须做: 实测 case 的 14 张原图 base64 合计 ~12MB, 直接发会把
- OpenRouter→Claude 的上游流打断 (api_error: internal stream ended unexpectedly);
- 减到 ~3MB 内就稳。同时大幅省 input token (PNG 截图 base64 极占 token)。
- 策略: 最长边 > max_dim 才缩放 (保持比例); 一律转 JPEG (PNG 截图转 JPEG 体积降一个量级);
- 有透明通道的拍平到白底 (截图场景安全)。max_dim<=0 表示关闭, 原样返回。
- PIL 不可用或处理失败 → 原样返回 (降级不阻塞)。
- """
- if max_dim <= 0:
- return raw, "image/jpeg"
- try:
- import io
- from PIL import Image
- im = Image.open(io.BytesIO(raw))
- if im.mode in ("RGBA", "LA", "P"):
- bg = Image.new("RGB", im.size, (255, 255, 255))
- im = im.convert("RGBA")
- bg.paste(im, mask=im.split()[-1])
- im = bg
- else:
- im = im.convert("RGB")
- w, h = im.size
- if max(w, h) > max_dim:
- scale = max_dim / max(w, h)
- im = im.resize((max(1, int(w * scale)), max(1, int(h * scale))), Image.LANCZOS)
- out = io.BytesIO()
- im.save(out, format="JPEG", quality=quality, optimize=True)
- return out.getvalue(), "image/jpeg"
- except Exception as e:
- print(f"[image] downscale 失败, 用原图: {type(e).__name__}: {e}", flush=True)
- return raw, "image/png"
- def _to_openai_content(text: str, images: List[str],
- max_dim: int = 1280, quality: int = 85) -> List[Dict[str, Any]]:
- """把 (text, 图URL列表) 拼成 OpenAI 格式的 content blocks (OpenRouter / 各家通吃)。
- - 文本块: {"type": "text", "text": ...}
- - 图片块: {"type": "image_url", "image_url": {"url": "data:<mime>;base64,<...>"}}
- URL 先经 run_procedure_dsl._url_to_cached_path 客户端下载缓存 (绕图床 robots.txt)。
- 每张图经 _downscale_image 下采样+转 JPEG (max_dim<=0 关闭), 防大 payload 打断上游流。
- 单张图失败不阻塞整批。
- """
- blocks: List[Dict[str, Any]] = [{"type": "text", "text": text}]
- n_ok, n_fail = 0, 0
- bytes_before, bytes_after = 0, 0
- for ref in images:
- try:
- if ref.startswith(("http://", "https://")):
- local = _url_to_cached_path(ref)
- else:
- local = Path(ref).expanduser().resolve()
- if not local.exists():
- raise FileNotFoundError(ref)
- raw = local.read_bytes()
- small, mime = _downscale_image(raw, max_dim, quality)
- bytes_before += len(raw)
- bytes_after += len(small)
- data = base64.standard_b64encode(small).decode()
- blocks.append({
- "type": "image_url",
- "image_url": {"url": f"data:{mime};base64,{data}"},
- })
- n_ok += 1
- except Exception as e:
- n_fail += 1
- print(f"[image] skip {ref[:80]}... ({type(e).__name__}: {e})", flush=True)
- if n_ok and max_dim > 0:
- print(f"[image] 下采样: {bytes_before//1024}KB → {bytes_after//1024}KB "
- f"(max_dim={max_dim}, q={quality})", flush=True)
- if images:
- print(f"[image] {n_ok}/{len(images)} 成功 base64 化, {n_fail} 失败已跳过", flush=True)
- return blocks
- # ──── 执行前预 OCR: 每张配图 → 文本 (供 quote-source --ocr 搜) ──────────────────
- # 截图教程的 prompt / JSON / 参数常只在图里, body_text 抽不到。执行前 OCR 成文本,
- # 让 LLM 也能从图片内容里 quote 出真实 value/directive。按图字节 hash 缓存, 不重复花钱。
- def _ocr_one(raw: bytes, model: str, api_key: str,
- max_dim: int = 2000, quality: int = 90) -> str:
- """单张图 → OCR 文本 (OpenRouter 视觉调用)。失败抛异常由上层兜。"""
- small, mime = _downscale_image(raw, max_dim, quality)
- data = base64.standard_b64encode(small).decode()
- instr = ("请把这张图片里的所有文字逐字提取出来, 按从上到下、从左到右的阅读顺序输出。"
- "只输出图中文字本身, 不要翻译、不要解释、不要添加任何说明。"
- "图中若有代码/JSON/提示词, 请保留其原始换行与格式。若图中无文字, 输出空。")
- payload = {
- "model": model,
- "messages": [{"role": "user", "content": [
- {"type": "text", "text": instr},
- {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{data}"}},
- ]}],
- }
- r = httpx.post("https://openrouter.ai/api/v1/chat/completions",
- headers={"Authorization": f"Bearer {api_key}"}, json=payload, timeout=120)
- r.raise_for_status()
- j = r.json()
- return (j.get("choices") or [{}])[0].get("message", {}).get("content", "") or ""
- def _ocr_images(refs: List[str], model: str, api_key: str, cache_dir: Path) -> str:
- """对每张图 OCR, 合并成带分段标记的文本。按图字节 hash 缓存, 单张失败跳过不阻塞。"""
- import hashlib
- cache_dir.mkdir(exist_ok=True)
- out, n_ok = [], 0
- for n, ref in enumerate(refs, 1):
- try:
- if ref.startswith(("http://", "https://")):
- local = _url_to_cached_path(ref)
- else:
- local = Path(ref).expanduser().resolve()
- raw = local.read_bytes()
- h = hashlib.sha256(raw).hexdigest()[:24]
- cf = cache_dir / f"{h}.txt"
- if cf.exists():
- txt, tag = cf.read_text(encoding="utf-8"), " (cache)"
- else:
- txt, tag = _ocr_one(raw, model, api_key), ""
- cf.write_text(txt, encoding="utf-8")
- out.append(f"\n===== [图 {n}] 来源: {ref[:90]} =====\n{txt.strip()}\n")
- n_ok += 1
- print(f"[ocr] 图 {n}/{len(refs)}: {len(txt.strip())} 字{tag}", flush=True)
- except Exception as e:
- print(f"[ocr] 图 {n}/{len(refs)} 失败跳过: {type(e).__name__}: {e}", flush=True)
- print(f"[ocr] {n_ok}/{len(refs)} 张成功", flush=True)
- return "".join(out)
- def _trace_append(trace_path: Path, chunk: str) -> None:
- with trace_path.open("a", encoding="utf-8") as f:
- f.write(chunk)
- def _content_to_text(content: Any) -> str:
- """把 Message.content 归一成纯文本。
- 不同 provider 的 content 形态不一:
- - str: 直接用 (OpenRouter / 多数情况)。
- - list[block]: OpenAI/Qwen 多模态格式 [{"type":"text","text":...}, ...],
- 抽出各块的 text 字段拼起来。
- - dict: 单个 block, 取其 text / content 字段, 取不到就 str() 兜底。
- 切片 (content[:2000]) 前必须先过这里, 否则对 dict/list 切片会抛 KeyError/TypeError。
- """
- if isinstance(content, str):
- return content
- if isinstance(content, list):
- parts = []
- for b in content:
- if isinstance(b, str):
- parts.append(b)
- elif isinstance(b, dict):
- parts.append(b.get("text") or b.get("content") or "")
- return "".join(parts)
- if isinstance(content, dict):
- # Qwen/DeepSeek assistant: text 可能为空, 真正的话在 reasoning_content。
- # tool 结果消息: 内容在 "result" 键 (之前漏读它, 导致 [tool result] 控制台全空白)。
- # 都取不到就返回 "" (不要 str(content) 把整个 dict 当文本 dump 出来)。
- return (content.get("text") or content.get("reasoning_content")
- or content.get("result") or content.get("content") or "")
- return str(content)
- # 两段式 (--phase1-model): Pass 1 只做 Phase 1, 然后换模型 resume 做 Phase 2+。
- _PHASE1_STOP_NOTE = """
- ## ⏸️ 本段任务: 先完成 Phase 1, 然后**暂停等待下一步指示**
- 这是一个**分阶段协作**的任务, 你负责的是**第一阶段**。本段请专注做完 Phase 1:
- - Phase 1.1 心智模型 → 写 understanding.md
- - Phase 1.2 workflow.json 骨架 (procedures/steps/IO 结构 + name/purpose/declarations)
- - Phase 1.3 anchor 闭合 (IO 引用)
- 完成 Phase 1.3 后, 请**暂停**: 用一句话报告产出, **本轮不再发任何工具调用**, 等待后续指示来推进 Phase 2/3。
- (注意: 这**不是禁止** Phase 2, 只是分工上**这一段先到 Phase 1 为止**; 后续会有新指示让你或另一协作者继续。)
- 你的 Phase 1 产出质量直接决定后续阶段, 所以 understanding.md 和 workflow.json 骨架务必扎实、完整。
- """
- # ── 实验模式 note (--exp) ──────────────────────────────────────────────────
- # 实验只比 Phase 1 骨架质量, 两方案都"产出 workflow.json 骨架+anchor 后停", 不跑 Phase 2/3。
- # 方案 1 (direct): 强模型不写 understanding.md, 边想边直接出 workflow.json。
- _EXP_DIRECT_NOTE = """
- ## 🧪 实验模式 (direct): 不写 understanding.md, 直接产 workflow.json
- 本次**跳过 Phase 1.1 的 understanding.md 文件** (spec 里提到的这一步本次作废, 不要 Write 它)。
- 把"有几个独立工序、每个工序的步骤/IO/控制流"的分析**直接写在你的文字回复里**(简明扼要), 然后:
- - 直接 Write `workflow.json` 骨架 (Phase 1.2: procedures/steps/IO 结构 + name/purpose/declarations);
- - 用 wf-patch.py 加 anchor (Phase 1.3: IO 闭合)。
- 完成 anchor 闭合后**立即停止**, 一句话总结即可, **不要进入 Phase 2** (不填 effect/action/type/substance/form, 不分发子 Agent)。
- """
- # 方案 2 第一步 (split-A): 强模型只产 understanding.md, 不碰 workflow.json。精简读单。
- _EXP_UNDERSTANDING_ONLY_NOTE = """
- ## 🧪 实验模式 (split · 第一步): 只产 understanding.md
- 本次**只做 Phase 1.1**: 通读原文(含图)建立心智模型, 写进 understanding.md, 然后**立即停止**
- (**不要 Write workflow.json**, 不进 Phase 1.2+, **本轮不再发任何工具调用**)。
- ### ⚡ 精简读单 (覆盖上面起手指令里的完整清单 — 以本节为准)
- 本步**只读**这几样, 其余一律不读 (读了纯烧 context, 它们是给下游做 workflow.json 的步骤用的):
- - `spec/README.md` (已在起手读过, 别重读)
- - `spec/syntax.md` (DSL 概念: procedure/step/IO/effect/action — 让心智模型用对术语)
- - `spec/extraction/phase1-skeleton.md` (**多工序判断标准** — 怎么判定有几个独立工序)
- - 原文 case json (body_text + 元数据) + 本消息所附的图
- **明确不要读** (本步用不上):
- - `spec/tools.md` (脚本接口 — 本步不调任何脚本)
- - `spec/extraction/fields.md` (23 字段填法)
- - `spec/extraction/control-flow.md` (block/nested 的 JSON 建模)
- - `spec/format/md-structure.md` (.md 产物结构)
- ### understanding.md 要写到"能让另一个模型照着填出 workflow.json"的程度
- - 有几个独立工序 (按 phase1-skeleton 判断标准), 每个: 工序名 + 终态产物 + 大致步骤数 + 工艺类型;
- - 每个工序的步骤序列, 每步的输入/输出 (是什么数据、从哪来、到哪去);
- - **控制流用大白话讲清** (哪步是循环/并行/分支、循环什么、并行几路) —— 你不必读 control-flow.md,
- 文字描述即可, 下游模型据此建 block/nested。
- 后续由另一个模型读你的 understanding.md + JSON schema 生成 workflow.json。
- """
- # 方案 2 第二步 (split-B): 全新一段、不给图, 弱模型只凭 understanding.md + schema 产 workflow.json。
- _EXP_WORKFLOW_FROM_UNDERSTANDING_NOTE = """
- ## 🧪 实验模式 (split · 第二步): 据 understanding.md 产 workflow.json (本次不附原图)
- Phase 1.1 的心智模型已由**另一个(更强的)模型**写好 —— 就是上面"输出目录"里的 **understanding.md**。
- 本次你的任务是 **Phase 1.2 + 1.3**, 且**只依据 understanding.md + schema**(本消息不附原图, 你看不到截图):
- 1. read_file 输出目录里的 `understanding.md`, 以及 spec 的 `format/case-data.schema.json`;
- 2. 按 understanding.md 的工序划分, Write `workflow.json` 骨架 (procedures/steps/IO 结构 + name/purpose/declarations);
- 3. 用 bash_command 跑 wf-patch.py 加 anchor (IO 闭合, 单条命令不要拼 `;`)。
- 完成 anchor 后**立即停止**, **不要进入 Phase 2**, 也**不要重写 understanding.md**。
- """
- async def run(args: argparse.Namespace) -> int:
- from agent.core.runner import AgentRunner, RunConfig, KnowledgeConfig
- from agent.core.presets import load_presets_from_json
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_openrouter_llm_call, create_qwen_llm_call
- # provider 选择: 决定 llm_call 走哪家端点。
- # openrouter → OPEN_ROUTER_API_KEY, 一个 URL 通打各家 (model 形如 qwen/qwen-max)。
- # qwen → QWEN_API_KEY + QWEN_BASE_URL (.env), 阿里 dashscope 原生
- # (model 形如 qwen-plus / qwen-max, 无 "qwen/" 前缀)。
- if args.provider == "qwen":
- make_llm_call = lambda: create_qwen_llm_call(model=args.model)
- else:
- make_llm_call = lambda: create_openrouter_llm_call(model=args.model)
- workdir = DSL_ROOT
- source_path = Path(args.source).expanduser().resolve()
- if not source_path.exists():
- print(f"❌ source not found: {source_path}", file=sys.stderr)
- return 1
- out_dir = _resolve_out_dir(args.out_dir, workdir)
- out_dir.mkdir(parents=True, exist_ok=True)
- (out_dir / "_scratch").mkdir(exist_ok=True)
- trace_id_file = out_dir / ".trace_id"
- trace_path = out_dir / "_trace_cyber.md"
- # 注册 main preset (单 Agent; phase-2a/2b 子 Agent 已废弃, 见 spec/tools.md §7)。
- presets_json = DSL_ROOT / "presets_cyber.json"
- if presets_json.exists():
- load_presets_from_json(str(presets_json))
- else:
- print(f"⚠️ 缺少 {presets_json}, 用 runner 默认 main preset", file=sys.stderr)
- # source 路径给 Agent (workdir 相对优先)。
- try:
- source_for_agent = source_path.relative_to(workdir).as_posix()
- except ValueError:
- source_for_agent = str(source_path)
- # resume: 读已存 trace_id, 只发增量 "接着做" 消息。
- resume_tid = None
- if args.resume:
- if not trace_id_file.exists():
- print(f"❌ --resume 但无 {trace_id_file}; 先正常跑一次", file=sys.stderr)
- return 1
- resume_tid = trace_id_file.read_text(encoding="utf-8").strip() or None
- images = _images_from_source(source_path) + (args.extra_image or [])
- # 执行前预 OCR: 把每张配图的文字提取成文本, 落 _scratch/ocr.txt, 供 quote-source --ocr 搜。
- # 只在 fresh run 做 (resume 时上次的 ocr.txt 还在); 按图字节 hash 缓存, 重跑不重复花钱。
- ocr_path = out_dir / "_scratch" / "ocr.txt"
- if not resume_tid and not getattr(args, "no_ocr", False) and images:
- api_key = os.getenv("OPEN_ROUTER_API_KEY")
- if not api_key:
- print("[ocr] 跳过: 未设 OPEN_ROUTER_API_KEY", flush=True)
- else:
- print(f"[ocr] 对 {len(images)} 张配图预 OCR (model={args.ocr_model}) ...", flush=True)
- ocr_text = _ocr_images(images, args.ocr_model, api_key, DSL_ROOT / ".ocr_cache")
- if ocr_text.strip():
- ocr_path.write_text(ocr_text, encoding="utf-8")
- print(f"[ocr] -> {ocr_path} (共 {len(ocr_text)} 字)", flush=True)
- spec_name = "spec" if not getattr(args, "spec_version", None) else f"spec-{args.spec_version}"
- if resume_tid and getattr(args, "_phase2_handoff", False):
- # 两段式 Pass 2: Phase 1 已由另一模型做完, 从 Phase 2 开始。
- # ⚠️ 必须强硬作废历史里 Pass1 的"只做 Phase1 就停"指令, 否则弱模型会跟着旧指令
- # 重做 Phase1 再停 (实测 gemini-flash-lite 就这么干了)。
- cd = out_dir.as_posix()
- msgs = [{"role": "user", "content": (
- f"【阶段交接 — 之前的指令已变更, 请严格按本条执行】\n\n"
- f"Phase 1 (understanding.md + workflow.json 骨架 + anchor 闭合) **已经全部完成并落盘**, 是上一个模型做的。\n\n"
- f"⚠️ 历史里那条『本次只做 Phase 1、做完即停、不要进 Phase 2』的指令**现已作废**。"
- f"你现在的唯一任务是完成 **Phase 2 和 Phase 3**。\n\n"
- f"❌ **绝对不要**重写 / 重新生成 understanding.md 或 workflow.json 的骨架 —— 它们已经做好了, 重做即错误。\n"
- f"✅ 现在立刻执行 (用 bash_command, 单条命令不要拼 `;`):\n"
- f" 1. read_file `{cd}/workflow.json` 看当前骨架 (不要凭记忆, 也不要重写它)。\n"
- f" 2. 读 spec/extraction/phase2-normalize.md, 由你**自己一趟做完 Phase 2**: 作用/动作/类型 对词表、"
- f"实质/形式 直接提炼元素点 (不查词表)、每步填 intent。**不要**切任务 / 分发子 Agent。\n"
- f" 3. 用 wf-patch.py (bash_command, --set 或 --patch) 回填 effect/action/type/substance/form/intent 到 workflow.json。\n"
- f" 4. Phase 3: 跑 lint-case.py 校验, 再 render-case.py 出 HTML (.html 是唯一产物, .md 已取消)。"
- )}]
- elif resume_tid:
- msgs = [{"role": "user", "content": (
- f"上次中断了, 接续做 case-{args.case_id} 的提取流程。\n"
- f"先用 bash_command `ls` 看 {out_dir.as_posix()}/ 当前已落盘哪些产物, "
- f"再 read_file 这些**当前磁盘版本** (understanding.md / workflow.json) 接着跑, "
- f"不要凭记忆。Phase 2 由你自己一趟做完 (wf-patch.py 落盘, 不分发子 Agent)。"
- )}]
- else:
- # 复用原脚本的起手 prompt 全文 (取 text 块), 再补 Cyber 运行时说明。
- anth_blocks = _build_initial_blocks(
- source_for_agent, args.case_id, args.out_dir, images, workdir, spec_name
- )
- base_text = anth_blocks[0]["text"] + _CYBER_RUNTIME_NOTE
- if ocr_path.exists():
- base_text += (
- f"\n\n## 🖼️ 配图已 OCR 成文本\n"
- f"原文配图的文字已 OCR 提取到 `{ocr_path.as_posix()}`。"
- f"填 value/directive 需要图里的文字(prompt/JSON/参数常只在图中)时, "
- f"用 `python spec/tools/quote-source.py --source {source_for_agent} --query \"<短语>\" --ocr {ocr_path.as_posix()}` "
- f"一并搜原文+图片 (quote-source 读全文件, 不受 read_file 截断影响; 别用 read_file 通读大 ocr.txt)。"
- )
- # 内联完整正文: read_file 会把 body_text 这种超长单行砍在 2000 字 (弱模型常不续 char_offset
- # → 正文后半段静默丢失)。把完整 body_text 直接附进 prompt, agent 理解正文以这份为准。
- try:
- _sd = json.loads(source_path.read_text(encoding="utf-8"))
- _body = _sd.get("body_text") if isinstance(_sd, dict) else None
- except Exception:
- _body = None
- if _body:
- base_text += (
- f"\n\n## 📄 原文正文 (完整版, 已内联 — 别再 read_file 原文取正文)\n"
- f"⚠️ read_file 读 `{source_for_agent}` 会把 body_text 这一长行砍在 2000 字 → 丢后半段。"
- f"理解正文、提取 value/directive **以下面这份完整正文为准**; read_file 原文文件只为取 "
- f"title/link/publish_timestamp 等短字段。\n\n```\n{_body}\n```"
- )
- # 给「计划」+「IO 校验」工具注入原文 + 配图 OCR + 输出目录 + source 元信息。
- _sd2 = _sd if isinstance(_sd, dict) else {}
- _ocr_text = ocr_path.read_text(encoding="utf-8") if ocr_path.exists() else ""
- _lint_ocr = f" --ocr {ocr_path.as_posix()}" if ocr_path.exists() else "" # verify-io / lint 共用
- plan_tool.set_plan_context(
- body_text=_body or "",
- ocr=_ocr_text,
- out_dir=out_dir,
- case_id=args.case_id,
- source={
- "platform": "", # LLM/后续可补
- "author": _sd2.get("channel_account_name", ""),
- "url": _sd2.get("link", ""),
- "title": _sd2.get("title", ""),
- "date": str(_sd2.get("publish_timestamp", "") or ""),
- "excerpt": (_body or "")[:120],
- },
- )
- base_text += (
- f"\n\n## 🧭 第一步(必做): 调用 plan_procedures 工具做计划\n"
- f"在动手建 workflow.json 前, **先调用一次 `plan_procedures` 工具**, 交上你的工序计划: "
- f"把这篇拆成几个工序、每个工序的步骤逐条展开(工具·输入·动作·输出四要素)、并声明每个工序"
- f"覆盖原文哪些 `0N` 章节(source_sections)。工具会校验: 有章节没被任何工序认领 → 报错让你补; "
- f"通过后**自动据计划生成 workflow.json 骨架**(工序/步骤/顺序锁定)。之后你只在骨架上用 "
- f"wf-patch 填 value/directive/anchor, **不要再 write_file 重写 workflow.json、也不增删工序/步骤**。\n"
- f"填完 value/directive/anchor(这是 **Phase 2.0 填内容**)后, **必须跑一次 IO 校验脚本**(照抄):\n"
- f"```bash\npython spec/tools/verify-io.py --workflow {out_dir.as_posix()}/workflow.json "
- f"--source {source_for_agent}{_lint_ocr}\n```\n"
- f"它校验每个文本 IO 的 value 是否逐字、**每个生成步有没有 type=提示词 的输入**(提示词是数据不是 directive)、"
- f"提示词 value 是否完整不截断、declarations 是否补全; "
- f"报 ✗ 就修(此时可重读原文, 提示词用 @quote 提**全**别缩写/截断)再跑, 校验通过才进 **Phase 2.1 归类标注**(effect/action/type/substance/form/intent)。"
- )
- # Phase 3 lint 必须带 --source(+--ocr)才会跑「章节覆盖」+「value 逐字」两条结构/值强制;
- # 这里钉死精确路径, 免得 agent 用错文件名导致校验静默跳过。(_lint_ocr 已在上面定义)
- base_text += (
- f"\n\n## ✅ Phase 3 lint 命令(照抄, 别省 --source)\n"
- f"```bash\npython spec/tools/lint-case.py --workflow {out_dir.as_posix()}/workflow.json "
- f"--case-id {args.case_id} --source {source_for_agent}{_lint_ocr}\n```\n"
- f"带 `--source` 才会查**章节覆盖**(原文每个 `0N` 章节都要落进某工序/步骤, 别整段漏抽)"
- f"和 **value 逐字**(文本类 value 要是原文一整段连续文本, 别抄开头后缩写)。"
- f"报「章节疑似漏抽」回 Phase 1 补工序; 报「value 疑似缩写」回 Phase 2.0 用 `@quote` 重填。"
- )
- imgs_for_prompt = images
- if getattr(args, "_exp_direct", False):
- base_text += _EXP_DIRECT_NOTE # 方案1: 不写 understanding, 直接 workflow.json
- elif getattr(args, "_exp_understanding_only", False):
- base_text += _EXP_UNDERSTANDING_ONLY_NOTE # 方案2 第一步: 只产 understanding
- elif getattr(args, "_exp_workflow_from_understanding", False):
- base_text += _EXP_WORKFLOW_FROM_UNDERSTANDING_NOTE # 方案2 第二步: 据 understanding 产 workflow
- imgs_for_prompt = [] # 不给图, 纯凭 understanding.md + schema
- elif getattr(args, "phase1_only", False):
- base_text += _PHASE1_STOP_NOTE # 两段式 Pass 1: 做完整 Phase 1 即停
- msgs = [{"role": "user", "content": _to_openai_content(
- base_text, imgs_for_prompt, max_dim=args.max_image_dim, quality=args.image_quality)}]
- cfg = RunConfig(
- model=args.model,
- temperature=0.3,
- max_iterations=args.max_turns,
- agent_type="main",
- name=f"procedure-dsl case-{args.case_id} (cyber)",
- tool_groups=["core", "system"], # core=read/write/edit/glob/grep (agent 工具下面 exclude 掉);
- # system=bash_command
- # ⚠️ 没 system 组 → 主 Agent 无 bash, 跑不了 spec/tools/*.py
- # (wf-patch / lint-case / render-case 全靠 bash)
- parallel_tool_execution=True, # 允许同轮并行工具调用 (如多个 read_file); 子 Agent 分发已废弃
- context_injection_interval=0, # 关掉周期性自动注入 get_current_context: procedure-dsl 单 Agent
- # 不用 goal/协作者/IM, 那些注入只是给弱模型添乱 + 烧 token
- enable_prompt_caching=False, # 非 Claude 模型无效, 关掉省得干扰
- # 关掉 goal 压缩: 它会在 goal 完成后把详细消息压成 [[SUMMARY]], 而弱模型 (如
- # gemini-flash-lite) 一丢细节就倾向"推倒重做 Phase 1", 覆盖掉已完成的 Phase 2
- # 归一化数据。单 case 运行上下文有限, 保留全量更安全。
- goal_compression="none",
- # 关掉知识沉淀: 否则任务结束会被自动注入"复盘→knowledge_save_pending"prompt
- # (上次 Claude 在 seq6 被它带跑偏、qwen 浪费 turn51-52)。procedure-dsl 不需要它。
- knowledge=KnowledgeConfig(
- enable_extraction=False, # 压缩时不反思
- enable_completion_extraction=False, # 结束后不复盘 (核心: 去掉那段收尾 prompt)
- enable_injection=False, # focus goal 时不注入知识
- ),
- exclude_tools=["knowledge_save_pending", "agent", "evaluate"], # 去知识沉淀 + 子 Agent 分发工具 (agent/evaluate): 单 Agent 全程
- trace_id=resume_tid,
- )
- print(f"[setup] engine = Cyber AgentRunner")
- print(f"[setup] provider = {args.provider}")
- print(f"[setup] model = {args.model}")
- print(f"[setup] source = {source_path}")
- print(f"[setup] case_id = {args.case_id}")
- print(f"[setup] out_dir = {out_dir}")
- print(f"[setup] images = {len(images)}")
- print(f"[setup] max_iter = {args.max_turns}")
- print(f"[setup] resume = {resume_tid[:8] + '...' if resume_tid else 'no'}")
- print(flush=True)
- now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- _trace_append(trace_path, f"\n\n---\n\n## ▶ {'Resume' if resume_tid else 'Fresh'} @ {now}\n"
- f"- model: `{args.model}` · case: `{args.case_id}` · images: `{len(images)}`\n")
- # ⚠️ trace store 必须放**短路径**(仓库根 .trace), 不能放 out_dir/.trace。
- # 原因 (Windows MAX_PATH=260): 子 Agent 的 trace_id 是 <父UUID>@delegate-<时间戳>-NNN,
- # 消息文件名还把整个 id 重复一次。若 base 是深层的 outputs/<case>/.trace,
- # 子 agent 消息文件路径会到 ~285 字符 > 260, 落盘报 [Errno 2] 子 Agent 直接失败。
- # 放仓库根 .trace 后同样路径 ~204 < 260。各 case 的 trace 按 trace_id 区分, 不冲突。
- trace_store_base = REPO_ROOT / ".trace"
- runner = AgentRunner(
- llm_call=make_llm_call(),
- trace_store=FileSystemTraceStore(base_path=str(trace_store_base)),
- debug=True, # subagent.py 据此打印子 Agent (phase-2a/2b) 的实时执行过程,
- # 否则子 Agent 全程静默, 只有最后 delegate 汇总可见。
- )
- turn = 0
- t0 = time.time()
- status = "unknown"
- # token / 成本累计 (主 trace; 子 Agent 的 token 在各自子 trace, 不计入此处)。
- usage = {"in": 0, "out": 0, "cache_w": 0, "cache_r": 0, "cost": 0.0}
- # 完成度兜底: 一轮跑完若 workflow 没填全/没出 HTML(弱模型常吐空消息提前自停),
- # 带「还差哪些」的具体清单**续同一条 trace** 再跑, 直到完成或达上限。
- # 实验/两段式模式(phase1_only/_exp_*)是故意中途停的, 不兜底。
- _exp_mode = (getattr(args, "phase1_only", False) or getattr(args, "_exp_direct", False)
- or getattr(args, "_exp_understanding_only", False)
- or getattr(args, "_exp_workflow_from_understanding", False))
- max_auto = 0 if _exp_mode else getattr(args, "max_auto_continue", 2)
- run_msgs = msgs
- cur_trace = resume_tid
- attempt = 0
- read_phase: set = set() # agent 读过哪些阶段规格文件 (监听 read_file 累计)
- try:
- while True:
- async for item in runner.run(messages=run_msgs, config=cfg):
- if isinstance(item, Trace):
- status = item.status
- if item.trace_id:
- cur_trace = item.trace_id
- trace_id_file.write_text(item.trace_id, encoding="utf-8")
- print(f"[trace] {item.trace_id} status={item.status}", flush=True)
- elif isinstance(item, Message):
- role = getattr(item, "role", "?")
- raw_content = getattr(item, "content", "") or ""
- tool_calls = getattr(item, "tool_calls", None)
- # Qwen 原生: 整条消息塞在 content dict 里, tool_calls 也嵌在其中,
- # item.tool_calls 属性反而是空 —— 从 content 兜底捞出来。
- if not tool_calls and isinstance(raw_content, dict):
- tool_calls = raw_content.get("tool_calls")
- content = _content_to_text(raw_content)
- # 累计 token/成本 (token 字段挂在 assistant 消息上; tool 消息为 None → or 0)
- usage["in"] += getattr(item, "prompt_tokens", 0) or 0
- usage["out"] += getattr(item, "completion_tokens", 0) or 0
- usage["cache_w"] += getattr(item, "cache_creation_tokens", 0) or 0
- usage["cache_r"] += getattr(item, "cache_read_tokens", 0) or 0
- usage["cost"] += getattr(item, "cost", 0.0) or 0.0
- if role == "assistant":
- turn += 1
- if content:
- print(f"\n[turn {turn} · text]\n{content}\n", flush=True)
- _trace_append(trace_path, f"\n### Turn {turn}\n> {content[:2000]}\n")
- for tc in (tool_calls or []):
- fn = (tc.get("function") or {}) if isinstance(tc, dict) else {}
- nm = fn.get("name", tc.get("name", "?") if isinstance(tc, dict) else "?")
- args_full = str(fn.get("arguments", ""))
- ar = args_full[:200]
- print(f"[turn {turn} · tool] {nm}({ar})", flush=True)
- _trace_append(trace_path, f"- `{nm}` — `{ar}`\n")
- # 监听阶段文件读取 (read_file 的 file_path 里命中阶段文件名)
- if nm == "read_file":
- for _key, _ph in _PHASE_FILES.items():
- if _key in args_full:
- read_phase.add(_ph)
- elif role == "tool":
- preview = str(content)[:300]
- print(f" ↳ [tool result] {preview}", flush=True)
- # 一轮跑完 → 查完成度 (阶段文件没读的排最前: 先读规则再修输出)
- gaps = _phase_read_gaps(out_dir, read_phase) + _completion_gaps(out_dir)
- if not gaps:
- break
- if attempt >= max_auto:
- if max_auto > 0:
- print(f"\n⚠️ 达自动续跑上限({max_auto})仍未完成: {'; '.join(gaps)}", flush=True)
- _trace_append(trace_path, f"\n### ⚠ 达续跑上限仍未完成: {'; '.join(gaps)}\n")
- break
- attempt += 1
- nudge = (
- "⚠️ 任务还没做完, 别停。当前还差(**按顺序处理**):\n"
- + "\n".join(f" - {g}" for g in gaps)
- + "\n**先 read_file 上面点名没读过的阶段规格文件**(里面写了格式/词表/检查规则), "
- "再据规则修后面的问题, **别重做已完成的部分**。提示: 缺 effect/action 的步骤用 "
- "wf-patch.py --set 补(action 要对到 action.json 的合法叶子, 如 `元素生成`/`提取/化学提取`, "
- "别拼 `图像生成/文生图` 这种不存在的; wf-patch 部分应用, 对的会留下、只补错的); "
- "intent 要写成带 {in-type:}/{out-type:}/{act:} 标记的句子; 没出 HTML 就跑 render-case.py。"
- )
- print(f"\n[auto-continue {attempt}/{max_auto}] 续跑补完: {'; '.join(gaps)}\n", flush=True)
- _trace_append(trace_path, f"\n### ↻ auto-continue {attempt}: {'; '.join(gaps)}\n")
- cfg.trace_id = cur_trace # 续同一条 trace (不重开)
- run_msgs = [{"role": "user", "content": nudge}]
- except KeyboardInterrupt:
- print(f"\n⚠️ 中断. {out_dir}/ 产物已保留. 续跑: --resume", file=sys.stderr)
- return 130
- except Exception as e:
- logging.exception("cyber run failed")
- print(f"❌ {type(e).__name__}: {e}", file=sys.stderr)
- return 1
- elapsed = time.time() - t0
- print(f"\n[done] status={status} turns={turn} wall={elapsed:.1f}s", flush=True)
- print(f"[usage] tokens in={usage['in']:,} out={usage['out']:,} "
- f"cache_w={usage['cache_w']:,} cache_r={usage['cache_r']:,} · cost=${usage['cost']:.4f} "
- f"(model={args.model}; 不含子 Agent)", flush=True)
- _trace_append(
- trace_path,
- f"\n### ◀ done · status={status} · turns={turn} · {elapsed:.1f}s\n"
- f"- tokens: in={usage['in']:,} out={usage['out']:,} "
- f"cache_w={usage['cache_w']:,} cache_r={usage['cache_r']:,} · cost=${usage['cost']:.4f}\n"
- )
- args._last_stats = dict(usage) # 供 main() 两段式汇总
- return 0 if status in ("completed", "unknown") else 2
- def _parse_args() -> argparse.Namespace:
- p = argparse.ArgumentParser(
- description="跑 procedure-dsl 提取流程 (Cyber AgentRunner + OpenRouter)",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog=__doc__,
- )
- p.add_argument("source", help="原始 post 文件 (input/case-N-raw.json)")
- p.add_argument("--out-dir", required=True,
- help="输出目录名, 落在 outputs/ 下. case_id 自动从 basename 推。")
- p.add_argument("--extra-image", action="append", default=[],
- help="额外配图 (本地路径 or URL), 可多次。")
- p.add_argument("--provider", default="openrouter", choices=["openrouter", "qwen"],
- help="LLM 端点: openrouter (默认, OPEN_ROUTER_API_KEY, 一个 URL 通打各家) "
- "或 qwen (阿里 dashscope 原生, 读 .env 的 QWEN_API_KEY + QWEN_BASE_URL)。")
- p.add_argument("--model", default="openai/gpt-4o",
- help="模型名。provider=openrouter 时形如 openai/gpt-4o / qwen/qwen-max / "
- "anthropic/claude-sonnet-4.5; provider=qwen 时形如 qwen-plus / qwen-max (无前缀)。")
- p.add_argument("--phase1-model", default=None,
- help="启用两段式: Phase 1 (心智模型+骨架+anchor) 用这个模型跑完即停, "
- "Phase 2+ 换 --model resume 续跑。不传=全程单模型。"
- "例: --phase1-model anthropic/claude-sonnet-4.6 --model google/gemini-3.1-flash-lite")
- p.add_argument("--phase1-provider", default=None, choices=["openrouter", "qwen"],
- help="Phase 1 段的 provider, 默认继承 --provider。")
- p.add_argument("--exp", default=None, choices=["direct", "split"],
- help="Phase 1 实验模式 (产出 workflow.json 骨架后即停, 不跑 Phase 2/3):\n"
- " direct = 强模型(--model)不写 understanding, 边想边直接出 workflow.json;\n"
- " split = 强模型(--phase1-model)只产 understanding → 弱模型(--model)据 understanding+schema 产 workflow.json。")
- p.add_argument("--spec-version", default=None, metavar="SUFFIX",
- help="用 spec-<SUFFIX>/ 目录而非默认 spec/ (实验变体, 不污染原 spec)。")
- p.add_argument("--max-turns", type=int, default=300, help="最大迭代轮数 (default: 300)")
- p.add_argument("--max-image-dim", type=int, default=1280,
- help="图片下采样最长边像素 (default: 1280, 0=关闭)。多张大图 base64 合计过大会"
- "打断 OpenRouter→Claude 上游流 (internal stream ended); 下采样+转JPEG 防此并省 token。")
- p.add_argument("--image-quality", type=int, default=85,
- help="下采样后 JPEG 质量 (default: 85)。截图含文字, 别压太低伤可读性。")
- p.add_argument("--resume", action="store_true",
- help="从 outputs/<out-dir>/.trace_id 读 trace 续跑")
- p.add_argument("--no-ocr", action="store_true",
- help="跳过执行前的配图预 OCR (默认开启: 每张图 OCR 成文本落 _scratch/ocr.txt, 供 quote-source --ocr 搜)")
- p.add_argument("--ocr-model", default="google/gemini-3.1-flash-lite",
- help="预 OCR 用的视觉模型 (default: google/gemini-3.1-flash-lite, 走 OpenRouter)")
- p.add_argument("--max-auto-continue", type=int, default=2,
- help="完成度兜底: 跑完若 workflow 没填全/没出 HTML(弱模型常吐空消息自停), "
- "自动带'还差X'续跑的最大次数 (default: 2, 0=关闭)")
- return p.parse_args()
- def main() -> None:
- for stream in (sys.stdout, sys.stderr):
- if hasattr(stream, "reconfigure"):
- stream.reconfigure(encoding="utf-8", errors="replace")
- logging.basicConfig(level=logging.WARNING)
- _load_env() # 把 .env (OPEN_ROUTER_API_KEY / QWEN_API_KEY / QWEN_BASE_URL) 载入环境
- args = _parse_args()
- args.case_id = _derive_case_id(args.out_dir)
- args.phase1_only = False
- args._phase2_handoff = False
- args._exp_direct = False
- args._exp_understanding_only = False
- args._exp_workflow_from_understanding = False
- args._last_stats = {}
- def _g(d, k):
- return d.get(k, 0) if d else 0
- # ── 实验模式 (--exp): 只产 workflow.json 骨架, 不跑 Phase 2/3 ──
- if args.exp == "direct":
- # 方案1: 强模型(--model)不写 understanding, 边想边直接出 workflow.json。
- print(f"\n{'='*64}\n [exp:direct] {args.provider}/{args.model} · 直接产 workflow.json\n{'='*64}", flush=True)
- args._exp_direct = True
- sys.exit(asyncio.run(run(args)))
- if args.exp == "split":
- # 方案2: 强模型(--phase1-model)只产 understanding → 弱模型(--model)据其产 workflow.json。
- if not args.phase1_model:
- print("❌ --exp split 需要 --phase1-model (强模型, 产 understanding)。", file=sys.stderr)
- sys.exit(2)
- main_provider, main_model = args.provider, args.model
- p1_provider = args.phase1_provider or args.provider
- print(f"\n{'='*64}\n [exp:split] A · understanding · {p1_provider}/{args.phase1_model}\n{'='*64}", flush=True)
- args.provider, args.model = p1_provider, args.phase1_model
- args.resume, args._exp_understanding_only = False, True
- rcA = asyncio.run(run(args)); statsA = dict(args._last_stats)
- if rcA != 0:
- print(f"❌ split-A (understanding) 退出码={rcA}, 不继续。", file=sys.stderr)
- sys.exit(rcA)
- print(f"\n{'='*64}\n [exp:split] B · workflow.json · {main_provider}/{main_model}\n{'='*64}", flush=True)
- args.provider, args.model = main_provider, main_model
- args.resume = False # 全新一段: 不继承强模型历史(含图), 只凭 understanding.md + schema
- args._exp_understanding_only, args._exp_workflow_from_understanding = False, True
- rcB = asyncio.run(run(args)); statsB = dict(args._last_stats)
- print(f"\n{'='*64}\n [exp:split] 成本汇总 (case {args.case_id})\n{'='*64}", flush=True)
- print(f" A understanding [{args.phase1_model}]: in={_g(statsA,'in'):,} out={_g(statsA,'out'):,} · ${_g(statsA,'cost'):.4f}")
- print(f" B workflow.json [{main_model}]: in={_g(statsB,'in'):,} out={_g(statsB,'out'):,} · ${_g(statsB,'cost'):.4f}")
- print(f" 合计: ${_g(statsA,'cost') + _g(statsB,'cost'):.4f}", flush=True)
- sys.exit(rcB)
- # 单模型: 直接跑。
- if not args.phase1_model:
- sys.exit(asyncio.run(run(args)))
- # 两段式: Pass 1 (Phase 1, 模型A) → Pass 2 (Phase 2+, 模型B, resume 同一 trace)。
- main_provider, main_model = args.provider, args.model
- p1_provider = args.phase1_provider or args.provider
- print(f"\n{'='*64}\n Pass 1/2 · Phase 1 only · {p1_provider}/{args.phase1_model}\n{'='*64}", flush=True)
- args.provider, args.model = p1_provider, args.phase1_model
- args.resume, args.phase1_only, args._phase2_handoff = False, True, False
- rc1 = asyncio.run(run(args))
- stats1 = dict(args._last_stats)
- if rc1 != 0:
- print(f"❌ Pass 1 退出码={rc1}, 不继续 Phase 2。", file=sys.stderr)
- sys.exit(rc1)
- print(f"\n{'='*64}\n Pass 2/2 · Phase 2+ (resume) · {main_provider}/{main_model}\n{'='*64}", flush=True)
- args.provider, args.model = main_provider, main_model
- args.resume, args.phase1_only, args._phase2_handoff = True, False, True
- rc2 = asyncio.run(run(args))
- stats2 = dict(args._last_stats)
- # 两段成本汇总
- print(f"\n{'='*64}\n 两段式成本汇总 (case {args.case_id})\n{'='*64}", flush=True)
- print(f" Pass1 [{args.phase1_model}]: in={_g(stats1,'in'):,} out={_g(stats1,'out'):,} · ${_g(stats1,'cost'):.4f}")
- print(f" Pass2 [{main_model}]: in={_g(stats2,'in'):,} out={_g(stats2,'out'):,} · ${_g(stats2,'cost'):.4f}")
- print(f" 合计: ${_g(stats1,'cost') + _g(stats2,'cost'):.4f} (不含子 Agent)", flush=True)
- sys.exit(rc2)
- if __name__ == "__main__":
- main()
|