| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302 |
- #!/usr/bin/env python3
- """
- run_cyber.py — run_procedure_dsl.py 的 Cyber Agent 移植版 (POC)。
- 与 run_procedure_dsl.py 的唯一区别是**执行引擎**:
- - run_procedure_dsl.py: Claude Agent SDK (ClaudeSDKClient) → 走 ~/.claude OAuth Max,
- 只能跑 Anthropic 协议端点。
- - run_cyber.py: 本仓库自研 AgentRunner (agent/core/runner.py) → 多 Provider。
- 本 POC 默认走 OpenRouter (create_openrouter_llm_call), 一个 provider 通打
- GPT / Gemini / Qwen / DeepSeek / Claude 全家, 换模型只改 --model 字符串。
- 复用 run_procedure_dsl.py 的:
- - 起手 prompt 全文 (_build_initial_blocks 的 text 块) —— 三阶段指令一字不改。
- - 图片抽取 (_images_from_source) + 客户端下载缓存 (_url_to_cached_path)。
- 图片块从 Anthropic base64 格式转成 OpenRouter 要的 OpenAI `image_url` data-URL 格式。
- 单 Agent 全程跑 (与 spec 对齐, 见 spec/tools.md §7): Phase 2 (归类标注) 由主 Agent
- 自己一趟做完, 不再分发 phase-2a/2b 子 Agent。RunConfig.exclude_tools 关掉
- agent/evaluate 两个分发工具, 防弱模型自作主张去 delegate。
- 完成度判据走 lint-case.py --json (字段级规则只活在 spec/tools 一处, runner 不复刻);
- 退出码: 0=completed / 2=失败 / 3=status unknown (跑完但引擎没回终态, 批量统计时单算) / 130=中断。
- 用法 (与 run_procedure_dsl.py 对齐):
- python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber
- python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber \
- --model openai/gpt-4o
- # 中断后续跑 (从 outputs/<out-dir>/.trace_id 读 trace 接着跑):
- python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber --resume
- # 批量: source 是 batch_posts.json (results[] 每项含 post), out-dir 是父目录,
- # 每个工序一个子目录 outputs/<out-dir>/<case_id>/ (已出 HTML 的自动跳过):
- python run_cyber.py input/batch_posts.json --out-dir batch-0610 --batch --model flash
- # procedure skill (强模型直写版, 读 procedure/SKILL.md): 一次 write_file 出完整
- # workflow.json + procedure/tools/validate.py 单工具校验, 不渲染 HTML:
- python run_cyber.py input/case-2-raw.json --out-dir case-2-proc \
- --skill procedure --model anthropic/claude-sonnet-4.6
- ⚠️ POC 已知缺口 (非 Claude 模型上需逐步调):
- - 起手 prompt 与 spec/ 里写的是 Claude 工具名 (Read/Write/Bash)。Cyber 实际工具是
- read_file/write_file/edit_file/bash_command。下方 _cyber_runtime_note 给了映射, 但
- spec 文档内 `详见 Read(...)` 这类示例仍是 Claude 名 —— 弱模型可能被带偏, 需观察 trace。
- - edit_file 在非 Claude 模型上的 exact-match 命中率不如 Claude, workflow.json 反复 Edit
- 可能卡壳。先拿单 case smoke test, 别直接上量。
- """
- import argparse
- import asyncio
- import base64
- import httpx
- import importlib.util
- import json
- import logging
- import os
- import subprocess
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List
- # run_cyber.py → procedure-dsl/
- DSL_ROOT = Path(__file__).resolve().parent
- def _find_repo_root(start: Path) -> Path:
- """向上找含 pyproject.toml 的目录 (cyber-agent 仓库根), 用于 sys.path 兜底。"""
- for p in [start, *start.parents]:
- if (p / "pyproject.toml").exists():
- return p
- return start
- REPO_ROOT = _find_repo_root(DSL_ROOT)
- for _p in (str(REPO_ROOT), str(DSL_ROOT)):
- if _p not in sys.path:
- sys.path.insert(0, _p)
- # 技能本地「计划」内置工具 (plan_procedures): import 即把它注册进全局工具表 (groups=["core"]),
- # 主 Agent 因 tool_groups 含 core 而能看到它。Phase 1 第一步让 LLM 调用它做 understanding +
- # 自动生成 workflow.json 骨架。run_cyber 仅做注册 + 注入原文上下文, 业务逻辑全在 plan_tool.py。
- import plan_tool # noqa: E402 (必须在 sys.path 设好之后)
- # 三阶段规格已全部并入 spec/README.md; 监听有没有 read_file README 判断「做了活却没读规则」
- _PHASE_FILES = {"README": "readme"}
- def _phase_read_gaps(out_dir: Path, read_phase: set) -> List[str]:
- """检查 agent 有没有「做了某阶段的活却没读规则」(弱模型惯犯, 导致格式/规则全靠瞎猜)。
- 三阶段规则已合并进 spec/README.md, 所以只需确认 README 读过没。
- 判「做了某阶段」: Phase 1 = 建了 steps; Phase 2 = 填了 effect/action/intent; Phase 3 = 出了 HTML。
- """
- gaps: List[str] = []
- if "readme" in read_phase: # README 读过 = 三阶段规则都看过, 无 gap
- return gaps
- wf = out_dir / "workflow.json"
- if not wf.exists():
- return gaps
- try:
- d = json.loads(wf.read_text(encoding="utf-8"))
- except Exception:
- return gaps
- did_p1 = any(p.get("steps") for p in d.get("procedures", []))
- did_p2 = any((s.get("effect") or s.get("action") or s.get("intent"))
- for p in d.get("procedures", []) for s in (p.get("steps") or []) if isinstance(s, dict))
- did_p3 = bool(list(out_dir.glob("*.html")))
- done = [name for name, flag in (("第一阶段·搭骨架", did_p1),
- ("第二阶段·填值+归类", did_p2),
- ("第三阶段·渲染", did_p3)) if flag]
- if done:
- gaps.append(
- f"你已经动了 {'、'.join(done)} 却**没 read_file** spec/README.md —— 三阶段全部规则"
- "(字段填法、@quote、IO 校验、intent 的 {in-type:}/{out-type:} 标记、收尾检查清单)都在这一个文件里, "
- "先读对应章节(第一/二/三阶段)再对照检查并修正你的输出。")
- return gaps
- def _completion_gaps(out_dir: Path, spec_name: str = "spec") -> List[str]:
- """跑完后查 workflow 是否真做完了 (防弱模型吐空消息提前自停)。返回未完成项清单, 空=完成。
- 判据: ① workflow.json 存在; ② lint-case.py 的「归类完成度」检查通过 (effect/action/intent
- —— 字段级规则只活在 lint-case.py 一处, 这里消费它的 --json 输出, 不复刻); ③ 出了 HTML。
- """
- wf = out_dir / "workflow.json"
- if not wf.exists():
- return ["workflow.json 还没建 (Phase 1 没做完)"]
- gaps: List[str] = []
- lint = DSL_ROOT / spec_name / "tools" / "lint-case.py"
- try:
- r = subprocess.run(
- [sys.executable, str(lint), "--workflow", str(wf), "--json", "--no-record"],
- capture_output=True, text=True, encoding="utf-8", errors="replace",
- cwd=str(DSL_ROOT), timeout=120,
- env={**os.environ, "PYTHONIOENCODING": "utf-8"})
- if r.returncode != 0:
- gaps.append(f"workflow.json 没过 lint 解析 ({(r.stderr or '').strip()[:120]}) — 修好再继续")
- else:
- gaps.extend(json.loads(r.stdout).get("checks", {}).get("classification_done", []))
- except Exception as e:
- gaps.append(f"完成度检查失败 ({type(e).__name__}: {e}) — 确认 workflow.json 是合法 JSON")
- if not list(out_dir.glob("*.html")):
- gaps.append("还没渲染出 HTML (Phase 3 没做: 跑 render-case.py)")
- return gaps
- # ── procedure skill (--skill procedure): 直写 workflow.json + validate.py 单工具 ──
- # 监听文件是 procedure/SKILL.md (规则单文件); 完成判据是 validate.py 退出码 (不渲染 HTML)。
- _PROCEDURE_PHASE_FILES = {"SKILL": "readme"}
- def _phase_read_gaps_procedure(out_dir: Path, read_phase: set) -> List[str]:
- """procedure 模式版「做了活却没读规则」: 动了 workflow.json 却没 read_file SKILL.md。"""
- if "readme" in read_phase or not (out_dir / "workflow.json").exists():
- return []
- return ["你已经写了 workflow.json 却**没 read_file** procedure/SKILL.md —— 全部规则"
- "(结构、字段规范、词表、value 逐字要求)都在这一个文件里, 先读它再对照修正你的输出。"]
- def _completion_gaps_procedure(out_dir: Path, source_for_agent: str, ocr_path: Path) -> List[str]:
- """procedure 模式完成度: workflow.json 存在 + validate.py 0 错误。
- 错误清单直接消费 validate.py 的 stdout ✗ 行 (字段级规则只活在 validate.py 一处, 不复刻)。
- """
- wf = out_dir / "workflow.json"
- if not wf.exists():
- return ["workflow.json 还没建"]
- cmd = [sys.executable, str(DSL_ROOT / "procedure" / "tools" / "validate.py"),
- "--workflow", str(wf), "--source", source_for_agent]
- if ocr_path.exists():
- cmd += ["--ocr", str(ocr_path)]
- try:
- r = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8",
- errors="replace", cwd=str(DSL_ROOT), timeout=120,
- env={**os.environ, "PYTHONIOENCODING": "utf-8"})
- except Exception as e:
- return [f"validate 跑不起来 ({type(e).__name__}: {e}) — 确认 workflow.json 是合法 JSON"]
- if r.returncode == 0:
- return []
- errs = [ln.strip() for ln in (r.stdout or "").splitlines() if ln.strip().startswith("✗")]
- if not errs:
- errs = [f"validate 退出码 {r.returncode}: {(r.stderr or r.stdout or '').strip()[:200]}"]
- if len(errs) > 15:
- errs = errs[:15] + [f"…(还有 {len(errs) - 15} 条, 修完上面的重跑 validate 看全量)"]
- return errs
- def _finalize_procedure(out_dir: Path, source_for_agent: str, ocr_path: Path, case_id: str) -> None:
- """procedure 模式收尾后处理 (agent 完成且 validate 全过后, runner 确定性执行):
- ① validate --fix-verbatim: 把未逐字命中的 value 自动替换为原文最相似连续片段;
- ② procedure/tools/render.py: 零校验渲染 HTML (质量门禁已由 validate 把过, 渲染只管出图;
- 也让 batch 的「已有 HTML 即跳过」恢复工作)。
- 渲染失败只警告不改退出码 (workflow.json 已完成, 可手动重渲)。
- """
- env = {**os.environ, "PYTHONIOENCODING": "utf-8"}
- wf = out_dir / "workflow.json"
- fix_cmd = [sys.executable, str(DSL_ROOT / "procedure" / "tools" / "validate.py"),
- "--workflow", str(wf), "--source", source_for_agent, "--fix-verbatim"]
- if ocr_path.exists():
- fix_cmd += ["--ocr", str(ocr_path)]
- try:
- r = subprocess.run(fix_cmd, capture_output=True, text=True, encoding="utf-8",
- errors="replace", cwd=str(DSL_ROOT), timeout=120, env=env)
- fixes = [ln for ln in (r.stdout or "").splitlines()
- if ln.strip().startswith("✦") or "原文连续片段" in ln]
- if fixes:
- print("[finalize] 逐字回填:\n" + "\n".join(fixes), flush=True)
- except Exception as e:
- print(f"[finalize] ⚠ fix-verbatim 跳过 ({type(e).__name__}: {e})", flush=True)
- title = f"Case {case_id}"
- try:
- t = (json.loads(wf.read_text(encoding="utf-8")).get("source") or {}).get("title") or ""
- if t:
- title = f"Case {case_id} · {t[:40]}"
- except Exception:
- pass
- out_html = out_dir / f"case-{case_id}.html"
- render_cmd = [sys.executable, str(DSL_ROOT / "procedure" / "tools" / "render.py"),
- "--workflow", str(wf), "--source-input", source_for_agent,
- "--page-title", title, "--case-id", str(case_id), "--out", str(out_html)]
- try:
- r = subprocess.run(render_cmd, capture_output=True, text=True, encoding="utf-8",
- errors="replace", cwd=str(DSL_ROOT), timeout=120, env=env)
- except Exception as e:
- print(f"[finalize] ⚠ 渲染失败 ({type(e).__name__}: {e}); 可手动重渲", flush=True)
- return
- if r.returncode == 0:
- print(f"[finalize] ✓ HTML → {out_html}", flush=True)
- else:
- print(f"[finalize] ⚠ 渲染失败 (workflow 已完成, 可手动重渲): "
- f"{(r.stdout or r.stderr or '').strip()[:400]}", flush=True)
- # 常用模型别名 → OpenRouter 全名 (--model / --phase1-model / --ocr-model 都认)。
- # 不在表里的值原样透传 (写全名、qwen 原生名都照旧)。
- _MODEL_ALIASES = {
- "flash": "google/gemini-3.5-flash",
- "flash-3.5": "google/gemini-3.5-flash",
- "3.5-flash": "google/gemini-3.5-flash",
- "gemini-3.5-flash": "google/gemini-3.5-flash",
- "flash-lite": "google/gemini-3.1-flash-lite",
- "gemini-3.1-flash-lite": "google/gemini-3.1-flash-lite",
- }
- def _resolve_model(name: str) -> str:
- return _MODEL_ALIASES.get((name or "").strip().lower(), name)
- def _load_env() -> None:
- """加载仓库根 .env 到环境变量。
- 各 provider 的 create_*_llm_call 直接 os.getenv 读 key / base_url
- (OPEN_ROUTER_API_KEY / QWEN_API_KEY / QWEN_BASE_URL 等), 但本脚本绕过
- agent.client (那里才 load_dotenv), 故在此显式加载, 否则 .env 里的配置读不到。
- override=False: 已在 shell 里 export 的值优先, 不被 .env 覆盖。
- """
- try:
- from dotenv import load_dotenv
- except ImportError:
- return
- env_file = REPO_ROOT / ".env"
- if env_file.exists():
- load_dotenv(env_file, override=False)
- def _load_sibling_module(name: str, path: Path):
- """按文件路径 import 同目录脚本 (run_procedure_dsl.py 不是包, 用 spec 加载)。"""
- spec = importlib.util.spec_from_file_location(name, path)
- mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- return mod
- # 复用 run_procedure_dsl.py 的纯函数 (它 main() 有 __main__ 守卫, import 无副作用)。
- _rpd = _load_sibling_module("run_procedure_dsl", DSL_ROOT / "run_procedure_dsl.py")
- _build_initial_blocks = _rpd._build_initial_blocks
- _images_from_source = _rpd._images_from_source
- _url_to_cached_path = _rpd._url_to_cached_path
- _derive_case_id = _rpd._derive_case_id
- _resolve_out_dir = _rpd._resolve_out_dir
- _MEDIA_TYPE = _rpd._MEDIA_TYPE
- # 极简引导: 只告诉主 Agent「你是 Cyber Agent + 工具名 + 去读 README 的运行时约定节」。
- # 其余所有运行时规则都在 spec/README.md (agent 本来就先读 README), 这里不复述。
- def _cyber_runtime_note(spec_name: str) -> str:
- return f"""
- ⚠️ 你的执行引擎是 **Cyber Agent**(不是 Claude Code):可用工具是 `read_file` / `write_file` / `edit_file` / `bash_command` / `glob_files` / `grep_content` / `read_images`(**不是** Read/Write/Edit/Bash/Glob/Grep)。
- **第一步就 read_file `{spec_name}/README.md`**(在你 cwd `procedure-dsl/` 下),然后**严格照它的「全程约定」节操作** —— 尤其『Cyber Agent 引擎专用』那部分(`bash_command` 是 cmd.exe、`goal` 用纯数字 focus、spec 相对链接补 `{spec_name}/` 前缀、read_file 会截断长文本)。「怎么建 workflow.json / 怎么用 @quote 填 value / patch 路径怎么写」全以 README 为准, 不要自行发挥。
- """
- def _downscale_image(raw: bytes, max_dim: int, quality: int) -> tuple:
- """把图片下采样 + 重压缩成 JPEG, 返回 (bytes, mime)。
- 为什么必须做: 实测 case 的 14 张原图 base64 合计 ~12MB, 直接发会把
- OpenRouter→Claude 的上游流打断 (api_error: internal stream ended unexpectedly);
- 减到 ~3MB 内就稳。同时大幅省 input token (PNG 截图 base64 极占 token)。
- 策略: 最长边 > max_dim 才缩放 (保持比例); 一律转 JPEG (PNG 截图转 JPEG 体积降一个量级);
- 有透明通道的拍平到白底 (截图场景安全)。max_dim<=0 表示关闭, 原样返回。
- PIL 不可用或处理失败 → 原样返回 (降级不阻塞)。
- """
- if max_dim <= 0:
- return raw, "image/jpeg"
- try:
- import io
- from PIL import Image
- im = Image.open(io.BytesIO(raw))
- if im.mode in ("RGBA", "LA", "P"):
- bg = Image.new("RGB", im.size, (255, 255, 255))
- im = im.convert("RGBA")
- bg.paste(im, mask=im.split()[-1])
- im = bg
- else:
- im = im.convert("RGB")
- w, h = im.size
- if max(w, h) > max_dim:
- scale = max_dim / max(w, h)
- im = im.resize((max(1, int(w * scale)), max(1, int(h * scale))), Image.LANCZOS)
- out = io.BytesIO()
- im.save(out, format="JPEG", quality=quality, optimize=True)
- return out.getvalue(), "image/jpeg"
- except Exception as e:
- print(f"[image] downscale 失败, 用原图: {type(e).__name__}: {e}", flush=True)
- return raw, "image/png"
- def _to_openai_content(text: str, images: List[str],
- max_dim: int = 1280, quality: int = 85) -> List[Dict[str, Any]]:
- """把 (text, 图URL列表) 拼成 OpenAI 格式的 content blocks (OpenRouter / 各家通吃)。
- - 文本块: {"type": "text", "text": ...}
- - 图片块: {"type": "image_url", "image_url": {"url": "data:<mime>;base64,<...>"}}
- URL 先经 run_procedure_dsl._url_to_cached_path 客户端下载缓存 (绕图床 robots.txt)。
- 每张图经 _downscale_image 下采样+转 JPEG (max_dim<=0 关闭), 防大 payload 打断上游流。
- 单张图失败不阻塞整批。
- """
- blocks: List[Dict[str, Any]] = [{"type": "text", "text": text}]
- n_ok, n_fail = 0, 0
- bytes_before, bytes_after = 0, 0
- for ref in images:
- try:
- if ref.startswith(("http://", "https://")):
- local = _url_to_cached_path(ref)
- else:
- local = Path(ref).expanduser().resolve()
- if not local.exists():
- raise FileNotFoundError(ref)
- raw = local.read_bytes()
- small, mime = _downscale_image(raw, max_dim, quality)
- bytes_before += len(raw)
- bytes_after += len(small)
- data = base64.standard_b64encode(small).decode()
- blocks.append({
- "type": "image_url",
- "image_url": {"url": f"data:{mime};base64,{data}"},
- })
- n_ok += 1
- except Exception as e:
- n_fail += 1
- print(f"[image] skip {ref[:80]}... ({type(e).__name__}: {e})", flush=True)
- if n_ok and max_dim > 0:
- print(f"[image] 下采样: {bytes_before//1024}KB → {bytes_after//1024}KB "
- f"(max_dim={max_dim}, q={quality})", flush=True)
- if images:
- print(f"[image] {n_ok}/{len(images)} 成功 base64 化, {n_fail} 失败已跳过", flush=True)
- return blocks
- # ──── 执行前预 OCR: 每张配图 → 文本 (供 quote-source --ocr 搜) ──────────────────
- # 截图教程的 prompt / JSON / 参数常只在图里, body_text 抽不到。执行前 OCR 成文本,
- # 让 LLM 也能从图片内容里 quote 出真实 value/directive。按图字节 hash 缓存, 不重复花钱。
- def _ocr_one(raw: bytes, model: str, api_key: str,
- max_dim: int = 2000, quality: int = 90) -> str:
- """单张图 → OCR 文本 (OpenRouter 视觉调用)。失败抛异常由上层兜。"""
- small, mime = _downscale_image(raw, max_dim, quality)
- data = base64.standard_b64encode(small).decode()
- instr = ("请把这张图片里的所有文字逐字提取出来, 按从上到下、从左到右的阅读顺序输出。"
- "只输出图中文字本身, 不要翻译、不要解释、不要添加任何说明。"
- "图中若有代码/JSON/提示词, 请保留其原始换行与格式。若图中无文字, 输出空。")
- payload = {
- "model": model,
- "messages": [{"role": "user", "content": [
- {"type": "text", "text": instr},
- {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{data}"}},
- ]}],
- }
- r = httpx.post("https://openrouter.ai/api/v1/chat/completions",
- headers={"Authorization": f"Bearer {api_key}"}, json=payload, timeout=120)
- r.raise_for_status()
- j = r.json()
- return (j.get("choices") or [{}])[0].get("message", {}).get("content", "") or ""
- def _ocr_images(refs: List[str], model: str, api_key: str, cache_dir: Path) -> str:
- """对每张图 OCR, 合并成带分段标记的文本。按图字节 hash 缓存, 单张失败跳过不阻塞。"""
- import hashlib
- cache_dir.mkdir(exist_ok=True)
- out, n_ok = [], 0
- for n, ref in enumerate(refs, 1):
- try:
- if ref.startswith(("http://", "https://")):
- local = _url_to_cached_path(ref)
- else:
- local = Path(ref).expanduser().resolve()
- raw = local.read_bytes()
- h = hashlib.sha256(raw).hexdigest()[:24]
- cf = cache_dir / f"{h}.txt"
- if cf.exists():
- txt, tag = cf.read_text(encoding="utf-8"), " (cache)"
- else:
- txt, tag = _ocr_one(raw, model, api_key), ""
- cf.write_text(txt, encoding="utf-8")
- out.append(f"\n===== [图 {n}] 来源: {ref[:90]} =====\n{txt.strip()}\n")
- n_ok += 1
- print(f"[ocr] 图 {n}/{len(refs)}: {len(txt.strip())} 字{tag}", flush=True)
- except Exception as e:
- print(f"[ocr] 图 {n}/{len(refs)} 失败跳过: {type(e).__name__}: {e}", flush=True)
- print(f"[ocr] {n_ok}/{len(refs)} 张成功", flush=True)
- return "".join(out)
- def _trace_append(trace_path: Path, chunk: str) -> None:
- with trace_path.open("a", encoding="utf-8") as f:
- f.write(chunk)
- def _content_to_text(content: Any) -> str:
- """把 Message.content 归一成纯文本。
- 不同 provider 的 content 形态不一:
- - str: 直接用 (OpenRouter / 多数情况)。
- - list[block]: OpenAI/Qwen 多模态格式 [{"type":"text","text":...}, ...],
- 抽出各块的 text 字段拼起来。
- - dict: 单个 block, 取其 text / content 字段, 取不到就 str() 兜底。
- 切片 (content[:2000]) 前必须先过这里, 否则对 dict/list 切片会抛 KeyError/TypeError。
- """
- if isinstance(content, str):
- return content
- if isinstance(content, list):
- parts = []
- for b in content:
- if isinstance(b, str):
- parts.append(b)
- elif isinstance(b, dict):
- parts.append(b.get("text") or b.get("content") or "")
- return "".join(parts)
- if isinstance(content, dict):
- # Qwen/DeepSeek assistant: text 可能为空, 真正的话在 reasoning_content。
- # tool 结果消息: 内容在 "result" 键 (之前漏读它, 导致 [tool result] 控制台全空白)。
- # 都取不到就返回 "" (不要 str(content) 把整个 dict 当文本 dump 出来)。
- return (content.get("text") or content.get("reasoning_content")
- or content.get("result") or content.get("content") or "")
- return str(content)
- def _build_fresh_prompt(args, *, source_for_agent: str, out_dir: Path, ocr_path: Path,
- images: List[str], workdir: Path, spec_name: str,
- body: str) -> str:
- """拼 fresh run 的起手 prompt。
- 组成: 原脚本起手全文 (_build_initial_blocks 的 text 块) + Cyber 运行时注 + OCR 提示 +
- 完整正文内联 + 钉死路径的 plan/verify-io/lint 命令 + 模式附注 (--exp / --phase1-model)。
- 原则: **命令钉死精确路径** (弱模型用错文件名会让校验静默跳过), **规则细节不复述**
- (canonical 在 spec/README.md, agent 起手就读它)。
- """
- anth_blocks = _build_initial_blocks(
- source_for_agent, args.case_id, args.out_dir, images, workdir, spec_name)
- text = anth_blocks[0]["text"] + _cyber_runtime_note(spec_name)
- if ocr_path.exists():
- text += (
- f"\n\n## 🖼️ 配图已 OCR 成文本\n"
- f"原文配图的文字已 OCR 提取到 `{ocr_path.as_posix()}`。"
- f"填 value/directive 需要图里的文字时, 用 "
- f"`python {spec_name}/tools/quote-source.py --source {source_for_agent} --query \"<短语>\" --ocr {ocr_path.as_posix()}` "
- f"一并搜原文+图片 (quote-source 读全文件, 不受 read_file 截断影响; 别用 read_file 通读大 ocr.txt)。"
- )
- # 内联完整正文: read_file 会把 body_text 这种超长单行砍在 2000 字 (弱模型常不续 char_offset
- # → 正文后半段静默丢失)。把完整 body_text 直接附进 prompt, agent 理解正文以这份为准。
- if body:
- text += (
- f"\n\n## 📄 原文正文 (完整版, 已内联 — 别再 read_file 原文取正文)\n"
- f"⚠️ read_file 读 `{source_for_agent}` 会把 body_text 这一长行砍在 2000 字 → 丢后半段。"
- f"理解正文、提取 value/directive **以下面这份完整正文为准**; read_file 原文文件只为取 "
- f"title/link/publish_timestamp 等短字段。\n\n```\n{body}\n```"
- )
- _lint_ocr = f" --ocr {ocr_path.as_posix()}" if ocr_path.exists() else ""
- # 命令钉死精确路径; 怎么填/怎么修的规则看 README 对应节, 不在这复述。
- text += (
- f"\n\n## 🧭 第一步(必做): 调用 plan_procedures 工具做计划\n"
- f"动手建 workflow.json 前**先调用一次 `plan_procedures`** 交工序计划 (结构见工具描述): "
- f"工序拆分 + 每步「工具·输入·动作·输出」四要素 + 章节认领 (source_sections)。"
- f"通过后骨架自动生成, 之后只用 wf-patch 在骨架上填值 (批量用 `--patch _scratch/xxx.json --prune`, "
- f"剩余条目带 _error 原因, 改完重跑同一文件直到 `[]`), **不要 write_file 重写 workflow.json**。\n\n"
- f"Phase 2.0 填完 value/directive/anchor 后**必须跑 IO 校验**(照抄), 通过才进 2.1 归类:\n"
- f"```bash\npython {spec_name}/tools/verify-io.py --workflow {out_dir.as_posix()}/workflow.json "
- f"--source {source_for_agent}{_lint_ocr}\n```\n\n"
- f"Phase 3 lint(照抄, `--source` 必带, 否则章节覆盖/value 逐字两条强制静默跳过):\n"
- f"```bash\npython {spec_name}/tools/lint-case.py --workflow {out_dir.as_posix()}/workflow.json "
- f"--case-id {args.case_id} --source {source_for_agent}{_lint_ocr}\n```\n"
- f"报「章节疑似漏抽」回 Phase 1 补工序; 报「value 疑似缩写」回 Phase 2.0 用 `@quote` 重填。"
- )
- if args.mode == "exp-direct":
- text += _EXP_DIRECT_NOTE # 方案1: 不写 understanding, 直接 workflow.json
- elif args.mode == "exp-understanding":
- text += _EXP_UNDERSTANDING_ONLY_NOTE # 方案2 第一步: 只产 understanding
- elif args.mode == "exp-workflow":
- text += _EXP_WORKFLOW_FROM_UNDERSTANDING_NOTE # 方案2 第二步: 据 understanding 产 workflow
- elif args.mode == "phase1":
- text += _PHASE1_STOP_NOTE # 两段式 Pass 1: 做完整 Phase 1 即停
- return text
- def _build_procedure_prompt(args, *, source_for_agent: str, out_dir: Path, ocr_path: Path,
- images: List[str], body: str) -> str:
- """拼 --skill procedure 的 fresh 起手 prompt (强模型直写 workflow.json, 单校验工具)。
- 原则与 dsl 版一致: 命令钉死精确路径, 规则细节不复述 (canonical 在 procedure/SKILL.md)。
- """
- case_dir = out_dir.as_posix()
- _ocr = f" --ocr {ocr_path.as_posix()}" if ocr_path.exists() else ""
- text = (
- f"请按 `procedure/` 目录里的 SKILL 处理这个 post: 提取工序, **直接写出完整的 workflow.json**。\n\n"
- f"⚠️ 你的执行引擎是 **Cyber Agent**: 可用工具是 `read_file` / `write_file` / `edit_file` / "
- f"`bash_command` / `glob_files` / `grep_content`(SKILL.md 里写的 Read/Write/Edit/Bash 对应它们)。"
- f"`bash_command` 是 cmd.exe: 一条命令一次调用, 别用 `;` 串(要串用 `&&`)。"
- f"cwd 是 `procedure-dsl/`。read_file 读长文件会截断, 续读用 char_offset。\n\n"
- f"## 起手指令 (路径直接照搬, 不要改, 不要先探查)\n\n"
- f"1. read_file `procedure/SKILL.md` — 全部规则(结构、字段规范、词表、value 逐字要求)都在"
- f"这一个文件里, 读完不要重读, 也不要读 spec/ 下任何东西(那是另一套旧流程)。\n"
- f"2. 通读下方内联正文 + 本消息所附的 {len(images)} 张配图, 想清楚工序划分"
- f"(在文字回复里简述: 有几个独立工序、每工序的步骤序列), 然后**一次 write_file 写出完整的** "
- f"`{case_dir}/workflow.json`(所有字段一趟填全)。\n"
- f"3. 跑校验(照抄):\n"
- f"```bash\npython procedure/tools/validate.py --workflow {case_dir}/workflow.json "
- f"--source {source_for_agent}{_ocr}\n```\n"
- f"4. 报 ✗ 的用 edit_file 直接修 workflow.json 对应字段, 重跑校验直到 **0 错误**; "
- f"⚠ 警告逐条核对, 确认无误可保留。0 错误后一句话总结即停。\n\n"
- f"## 输入\n\n"
- f"- case 原文: `{source_for_agent}` (read_file 它只为取 title/link/publish_timestamp 等短字段; "
- f"正文以下方内联版为准)\n"
- f"- 配图: 本消息附了 {len(images)} 张图作多模态内容\n\n"
- f"## 输出\n\n"
- f"`{case_dir}/workflow.json` 是唯一产物(本 skill 不渲染 HTML, 不写 understanding.md)。"
- f"过程草稿(若需要)放 `{case_dir}/_scratch/`。"
- )
- if ocr_path.exists():
- text += (
- f"\n\n## 🖼️ 配图已 OCR 成文本\n"
- f"原文配图的文字已 OCR 提取到 `{ocr_path.as_posix()}`。prompt/JSON/参数常只在截图里——"
- f"填 value 需要图里文字的逐字内容时, 从这个文件取(read_file 截断就用 char_offset 续读, "
- f"或 bash_command 用 findstr 定位)。"
- )
- if body:
- text += (
- f"\n\n## 📄 原文正文 (完整版, 已内联 — 别再 read_file 原文取正文)\n"
- f"⚠️ read_file 读 `{source_for_agent}` 会把 body_text 这一长行砍在 2000 字 → 丢后半段。"
- f"理解正文、提取 value **以下面这份完整正文为准**。\n\n```\n{body}\n```"
- )
- return text
- # 两段式 (--phase1-model): Pass 1 只做 Phase 1, 然后换模型 resume 做 Phase 2+。
- _PHASE1_STOP_NOTE = """
- ## ⏸️ 本段任务: 先完成 Phase 1, 然后**暂停等待下一步指示**
- 这是一个**分阶段协作**的任务, 你负责的是**第一阶段**。本段请专注做完 Phase 1:
- - Phase 1.1 心智模型 → 写 understanding.md
- - Phase 1.2 workflow.json 骨架 (procedures/steps/IO 结构 + name/purpose/declarations)
- - Phase 1.3 anchor 闭合 (IO 引用)
- 完成 Phase 1.3 后, 请**暂停**: 用一句话报告产出, **本轮不再发任何工具调用**, 等待后续指示来推进 Phase 2/3。
- (注意: 这**不是禁止** Phase 2, 只是分工上**这一段先到 Phase 1 为止**; 后续会有新指示让你或另一协作者继续。)
- 你的 Phase 1 产出质量直接决定后续阶段, 所以 understanding.md 和 workflow.json 骨架务必扎实、完整。
- """
- # ── 实验模式 note (--exp) ──────────────────────────────────────────────────
- # 实验只比 Phase 1 骨架质量, 两方案都"产出 workflow.json 骨架+anchor 后停", 不跑 Phase 2/3。
- # 方案 1 (direct): 强模型不写 understanding.md, 边想边直接出 workflow.json。
- _EXP_DIRECT_NOTE = """
- ## 🧪 实验模式 (direct): 不写 understanding.md, 直接产 workflow.json
- 本次**跳过 Phase 1.1 的 understanding.md 文件** (spec 里提到的这一步本次作废, 不要 Write 它)。
- 把"有几个独立工序、每个工序的步骤/IO/控制流"的分析**直接写在你的文字回复里**(简明扼要), 然后:
- - 直接 Write `workflow.json` 骨架 (Phase 1.2: procedures/steps/IO 结构 + name/purpose/declarations);
- - 用 wf-patch.py 加 anchor (Phase 1.3: IO 闭合)。
- 完成 anchor 闭合后**立即停止**, 一句话总结即可, **不要进入 Phase 2** (不填 effect/action/type/substance/form, 不分发子 Agent)。
- """
- # 方案 2 第一步 (split-A): 强模型只产 understanding.md, 不碰 workflow.json。精简读单。
- _EXP_UNDERSTANDING_ONLY_NOTE = """
- ## 🧪 实验模式 (split · 第一步): 只产 understanding.md
- 本次**只做 Phase 1.1**: 通读原文(含图)建立心智模型, 写进 understanding.md, 然后**立即停止**
- (**不要 Write workflow.json**, 不进 Phase 1.2+, **本轮不再发任何工具调用**)。
- ### ⚡ 精简读单 (覆盖上面起手指令里的完整清单 — 以本节为准)
- 本步**只读**这几样, 其余一律不读 (读了纯烧 context):
- - `spec/README.md` (已在起手读过, 别重读 —— 三阶段规则 + 多工序判断标准 + DSL 概念全在这一个文件里)
- - 原文 case json (body_text + 元数据) + 本消息所附的图
- **明确不要读** (本步用不上): `spec/tools.md` (脚本接口, 本步不调任何脚本)、`spec/format/` 下的 schema。
- ### understanding.md 要写到"能让另一个模型照着填出 workflow.json"的程度
- - 有几个独立工序 (按 README『判断有几个工序』的标准), 每个: 工序名 + 终态产物 + 大致步骤数 + 工艺类型;
- - 每个工序的步骤序列, 每步的输入/输出 (是什么数据、从哪来、到哪去);
- - **控制流用大白话讲清** (哪步是循环/并行/分支、循环什么、并行几路) —— 不必纠结 block/nested 的 JSON 细节
- (README『有循环/并行怎么切』有), 文字描述即可, 下游模型据此建 block/nested。
- 后续由另一个模型读你的 understanding.md + JSON schema 生成 workflow.json。
- """
- # 方案 2 第二步 (split-B): 全新一段、不给图, 弱模型只凭 understanding.md + schema 产 workflow.json。
- _EXP_WORKFLOW_FROM_UNDERSTANDING_NOTE = """
- ## 🧪 实验模式 (split · 第二步): 据 understanding.md 产 workflow.json (本次不附原图)
- Phase 1.1 的心智模型已由**另一个(更强的)模型**写好 —— 就是上面"输出目录"里的 **understanding.md**。
- 本次你的任务是 **Phase 1.2 + 1.3**, 且**只依据 understanding.md + schema**(本消息不附原图, 你看不到截图):
- 1. read_file 输出目录里的 `understanding.md`, 以及 spec 的 `format/case-data.schema.json`;
- 2. 按 understanding.md 的工序划分, Write `workflow.json` 骨架 (procedures/steps/IO 结构 + name/purpose/declarations);
- 3. 用 bash_command 跑 wf-patch.py 加 anchor (IO 闭合, 单条命令不要拼 `;`)。
- 完成 anchor 后**立即停止**, **不要进入 Phase 2**, 也**不要重写 understanding.md**。
- """
- async def run(args: argparse.Namespace) -> int:
- from agent.core.runner import AgentRunner, RunConfig, KnowledgeConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_openrouter_llm_call, create_qwen_llm_call
- # provider 选择: 决定 llm_call 走哪家端点。
- # openrouter → OPEN_ROUTER_API_KEY, 一个 URL 通打各家 (model 形如 qwen/qwen-max)。
- # qwen → QWEN_API_KEY + QWEN_BASE_URL (.env), 阿里 dashscope 原生
- # (model 形如 qwen-plus / qwen-max, 无 "qwen/" 前缀)。
- if args.provider == "qwen":
- make_llm_call = lambda: create_qwen_llm_call(model=args.model)
- else:
- make_llm_call = lambda: create_openrouter_llm_call(model=args.model)
- workdir = DSL_ROOT
- source_path = Path(args.source).expanduser().resolve()
- if not source_path.exists():
- print(f"❌ source not found: {source_path}", file=sys.stderr)
- return 1
- out_dir = _resolve_out_dir(args.out_dir, workdir)
- out_dir.mkdir(parents=True, exist_ok=True)
- (out_dir / "_scratch").mkdir(exist_ok=True)
- trace_id_file = out_dir / ".trace_id"
- trace_path = out_dir / "_trace_cyber.md"
- # skill 目录; 不存在直接报错, 别让 agent 跑一半才发现。
- # --skill procedure → procedure/ (强模型直写版); 否则 spec/ (--spec-version 实验变体)。
- if args.mode == "procedure":
- spec_name = "procedure"
- else:
- spec_name = "spec" if not getattr(args, "spec_version", None) else f"spec-{args.spec_version}"
- if not (DSL_ROOT / spec_name).is_dir():
- print(f"❌ skill 目录不存在: {DSL_ROOT / spec_name}", file=sys.stderr)
- return 1
- # source 路径给 Agent (workdir 相对优先)。
- try:
- source_for_agent = source_path.relative_to(workdir).as_posix()
- except ValueError:
- source_for_agent = str(source_path)
- # resume: 读已存 trace_id, 只发增量 "接着做" 消息。
- resume_tid = None
- if args.resume:
- if not trace_id_file.exists():
- print(f"❌ --resume 但无 {trace_id_file}; 先正常跑一次", file=sys.stderr)
- return 1
- resume_tid = trace_id_file.read_text(encoding="utf-8").strip() or None
- images = _images_from_source(source_path) + (args.extra_image or [])
- # 执行前预 OCR: 把每张配图的文字提取成文本, 落 _scratch/ocr.txt, 供 quote-source --ocr 搜。
- # 只在 fresh run 做 (resume 时上次的 ocr.txt 还在); 按图字节 hash 缓存, 重跑不重复花钱。
- ocr_path = out_dir / "_scratch" / "ocr.txt"
- if not resume_tid and not getattr(args, "no_ocr", False) and images:
- api_key = os.getenv("OPEN_ROUTER_API_KEY")
- if not api_key:
- print("[ocr] 跳过: 未设 OPEN_ROUTER_API_KEY", flush=True)
- else:
- print(f"[ocr] 对 {len(images)} 张配图预 OCR (model={args.ocr_model}) ...", flush=True)
- ocr_text = _ocr_images(images, args.ocr_model, api_key, DSL_ROOT / ".ocr_cache")
- if ocr_text.strip():
- ocr_path.write_text(ocr_text, encoding="utf-8")
- print(f"[ocr] -> {ocr_path} (共 {len(ocr_text)} 字)", flush=True)
- if resume_tid and args.mode == "phase2-handoff":
- # 两段式 Pass 2: Phase 1 已由另一模型做完, 从 Phase 2 开始。
- # ⚠️ 必须强硬作废历史里 Pass1 的"只做 Phase1 就停"指令, 否则弱模型会跟着旧指令
- # 重做 Phase1 再停 (实测 gemini-flash-lite 就这么干了)。
- cd = out_dir.as_posix()
- msgs = [{"role": "user", "content": (
- f"【阶段交接 — 之前的指令已变更, 请严格按本条执行】\n\n"
- f"Phase 1 (workflow.json 骨架 + anchor 闭合) **已经全部完成并落盘**, 是上一个模型做的。\n\n"
- f"⚠️ 历史里那条『本次只做 Phase 1、做完即停、不要进 Phase 2』的指令**现已作废**。"
- f"你现在的唯一任务是完成 **Phase 2 和 Phase 3**。\n\n"
- f"❌ **绝对不要**重写 / 重新生成 workflow.json 的骨架 —— 它已经做好了, 重做即错误。\n"
- f"✅ 现在立刻执行 (用 bash_command, 单条命令不要拼 `;`):\n"
- f" 1. read_file `{cd}/workflow.json` 看当前骨架 (不要凭记忆, 也不要重写它)。\n"
- f" 2. 读 {spec_name}/README.md 的『第二阶段』章节, 由你**自己一趟做完 Phase 2** "
- f"(规则全以它为准)。**不要**切任务 / 分发子 Agent。\n"
- f" 3. 用 wf-patch.py (bash_command, --set 或 `--patch _scratch/xxx.json --prune`) 回填 "
- f"effect/action/type/substance/form/intent 到 workflow.json。\n"
- f" 4. Phase 3: 跑 lint-case.py 校验, 再 render-case.py 出 HTML (.html 是唯一产物, .md 已取消)。"
- )}]
- elif resume_tid and args.mode == "procedure":
- _ocr = f" --ocr {ocr_path.as_posix()}" if ocr_path.exists() else ""
- msgs = [{"role": "user", "content": (
- f"上次中断了, 接续做 case-{args.case_id} 的工序提取。\n"
- f"先 read_file **当前磁盘版本**的 {out_dir.as_posix()}/workflow.json (不要凭记忆), 再跑\n"
- f"`python procedure/tools/validate.py --workflow {out_dir.as_posix()}/workflow.json "
- f"--source {source_for_agent}{_ocr}`\n"
- f"看还差什么, 用 edit_file 修到 0 错误 (规则见 procedure/SKILL.md)。"
- )}]
- elif resume_tid:
- msgs = [{"role": "user", "content": (
- f"上次中断了, 接续做 case-{args.case_id} 的提取流程。\n"
- f"先用 bash_command `ls` 看 {out_dir.as_posix()}/ 当前已落盘哪些产物, "
- f"再 read_file **当前磁盘版本**的 workflow.json (计划留档在 _scratch/understanding.json) "
- f"接着跑, 不要凭记忆。Phase 2 由你自己一趟做完 (wf-patch.py 落盘, 不分发子 Agent)。"
- )}]
- elif args.mode == "procedure":
- # procedure skill: 不注册 plan 上下文 (没有 plan_procedures 这一步), 起手 prompt 自包含。
- try:
- _sd = json.loads(source_path.read_text(encoding="utf-8"))
- except Exception:
- _sd = None
- _body = (_sd or {}).get("body_text") or "" if isinstance(_sd, dict) else ""
- base_text = _build_procedure_prompt(
- args, source_for_agent=source_for_agent, out_dir=out_dir, ocr_path=ocr_path,
- images=images, body=_body)
- msgs = [{"role": "user", "content": _to_openai_content(
- base_text, images, max_dim=args.max_image_dim, quality=args.image_quality)}]
- else:
- # 原文 json: 正文给 prompt 内联 + 喂给 plan/IO 校验工具当上下文。
- try:
- _sd = json.loads(source_path.read_text(encoding="utf-8"))
- except Exception:
- _sd = None
- _sd2 = _sd if isinstance(_sd, dict) else {}
- _body = _sd2.get("body_text") or ""
- plan_tool.set_plan_context(
- body_text=_body,
- ocr=ocr_path.read_text(encoding="utf-8") if ocr_path.exists() else "",
- out_dir=out_dir,
- case_id=args.case_id,
- spec_name=spec_name,
- source={
- "platform": "", # LLM/后续可补
- "author": _sd2.get("channel_account_name", ""),
- "url": _sd2.get("link", ""),
- "title": _sd2.get("title", ""),
- "date": str(_sd2.get("publish_timestamp", "") or ""),
- "excerpt": _body[:120],
- },
- )
- base_text = _build_fresh_prompt(
- args, source_for_agent=source_for_agent, out_dir=out_dir, ocr_path=ocr_path,
- images=images, workdir=workdir, spec_name=spec_name, body=_body)
- # exp-workflow: 不给图, 纯凭 understanding.md + schema
- imgs_for_prompt = [] if args.mode == "exp-workflow" else images
- msgs = [{"role": "user", "content": _to_openai_content(
- base_text, imgs_for_prompt, max_dim=args.max_image_dim, quality=args.image_quality)}]
- cfg = RunConfig(
- model=args.model,
- temperature=0.3,
- max_iterations=args.max_turns,
- agent_type="main",
- name=f"procedure-dsl case-{args.case_id} (cyber)",
- tool_groups=["core", "system"], # core=read/write/edit/glob/grep (agent 工具下面 exclude 掉);
- # system=bash_command
- # ⚠️ 没 system 组 → 主 Agent 无 bash, 跑不了 spec/tools/*.py
- # (wf-patch / lint-case / render-case 全靠 bash)
- parallel_tool_execution=True, # 允许同轮并行工具调用 (如多个 read_file); 子 Agent 分发已废弃
- context_injection_interval=0, # 关掉周期性自动注入 get_current_context: procedure-dsl 单 Agent
- # 不用 goal/协作者/IM, 那些注入只是给弱模型添乱 + 烧 token
- enable_prompt_caching=False, # 非 Claude 模型无效, 关掉省得干扰
- # 关掉 goal 压缩: 它会在 goal 完成后把详细消息压成 [[SUMMARY]], 而弱模型 (如
- # gemini-flash-lite) 一丢细节就倾向"推倒重做 Phase 1", 覆盖掉已完成的 Phase 2
- # 归一化数据。单 case 运行上下文有限, 保留全量更安全。
- goal_compression="none",
- # 关掉知识沉淀: 否则任务结束会被自动注入"复盘→knowledge_save_pending"prompt
- # (上次 Claude 在 seq6 被它带跑偏、qwen 浪费 turn51-52)。procedure-dsl 不需要它。
- knowledge=KnowledgeConfig(
- enable_extraction=False, # 压缩时不反思
- enable_completion_extraction=False, # 结束后不复盘 (核心: 去掉那段收尾 prompt)
- enable_injection=False, # focus goal 时不注入知识
- ),
- # 去知识沉淀 + 子 Agent 分发工具 (agent/evaluate): 单 Agent 全程;
- # procedure 模式再去 plan_procedures (那是 dsl 三阶段的 Phase 1 工具, 此模式直写 workflow.json)
- exclude_tools=["knowledge_save_pending", "agent", "evaluate"]
- + (["plan_procedures"] if args.mode == "procedure" else []),
- trace_id=resume_tid,
- )
- print(f"[setup] engine = Cyber AgentRunner")
- print(f"[setup] skill = {spec_name}/")
- print(f"[setup] provider = {args.provider}")
- print(f"[setup] model = {args.model}")
- print(f"[setup] source = {source_path}")
- print(f"[setup] case_id = {args.case_id}")
- print(f"[setup] out_dir = {out_dir}")
- print(f"[setup] images = {len(images)}")
- print(f"[setup] max_iter = {args.max_turns}")
- print(f"[setup] resume = {resume_tid[:8] + '...' if resume_tid else 'no'}")
- print(flush=True)
- now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- _trace_append(trace_path, f"\n\n---\n\n## ▶ {'Resume' if resume_tid else 'Fresh'} @ {now}\n"
- f"- model: `{args.model}` · case: `{args.case_id}` · images: `{len(images)}`\n")
- # ⚠️ trace store 必须放**短路径**(仓库根 .trace), 不能放 out_dir/.trace。
- # 原因 (Windows MAX_PATH=260): 子 Agent 的 trace_id 是 <父UUID>@delegate-<时间戳>-NNN,
- # 消息文件名还把整个 id 重复一次。若 base 是深层的 outputs/<case>/.trace,
- # 子 agent 消息文件路径会到 ~285 字符 > 260, 落盘报 [Errno 2] 子 Agent 直接失败。
- # 放仓库根 .trace 后同样路径 ~204 < 260。各 case 的 trace 按 trace_id 区分, 不冲突。
- trace_store_base = REPO_ROOT / ".trace"
- runner = AgentRunner(
- llm_call=make_llm_call(),
- trace_store=FileSystemTraceStore(base_path=str(trace_store_base)),
- debug=True, # subagent.py 据此打印子 Agent (phase-2a/2b) 的实时执行过程,
- # 否则子 Agent 全程静默, 只有最后 delegate 汇总可见。
- )
- turn = 0
- t0 = time.time()
- status = "unknown"
- # token / 成本累计 (主 trace; 子 Agent 的 token 在各自子 trace, 不计入此处)。
- usage = {"in": 0, "out": 0, "cache_w": 0, "cache_r": 0, "cost": 0.0}
- # 完成度兜底: 一轮跑完若 workflow 没填全/没出 HTML(弱模型常吐空消息提前自停),
- # 带「还差哪些」的具体清单**续同一条 trace** 再跑, 直到完成或达上限。
- # 实验/两段式 Pass1 (phase1 / exp-*) 是故意中途停的, 不兜底。
- max_auto = (getattr(args, "max_auto_continue", 2)
- if args.mode in ("full", "phase2-handoff", "procedure") else 0)
- run_msgs = msgs
- cur_trace = resume_tid
- attempt = 0
- read_phase: set = set() # agent 读过哪些阶段规格文件 (监听 read_file 累计)
- phase_files = _PROCEDURE_PHASE_FILES if args.mode == "procedure" else _PHASE_FILES
- try:
- while True:
- async for item in runner.run(messages=run_msgs, config=cfg):
- if isinstance(item, Trace):
- status = item.status
- if item.trace_id:
- cur_trace = item.trace_id
- trace_id_file.write_text(item.trace_id, encoding="utf-8")
- print(f"[trace] {item.trace_id} status={item.status}", flush=True)
- elif isinstance(item, Message):
- role = getattr(item, "role", "?")
- raw_content = getattr(item, "content", "") or ""
- tool_calls = getattr(item, "tool_calls", None)
- # Qwen 原生: 整条消息塞在 content dict 里, tool_calls 也嵌在其中,
- # item.tool_calls 属性反而是空 —— 从 content 兜底捞出来。
- if not tool_calls and isinstance(raw_content, dict):
- tool_calls = raw_content.get("tool_calls")
- content = _content_to_text(raw_content)
- # 累计 token/成本 (token 字段挂在 assistant 消息上; tool 消息为 None → or 0)
- usage["in"] += getattr(item, "prompt_tokens", 0) or 0
- usage["out"] += getattr(item, "completion_tokens", 0) or 0
- usage["cache_w"] += getattr(item, "cache_creation_tokens", 0) or 0
- usage["cache_r"] += getattr(item, "cache_read_tokens", 0) or 0
- usage["cost"] += getattr(item, "cost", 0.0) or 0.0
- if role == "assistant":
- turn += 1
- if content:
- print(f"\n[turn {turn} · text]\n{content}\n", flush=True)
- _trace_append(trace_path, f"\n### Turn {turn}\n> {content[:2000]}\n")
- for tc in (tool_calls or []):
- fn = (tc.get("function") or {}) if isinstance(tc, dict) else {}
- nm = fn.get("name", tc.get("name", "?") if isinstance(tc, dict) else "?")
- args_full = str(fn.get("arguments", ""))
- ar = args_full[:200]
- print(f"[turn {turn} · tool] {nm}({ar})", flush=True)
- _trace_append(trace_path, f"- `{nm}` — `{ar}`\n")
- # 监听阶段文件读取 (read_file 的 file_path 里命中阶段文件名)
- if nm == "read_file":
- for _key, _ph in phase_files.items():
- if _key in args_full:
- read_phase.add(_ph)
- elif role == "tool":
- preview = str(content)[:300]
- print(f" ↳ [tool result] {preview}", flush=True)
- # 一轮跑完 → 查完成度 (阶段文件没读的排最前: 先读规则再修输出)
- if args.mode == "procedure":
- gaps = (_phase_read_gaps_procedure(out_dir, read_phase)
- + _completion_gaps_procedure(out_dir, source_for_agent, ocr_path))
- else:
- gaps = _phase_read_gaps(out_dir, read_phase) + _completion_gaps(out_dir, spec_name)
- if not gaps:
- break
- if attempt >= max_auto:
- if max_auto > 0:
- print(f"\n⚠️ 达自动续跑上限({max_auto})仍未完成: {'; '.join(gaps)}", flush=True)
- _trace_append(trace_path, f"\n### ⚠ 达续跑上限仍未完成: {'; '.join(gaps)}\n")
- break
- attempt += 1
- if args.mode == "procedure":
- nudge = (
- "⚠️ 任务还没做完, 别停。validate 还在报错:\n"
- + "\n".join(f" - {g}" for g in gaps)
- + "\n零星错误用 edit_file 修 workflow.json 对应字段; 同类错误一大批用 "
- "bash_command 跑 procedure/tools/wf-patch.py 批量修(--set 或 --patch 清单 --prune, "
- "用法见 SKILL.md「批量修错」节)。修完重跑上面那条 validate 命令, "
- "**0 错误才算完**。别重写整个文件。"
- )
- else:
- nudge = (
- "⚠️ 任务还没做完, 别停。当前还差(**按顺序处理**):\n"
- + "\n".join(f" - {g}" for g in gaps)
- + "\n**先 read_file 上面点名没读过的阶段规格文件**(里面写了格式/词表/检查规则), "
- "再据规则修后面的问题, **别重做已完成的部分**。提示: 缺 effect/action 用 "
- f"wf-patch.py --set 补(值必须命中 {spec_name}/taxonomy 词表; 写错时 wf-patch 整批拒绝并在"
- "报错末尾给出合法值清单, 照清单改了重跑); intent 写成带标记的句子(规则见 README「目的列」); "
- "没出 HTML 就跑 render-case.py。"
- )
- print(f"\n[auto-continue {attempt}/{max_auto}] 续跑补完: {'; '.join(gaps)}\n", flush=True)
- _trace_append(trace_path, f"\n### ↻ auto-continue {attempt}: {'; '.join(gaps)}\n")
- cfg.trace_id = cur_trace # 续同一条 trace (不重开)
- run_msgs = [{"role": "user", "content": nudge}]
- # procedure 模式收尾: validate 全过 → 自动逐字回填 + 渲染 HTML (runner 确定性后处理)
- if args.mode == "procedure" and not gaps:
- _finalize_procedure(out_dir, source_for_agent, ocr_path, args.case_id)
- _trace_append(trace_path, "\n### ⚙ finalize: fix-verbatim + render HTML\n")
- except KeyboardInterrupt:
- print(f"\n⚠️ 中断. {out_dir}/ 产物已保留. 续跑: --resume", file=sys.stderr)
- return 130
- except Exception as e:
- logging.exception("cyber run failed")
- print(f"❌ {type(e).__name__}: {e}", file=sys.stderr)
- return 1
- elapsed = time.time() - t0
- print(f"\n[done] status={status} turns={turn} wall={elapsed:.1f}s", flush=True)
- print(f"[usage] tokens in={usage['in']:,} out={usage['out']:,} "
- f"cache_w={usage['cache_w']:,} cache_r={usage['cache_r']:,} · cost=${usage['cost']:.4f} "
- f"(model={args.model}; 不含子 Agent)", flush=True)
- _trace_append(
- trace_path,
- f"\n### ◀ done · status={status} · turns={turn} · {elapsed:.1f}s\n"
- f"- tokens: in={usage['in']:,} out={usage['out']:,} "
- f"cache_w={usage['cache_w']:,} cache_r={usage['cache_r']:,} · cost=${usage['cost']:.4f}\n"
- )
- args._last_stats = dict(usage) # 供 main() 两段式汇总
- if status == "completed":
- return 0
- if status == "unknown":
- # 跑完但引擎没回终态 (常见: 弱模型空消息自停)。产物可能仍完整, 单列退出码供批量统计区分。
- print("⚠️ status=unknown (引擎未回报终态), 退出码 3; 产物可能完整, 建议核对 out_dir", file=sys.stderr)
- return 3
- return 2
- def _parse_args() -> argparse.Namespace:
- p = argparse.ArgumentParser(
- description="跑 procedure-dsl 提取流程 (Cyber AgentRunner + OpenRouter)",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog=__doc__,
- )
- p.add_argument("source", help="原始 post 文件 (input/case-N-raw.json)")
- p.add_argument("--out-dir", required=True,
- help="输出目录名, 落在 outputs/ 下. case_id 自动从 basename 推。")
- p.add_argument("--extra-image", action="append", default=[],
- help="额外配图 (本地路径 or URL), 可多次。")
- p.add_argument("--provider", default="openrouter", choices=["openrouter", "qwen"],
- help="LLM 端点: openrouter (默认, OPEN_ROUTER_API_KEY, 一个 URL 通打各家) "
- "或 qwen (阿里 dashscope 原生, 读 .env 的 QWEN_API_KEY + QWEN_BASE_URL)。")
- p.add_argument("--model", default="google/gemini-3.1-flash-lite",
- help="模型名 (默认 google/gemini-3.1-flash-lite, 与 UI/批量评估对齐)。"
- "provider=openrouter 时形如 openai/gpt-4o / qwen/qwen-max / "
- "anthropic/claude-sonnet-4.5; provider=qwen 时形如 qwen-plus / qwen-max (无前缀)。"
- "支持别名: flash / flash-3.5 → google/gemini-3.5-flash, "
- "flash-lite → google/gemini-3.1-flash-lite。")
- p.add_argument("--phase1-model", default=None,
- help="启用两段式: Phase 1 (心智模型+骨架+anchor) 用这个模型跑完即停, "
- "Phase 2+ 换 --model resume 续跑。不传=全程单模型。"
- "例: --phase1-model anthropic/claude-sonnet-4.6 --model google/gemini-3.1-flash-lite")
- p.add_argument("--phase1-provider", default=None, choices=["openrouter", "qwen"],
- help="Phase 1 段的 provider, 默认继承 --provider。")
- p.add_argument("--exp", default=None, choices=["direct", "split"],
- help="Phase 1 实验模式 (产出 workflow.json 骨架后即停, 不跑 Phase 2/3):\n"
- " direct = 强模型(--model)不写 understanding, 边想边直接出 workflow.json;\n"
- " split = 强模型(--phase1-model)只产 understanding → 弱模型(--model)据 understanding+schema 产 workflow.json。")
- p.add_argument("--skill", default="dsl", choices=["dsl", "procedure"],
- help="dsl (默认) = spec/ 三阶段流程 (plan_procedures + wf-patch + lint + render); "
- "procedure = procedure/ 强模型直写版 (一次 write_file 出 workflow.json, "
- "validate.py 单工具校验, 不渲染 HTML)。")
- p.add_argument("--spec-version", default=None, metavar="SUFFIX",
- help="用 spec-<SUFFIX>/ 目录而非默认 spec/ (实验变体, 不污染原 spec)。")
- p.add_argument("--max-turns", type=int, default=300, help="最大迭代轮数 (default: 300)")
- p.add_argument("--max-image-dim", type=int, default=1280,
- help="图片下采样最长边像素 (default: 1280, 0=关闭)。多张大图 base64 合计过大会"
- "打断 OpenRouter→Claude 上游流 (internal stream ended); 下采样+转JPEG 防此并省 token。")
- p.add_argument("--image-quality", type=int, default=85,
- help="下采样后 JPEG 质量 (default: 85)。截图含文字, 别压太低伤可读性。")
- p.add_argument("--resume", action="store_true",
- help="从 outputs/<out-dir>/.trace_id 读 trace 续跑 (batch 模式下按子目录逐 case 判断)")
- p.add_argument("--batch", action="store_true",
- help="批量模式: source 是 batch_posts.json (results[] 每项含 post), "
- "out-dir 是父目录, 每个工序一个子目录 (已出 HTML 的自动跳过)")
- p.add_argument("--batch-redo", action="store_true",
- help="batch 模式下重跑已有 HTML 的 case (默认跳过)")
- p.add_argument("--batch-workers", type=int, default=1,
- help="batch 并行数 (默认 1=进程内串行, 控制台看实时 trace; >1 每 case 一个"
- "子进程, 输出落各自 _extract.log, 建议 2-4)")
- p.add_argument("--no-ocr", action="store_true",
- help="跳过执行前的配图预 OCR (默认开启: 每张图 OCR 成文本落 _scratch/ocr.txt, 供 quote-source --ocr 搜)")
- p.add_argument("--ocr-model", default="google/gemini-3.1-flash-lite",
- help="预 OCR 用的视觉模型 (default: google/gemini-3.1-flash-lite, 走 OpenRouter)")
- p.add_argument("--max-auto-continue", type=int, default=2,
- help="完成度兜底: 跑完若 workflow 没填全/没出 HTML(弱模型常吐空消息自停), "
- "自动带'还差X'续跑的最大次数 (default: 2, 0=关闭)")
- return p.parse_args()
- def _run_pass(args: argparse.Namespace, *, provider: str, model: str,
- mode: str, resume: bool = False) -> tuple:
- """跑一段: 设好 provider/model/mode/resume 后调 run()。返回 (退出码, usage 统计)。
- mode ∈ {full, phase1, phase2-handoff, exp-direct, exp-understanding, exp-workflow}
- (run() 内据此选起手附注、是否兜底续跑; 取代旧版 5 个 args 隐藏 flag)。
- """
- args.provider, args.model = provider, model
- args.mode, args.resume = mode, resume
- rc = asyncio.run(run(args))
- return rc, dict(args._last_stats)
- def _banner(title: str) -> None:
- print(f"\n{'='*64}\n {title}\n{'='*64}", flush=True)
- def _run_batch(args: argparse.Namespace) -> int:
- """批量模式: source 是 batch_posts.json, 逐条跑完整单 case 流程。
- 输入格式: {"results": [{"case_id", "platform", "source_url", "post": {...}}, ...]}
- (也容忍顶层直接是 post 列表)。每条的 post 字段是单 case 输入的超集
- (body_text/title/link/publish_timestamp/images), 落成 <子目录>/_source.json 后
- 走与单跑完全相同的流程 — 门禁/OCR/兜底续跑全部复用, 不另起一套。
- 目录: outputs/<out-dir>/<case_id>/ 一工序一子目录。
- 跳过: 子目录已有 *.html 视为做完, 跳过 (--batch-redo 强制重跑); 失败不中断后续 case。
- --resume: 子目录有 .trace_id 的 case 续 trace 跑, 没有的正常 fresh。
- 并行: --batch-workers N (默认 1)。N=1 进程内串行 (控制台可看实时 trace);
- N>1 每 case 一个**子进程** (plan_tool._PLAN_CTX 是模块级全局, 同进程并发会把
- A case 的骨架写进 B case 的目录 — 进程边界是唯一安全的隔离), 各 case 输出落
- <子目录>/_extract.log, 主进程只报进度。
- 退出码: 0=全部成功(含跳过/unknown) / 2=有失败 / 130=中断。
- """
- src = Path(args.source).expanduser().resolve()
- data = json.loads(src.read_text(encoding="utf-8"))
- items = data.get("results") if isinstance(data, dict) else data
- if not isinstance(items, list) or not items:
- print(f"❌ --batch: {src.name} 里没有 results[] 列表", file=sys.stderr)
- return 2
- batch_root = _resolve_out_dir(args.out_dir, DSL_ROOT)
- batch_root.mkdir(parents=True, exist_ok=True)
- workers = max(1, getattr(args, "batch_workers", 1))
- print(f"[batch] {len(items)} 条 → {batch_root} (model={args.model}, workers={workers})", flush=True)
- # ── 第一遍: 落盘 _source/_meta, 建工作清单 (跳过的直接进汇总) ──
- want_resume = args.resume
- summary: List[tuple] = [] # (case_id, 状态, cost)
- work: List[tuple] = [] # (case_id, sub_dir, case_src, resume)
- for n, item in enumerate(items, 1):
- if not isinstance(item, dict):
- continue
- post = item.get("post") or item
- cid = str(item.get("case_id") or post.get("channel_content_id") or f"case{n:03d}")
- sub = batch_root / cid
- if not args.batch_redo and list(sub.glob("*.html")):
- print(f"[batch] {cid} 已有 HTML, 跳过", flush=True)
- summary.append((cid, "skip", 0.0))
- continue
- if not (post.get("body_text") or "").strip():
- print(f"[batch] {cid} post.body_text 为空, 跳过", flush=True)
- summary.append((cid, "empty", 0.0))
- continue
- sub.mkdir(parents=True, exist_ok=True)
- case_src = sub / "_source.json"
- case_src.write_text(json.dumps(post, ensure_ascii=False, indent=2), encoding="utf-8")
- (sub / "_meta.json").write_text(json.dumps({
- "case_id": cid, "platform": item.get("platform", ""),
- "source_url": item.get("source_url", post.get("link", "")),
- "title": post.get("title", ""), "model": args.model,
- "started_at": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
- }, ensure_ascii=False, indent=2), encoding="utf-8")
- work.append((cid, sub, case_src, want_resume and (sub / ".trace_id").exists()))
- totals = {"in": 0, "out": 0, "cost": 0.0}
- interrupted = False
- if workers == 1:
- # ── 串行: 进程内复用 run(), 控制台可看实时 trace ──
- for n, (cid, sub, case_src, resume) in enumerate(work, 1):
- args.source = str(case_src)
- args.out_dir = str(sub) # 绝对路径 → _resolve_out_dir 直通, 不再拼 outputs/
- args.case_id = cid
- args.resume = resume
- args._last_stats = {} # 防 run() 异常早退时把上一条 case 的统计错记到本条
- _banner(f"[batch {n}/{len(work)}] {cid}")
- try:
- rc = asyncio.run(run(args))
- except KeyboardInterrupt:
- rc = 130
- stats = dict(args._last_stats)
- totals["in"] += stats.get("in", 0)
- totals["out"] += stats.get("out", 0)
- totals["cost"] += stats.get("cost", 0.0)
- summary.append((cid, rc, stats.get("cost", 0.0)))
- if rc == 130:
- interrupted = True
- print("\n⚠️ 批量中断; 已完成的 case 保留, 重跑同命令会跳过它们续做剩余。", file=sys.stderr)
- break
- elif work:
- # ── 并行: 每 case 一个子进程, 输出落 <子目录>/_extract.log ──
- from concurrent.futures import ThreadPoolExecutor, as_completed
- live_procs: set = set()
- def _one(cid: str, sub: Path, case_src: Path, resume: bool) -> tuple:
- cmd = [sys.executable, "-u", str(DSL_ROOT / "run_cyber.py"), str(case_src),
- "--out-dir", str(sub), "--model", args.model, "--provider", args.provider,
- "--max-turns", str(args.max_turns),
- "--max-image-dim", str(args.max_image_dim),
- "--image-quality", str(args.image_quality),
- "--ocr-model", args.ocr_model,
- "--max-auto-continue", str(args.max_auto_continue)]
- if getattr(args, "spec_version", None):
- cmd += ["--spec-version", args.spec_version]
- if getattr(args, "skill", "dsl") != "dsl":
- cmd += ["--skill", args.skill]
- if args.no_ocr:
- cmd.append("--no-ocr")
- if resume:
- cmd.append("--resume")
- log_path = sub / "_extract.log"
- env = {**os.environ, "PYTHONIOENCODING": "utf-8"}
- with log_path.open("w", encoding="utf-8", buffering=1) as fh:
- proc = subprocess.Popen(cmd, stdout=fh, stderr=subprocess.STDOUT,
- cwd=str(DSL_ROOT), env=env)
- live_procs.add(proc)
- try:
- rc = proc.wait()
- finally:
- live_procs.discard(proc)
- # 子进程 cost 从其日志的 [usage] 行回收 (best-effort)
- cost = tin = tout = 0
- try:
- import re as _re
- tail = log_path.read_text(encoding="utf-8", errors="replace")[-4000:]
- m = _re.findall(r"tokens in=([\d,]+) out=([\d,]+).*?cost=\$([0-9.]+)", tail)
- if m:
- tin, tout, cost = (int(m[-1][0].replace(",", "")),
- int(m[-1][1].replace(",", "")), float(m[-1][2]))
- except Exception:
- pass
- return cid, rc, cost, tin, tout
- print(f"[batch] 并行 {workers} 路, 各 case 实时日志: <子目录>/_extract.log", flush=True)
- done_n = 0
- try:
- with ThreadPoolExecutor(max_workers=workers) as pool:
- futs = {pool.submit(_one, *w): w[0] for w in work}
- for fut in as_completed(futs):
- cid, rc, cost, tin, tout = fut.result()
- done_n += 1
- totals["in"] += tin
- totals["out"] += tout
- totals["cost"] += cost
- summary.append((cid, rc, cost))
- mark = "✅" if rc == 0 else ("⚠" if rc == 3 else "❌")
- print(f"[batch {done_n}/{len(work)}] {mark} {cid} rc={rc} ${cost:.4f}", flush=True)
- except KeyboardInterrupt:
- interrupted = True
- print("\n⚠️ 批量中断, 终止运行中的子进程…", file=sys.stderr)
- for p in list(live_procs):
- try:
- p.terminate()
- except Exception:
- pass
- _banner(f"批量汇总 ({len(summary)}/{len(items)} 条)")
- _MARK = {0: "✅", 3: "⚠ unknown", 130: "⏸ 中断", "skip": "↷ 跳过", "empty": "∅ 空正文"}
- for cid, st, cost in summary:
- print(f" {_MARK.get(st, f'❌ rc={st}'):10} {cid} ${cost:.4f}")
- print(f" tokens in={totals['in']:,} out={totals['out']:,} · 合计 ${totals['cost']:.4f}", flush=True)
- if interrupted:
- return 130
- return 0 if all(st in (0, 3, "skip", "empty") for _, st, _ in summary) else 2
- def main() -> None:
- for stream in (sys.stdout, sys.stderr):
- if hasattr(stream, "reconfigure"):
- stream.reconfigure(encoding="utf-8", errors="replace")
- logging.basicConfig(level=logging.WARNING)
- _load_env() # 把 .env (OPEN_ROUTER_API_KEY / QWEN_API_KEY / QWEN_BASE_URL) 载入环境
- args = _parse_args()
- args.case_id = _derive_case_id(args.out_dir)
- args.mode = "procedure" if args.skill == "procedure" else "full"
- args._last_stats = {}
- if args.skill == "procedure" and (args.exp or args.phase1_model or args.spec_version):
- print("❌ --skill procedure 不与 --exp / --phase1-model / --spec-version 组合 "
- "(单模型直写流程, 没有阶段拆分)。", file=sys.stderr)
- sys.exit(2)
- # 模型别名解析 (flash → google/gemini-3.5-flash 等; 非别名原样透传)
- args.model = _resolve_model(args.model)
- args.ocr_model = _resolve_model(args.ocr_model)
- if args.phase1_model:
- args.phase1_model = _resolve_model(args.phase1_model)
- # ── 批量模式: 单模型全流程逐条跑 (不与 --exp / 两段式组合) ──
- if args.batch:
- if args.exp or args.phase1_model:
- print("❌ --batch 暂不支持与 --exp / --phase1-model 组合 (单模型全流程逐条跑)。", file=sys.stderr)
- sys.exit(2)
- sys.exit(_run_batch(args))
- def _g(d, k):
- return d.get(k, 0) if d else 0
- # ── 实验模式 (--exp): 只产 workflow.json 骨架, 不跑 Phase 2/3 ──
- if args.exp == "direct":
- # 方案1: 强模型(--model)不写 understanding, 边想边直接出 workflow.json。
- _banner(f"[exp:direct] {args.provider}/{args.model} · 直接产 workflow.json")
- rc, _ = _run_pass(args, provider=args.provider, model=args.model, mode="exp-direct")
- sys.exit(rc)
- if args.exp == "split":
- # 方案2: 强模型(--phase1-model)只产 understanding → 弱模型(--model)据其产 workflow.json。
- if not args.phase1_model:
- print("❌ --exp split 需要 --phase1-model (强模型, 产 understanding)。", file=sys.stderr)
- sys.exit(2)
- main_provider, main_model = args.provider, args.model
- p1_provider = args.phase1_provider or args.provider
- p1_model = args.phase1_model
- _banner(f"[exp:split] A · understanding · {p1_provider}/{p1_model}")
- rcA, statsA = _run_pass(args, provider=p1_provider, model=p1_model, mode="exp-understanding")
- if rcA not in (0, 3):
- print(f"❌ split-A (understanding) 退出码={rcA}, 不继续。", file=sys.stderr)
- sys.exit(rcA)
- _banner(f"[exp:split] B · workflow.json · {main_provider}/{main_model}")
- # B 是全新一段 (resume=False): 不继承强模型历史(含图), 只凭 understanding.md + schema
- rcB, statsB = _run_pass(args, provider=main_provider, model=main_model, mode="exp-workflow")
- _banner(f"[exp:split] 成本汇总 (case {args.case_id})")
- print(f" A understanding [{p1_model}]: in={_g(statsA,'in'):,} out={_g(statsA,'out'):,} · ${_g(statsA,'cost'):.4f}")
- print(f" B workflow.json [{main_model}]: in={_g(statsB,'in'):,} out={_g(statsB,'out'):,} · ${_g(statsB,'cost'):.4f}")
- print(f" 合计: ${_g(statsA,'cost') + _g(statsB,'cost'):.4f}", flush=True)
- sys.exit(rcB)
- # 单模型: 直接跑 (mode 已在上面按 --skill 解析: full / procedure)。
- if not args.phase1_model:
- rc, _ = _run_pass(args, provider=args.provider, model=args.model, mode=args.mode)
- sys.exit(rc)
- # 两段式: Pass 1 (Phase 1, 模型A) → Pass 2 (Phase 2+, 模型B, resume 同一 trace)。
- main_provider, main_model = args.provider, args.model
- p1_provider = args.phase1_provider or args.provider
- p1_model = args.phase1_model
- _banner(f"Pass 1/2 · Phase 1 only · {p1_provider}/{p1_model}")
- rc1, stats1 = _run_pass(args, provider=p1_provider, model=p1_model, mode="phase1")
- if rc1 not in (0, 3): # 3 = status unknown 但产物可能完整, 带着警告继续 Pass 2
- print(f"❌ Pass 1 退出码={rc1}, 不继续 Phase 2。", file=sys.stderr)
- sys.exit(rc1)
- _banner(f"Pass 2/2 · Phase 2+ (resume) · {main_provider}/{main_model}")
- rc2, stats2 = _run_pass(args, provider=main_provider, model=main_model,
- mode="phase2-handoff", resume=True)
- _banner(f"两段式成本汇总 (case {args.case_id})")
- print(f" Pass1 [{p1_model}]: in={_g(stats1,'in'):,} out={_g(stats1,'out'):,} · ${_g(stats1,'cost'):.4f}")
- print(f" Pass2 [{main_model}]: in={_g(stats2,'in'):,} out={_g(stats2,'out'):,} · ${_g(stats2,'cost'):.4f}")
- print(f" 合计: ${_g(stats1,'cost') + _g(stats2,'cost'):.4f} (不含子 Agent)", flush=True)
- sys.exit(rc2)
- if __name__ == "__main__":
- main()
|