guantao 1 неделя назад
Родитель
Сommit
1d671145c5

+ 24 - 0
examples/process_pipeline/script/search_eval/procedure-dsl/presets_cyber.json

@@ -0,0 +1,24 @@
+{
+  "main": {
+    "max_iterations": 300,
+    "temperature": 0.3,
+    "skills": [],
+    "description": "procedure-dsl 主 Agent (Cyber Agent 版): 自驱读 spec/、跑三阶段提取、Phase 2 并行分发 phase-2a/2b 子 Agent。工具走 core 组 (read_file/write_file/edit_file/bash_command/glob_files/grep_content/agent)。"
+  },
+  "phase-2a-normalizer": {
+    "allowed_tools": ["read_file", "grep_content", "glob_files", "write_file", "edit_file", "bash_command"],
+    "max_iterations": 40,
+    "temperature": 0.2,
+    "skills": [],
+    "description": "Phase 2A 归一化子 Agent: 读 _scratch/task_2a.json, 归一 effect/action/type, 管 procedure 级 type_registry, 产出 _scratch/patch_2a.json ({path,value} 扁平数组, 符合 wf-patch.py 契约)。",
+    "system_prompt": "You are a dedicated Phase 2A normalization sub-agent.\nYour goal is to process the inputs and outputs of a workflow for effect, action, and type normalization:\n1. Read the outputs/case-N/_scratch/task_2a.json file to get the steps and IO variables.\n2. Normalize every step's `effect` and `action` against the taxonomy specs in `spec/taxonomy/effect.json` and `spec/taxonomy/action.json`.\n3. Normalize every IO variable's `type` against `spec/taxonomy/type.json`. If a custom type is used, register it in the procedure's `type_registry` with extends and description.\n4. Output a standard `patch_2a.json` JSON file under outputs/case-N/_scratch/.\nIMPORTANT: The format of `patch_2a.json` MUST be a flat JSON array of objects, where each object has a \"path\" and a \"value\" key (exactly conforming to the `wf-patch.py` tool contract).\nExample format:\n[\n  {\"path\": \"p1.s1.effect\", \"value\": \"预处理\"},\n  {\"path\": \"p1.s1.action\", \"value\": \"提取/化学提取/反推\"},\n  {\"path\": \"p1.s1.inputs[0].type\", \"value\": \"工具选型标准\"},\n  {\"path\": \"p1.type_registry.工具配置.extends\", \"value\": \"评语\"},\n  {\"path\": \"p1.type_registry.工具配置.desc\", \"value\": \"工具选型依据...\"}\n]\nDo not output raw dictionary structure or any other nesting. Do not touch or modify other files.\n\nNOTE (Cyber Agent runtime): the file tools here are `read_file(path=...)`, `write_file(path=..., content=...)`, `edit_file(...)`, `bash_command(command=...)` — NOT the Claude-style Read/Write/Bash. Call external scripts (taxonomy-lookup.py / wf-patch.py) via `bash_command`, never read their source."
+  },
+  "phase-2b-matcher": {
+    "allowed_tools": ["read_file", "grep_content", "glob_files", "write_file", "edit_file", "bash_command"],
+    "max_iterations": 40,
+    "temperature": 0.2,
+    "skills": [],
+    "description": "Phase 2B 分类匹配子 Agent: 读 _scratch/task_2b.json, 调 taxonomy-lookup.py 查 substance/form 路径, 产出 _scratch/patch_2b.json ({path,value} 扁平数组)。",
+    "system_prompt": "You are a dedicated Phase 2B taxonomy matching sub-agent.\nYour goal is to query and match the substance and form for each workflow IO variable:\n1. Read the outputs/case-N/_scratch/task_2b.json file to get the variables to match.\n2. For each variable, run `python spec/tools/taxonomy-lookup.py --dim 实质 --match \"...\"` and `--dim 形式 --match \"...\"` to search for the most precise taxonomy paths matching the variable's value, name, and related_images.\n3. Output a standard `patch_2b.json` JSON file under outputs/case-N/_scratch/.\nIMPORTANT: The format of `patch_2b.json` MUST be a flat JSON array of objects, where each object has a \"path\" and a \"value\" key (exactly conforming to the `wf-patch.py` tool contract). Substance and form values can be single string paths, multiple paths separated by ' + ', or JSON arrays of strings for multi-path matching.\nExample format:\n[\n  {\"path\": \"p1.s1.inputs[0].substance\", \"value\": \"/理念/知识/思想/概念范畴/性质属性/功能效用\"},\n  {\"path\": \"p1.s1.inputs[0].form\", \"value\": \"/呈现/视觉/视觉制作/构图编排/版面设计/版面结构\"},\n  {\"path\": \"p1.s2.inputs[0].substance\", \"value\": [\"/理念/知识/商业/前沿技术/AI智能/AI应用\", \"/理念/知识/思想\"]}\n]\nDo not output raw dictionary structure or any other nesting. Do not touch or modify other files.\n\nNOTE (Cyber Agent runtime): the file tools here are `read_file(path=...)`, `write_file(path=..., content=...)`, `bash_command(command=...)` — NOT the Claude-style Read/Write/Bash. Call taxonomy-lookup.py via `bash_command`."
+  }
+}

+ 310 - 0
examples/process_pipeline/script/search_eval/procedure-dsl/run_cyber.py

@@ -0,0 +1,310 @@
+#!/usr/bin/env python3
+"""
+run_cyber.py — run_procedure_dsl.py 的 Cyber Agent 移植版 (POC)。
+
+与 run_procedure_dsl.py 的唯一区别是**执行引擎**:
+  - run_procedure_dsl.py: Claude Agent SDK (ClaudeSDKClient) → 走 ~/.claude OAuth Max,
+    只能跑 Anthropic 协议端点。
+  - run_cyber.py:         本仓库自研 AgentRunner (agent/core/runner.py) → 多 Provider。
+    本 POC 默认走 OpenRouter (create_openrouter_llm_call), 一个 provider 通打
+    GPT / Gemini / Qwen / DeepSeek / Claude 全家, 换模型只改 --model 字符串。
+
+复用 run_procedure_dsl.py 的:
+  - 起手 prompt 全文 (_build_initial_blocks 的 text 块) —— 三阶段指令一字不改。
+  - 图片抽取 (_images_from_source) + 客户端下载缓存 (_url_to_cached_path)。
+图片块从 Anthropic base64 格式转成 OpenRouter 要的 OpenAI `image_url` data-URL 格式。
+
+子 Agent (phase-2a-normalizer / phase-2b-matcher) 定义见同目录 presets_cyber.json,
+对位原脚本的 AgentDefinition。Phase 2 并行靠 RunConfig.parallel_tool_execution=True:
+主 Agent 在一轮里同时发两个 `agent(agent_type=..., task=...)` 调用即并行 (各自保留写权限);
+不走 explore 模式 (那会把子 Agent 降级为只读, 无法写 patch)。
+
+用法 (与 run_procedure_dsl.py 对齐):
+    python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber
+    python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber \
+        --model openai/gpt-4o
+    # 中断后续跑 (从 outputs/<out-dir>/.trace_id 读 trace 接着跑):
+    python run_cyber.py input/case-2-raw.json --out-dir case-2-cyber --resume
+
+⚠️ POC 已知缺口 (非 Claude 模型上需逐步调):
+  - 起手 prompt 与 spec/ 里写的是 Claude 工具名 (Read/Write/Bash)。Cyber 实际工具是
+    read_file/write_file/edit_file/bash_command。下方 _CYBER_RUNTIME_NOTE 给了映射, 但
+    spec 文档内 `详见 Read(...)` 这类示例仍是 Claude 名 —— 弱模型可能被带偏, 需观察 trace。
+  - edit_file 在非 Claude 模型上的 exact-match 命中率不如 Claude, workflow.json 反复 Edit
+    可能卡壳。先拿单 case smoke test, 别直接上量。
+"""
+import argparse
+import asyncio
+import base64
+import importlib.util
+import logging
+import os
+import sys
+import time
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, List
+
+# run_cyber.py → procedure-dsl/
+DSL_ROOT = Path(__file__).resolve().parent
+
+
+def _find_repo_root(start: Path) -> Path:
+    """向上找含 pyproject.toml 的目录 (cyber-agent 仓库根), 用于 sys.path 兜底。"""
+    for p in [start, *start.parents]:
+        if (p / "pyproject.toml").exists():
+            return p
+    return start
+
+
+REPO_ROOT = _find_repo_root(DSL_ROOT)
+for _p in (str(REPO_ROOT), str(DSL_ROOT)):
+    if _p not in sys.path:
+        sys.path.insert(0, _p)
+
+
+def _load_sibling_module(name: str, path: Path):
+    """按文件路径 import 同目录脚本 (run_procedure_dsl.py 不是包, 用 spec 加载)。"""
+    spec = importlib.util.spec_from_file_location(name, path)
+    mod = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(mod)
+    return mod
+
+
+# 复用 run_procedure_dsl.py 的纯函数 (它 main() 有 __main__ 守卫, import 无副作用)。
+_rpd = _load_sibling_module("run_procedure_dsl", DSL_ROOT / "run_procedure_dsl.py")
+_build_initial_blocks = _rpd._build_initial_blocks
+_images_from_source = _rpd._images_from_source
+_url_to_cached_path = _rpd._url_to_cached_path
+_derive_case_id = _rpd._derive_case_id
+_resolve_out_dir = _rpd._resolve_out_dir
+_MEDIA_TYPE = _rpd._MEDIA_TYPE
+
+
+# 追加给主 Agent 的 Cyber 运行时说明 (原 prompt 是 Claude 工具名, 这里给映射 + 子 agent 分发约定)。
+_CYBER_RUNTIME_NOTE = """
+
+## ⚙️ Cyber Agent 运行时差异 (本次执行引擎不是 Claude Code, 工具名不同!)
+
+上文出现的 Claude 风格工具名, 在本运行时对应如下 (调用时用**右边**的名字):
+- `Read(file_path=X)`  → `read_file(path=X)`
+- `Write(file_path=X, content=Y)` → `write_file(path=X, content=Y)`
+- `Edit(file_path=X, ...)` → `edit_file(...)`
+- `Bash(command=C)` → `bash_command(command=C)`
+- `Glob` → `glob_files`,`Grep` → `grep_content`
+- 读图用 `read_images`(若需要主动看本地图)。
+
+## 🔱 Phase 2 子 Agent 分发 (对应原 `Task`/`Agent` 工具)
+
+进入 Phase 2 (归一化与分类匹配) 时, **不要**自己手调 taxonomy-lookup.py 逐条决策。
+先用 prepare-subtask.py 生成 _scratch/task_2a.json 和 task_2b.json, 然后**在同一轮里**
+发出两个子 Agent 调用 (它们会并行执行, 各自写 patch 文件):
+
+    agent(agent_type="phase-2a-normalizer", task="处理 <case_dir>/_scratch/task_2a.json, 产出 patch_2a.json")
+    agent(agent_type="phase-2b-matcher",    task="处理 <case_dir>/_scratch/task_2b.json, 产出 patch_2b.json")
+
+两个子 Agent 都返回后, 用 `bash_command` 跑 wf-patch.py 把 patch_2a.json / patch_2b.json
+回填进 workflow.json, 再继续 Phase 3。
+"""
+
+
+def _to_openai_content(text: str, images: List[str]) -> List[Dict[str, Any]]:
+    """把 (text, 图URL列表) 拼成 OpenAI 格式的 content blocks (OpenRouter / 各家通吃)。
+
+    - 文本块: {"type": "text", "text": ...}
+    - 图片块: {"type": "image_url", "image_url": {"url": "data:<mime>;base64,<...>"}}
+      URL 先经 run_procedure_dsl._url_to_cached_path 客户端下载缓存 (绕图床 robots.txt)。
+    单张图失败不阻塞整批。
+    """
+    blocks: List[Dict[str, Any]] = [{"type": "text", "text": text}]
+    n_ok, n_fail = 0, 0
+    for ref in images:
+        try:
+            if ref.startswith(("http://", "https://")):
+                local = _url_to_cached_path(ref)
+            else:
+                local = Path(ref).expanduser().resolve()
+                if not local.exists():
+                    raise FileNotFoundError(ref)
+            mime = _MEDIA_TYPE.get(local.suffix.lower(), "image/png")
+            data = base64.standard_b64encode(local.read_bytes()).decode()
+            blocks.append({
+                "type": "image_url",
+                "image_url": {"url": f"data:{mime};base64,{data}"},
+            })
+            n_ok += 1
+        except Exception as e:
+            n_fail += 1
+            print(f"[image] skip {ref[:80]}... ({type(e).__name__}: {e})", flush=True)
+    if images:
+        print(f"[image] {n_ok}/{len(images)} 成功 base64 化, {n_fail} 失败已跳过", flush=True)
+    return blocks
+
+
+def _trace_append(trace_path: Path, chunk: str) -> None:
+    with trace_path.open("a", encoding="utf-8") as f:
+        f.write(chunk)
+
+
+async def run(args: argparse.Namespace) -> int:
+    from agent.core.runner import AgentRunner, RunConfig
+    from agent.core.presets import load_presets_from_json
+    from agent.trace import FileSystemTraceStore, Trace, Message
+    from agent.llm import create_openrouter_llm_call
+
+    workdir = DSL_ROOT
+    source_path = Path(args.source).expanduser().resolve()
+    if not source_path.exists():
+        print(f"❌ source not found: {source_path}", file=sys.stderr)
+        return 1
+
+    out_dir = _resolve_out_dir(args.out_dir, workdir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    (out_dir / "_scratch").mkdir(exist_ok=True)
+    trace_id_file = out_dir / ".trace_id"
+    trace_path = out_dir / "_trace_cyber.md"
+
+    # 注册子 Agent presets (phase-2a-normalizer / phase-2b-matcher / main)。
+    presets_json = DSL_ROOT / "presets_cyber.json"
+    if presets_json.exists():
+        load_presets_from_json(str(presets_json))
+    else:
+        print(f"⚠️ 缺少 {presets_json}, 子 Agent 分发会失败", file=sys.stderr)
+
+    # source 路径给 Agent (workdir 相对优先)。
+    try:
+        source_for_agent = source_path.relative_to(workdir).as_posix()
+    except ValueError:
+        source_for_agent = str(source_path)
+
+    # resume: 读已存 trace_id, 只发增量 "接着做" 消息。
+    resume_tid = None
+    if args.resume:
+        if not trace_id_file.exists():
+            print(f"❌ --resume 但无 {trace_id_file}; 先正常跑一次", file=sys.stderr)
+            return 1
+        resume_tid = trace_id_file.read_text(encoding="utf-8").strip() or None
+
+    images = _images_from_source(source_path) + (args.extra_image or [])
+
+    if resume_tid:
+        msgs = [{"role": "user", "content": (
+            f"上次中断了, 接续做 case-{args.case_id} 的提取流程。\n"
+            f"先用 bash_command `ls` 看 {out_dir.as_posix()}/ 当前已落盘哪些产物, "
+            f"再 read_file 这些**当前磁盘版本** (understanding.md / workflow.json) 接着跑, "
+            f"不要凭记忆。Phase 2 仍按子 Agent 分发约定 (agent(agent_type=...))。"
+        )}]
+    else:
+        # 复用原脚本的起手 prompt 全文 (取 text 块), 再补 Cyber 运行时说明。
+        anth_blocks = _build_initial_blocks(
+            source_for_agent, args.case_id, args.out_dir, images, workdir, "spec"
+        )
+        base_text = anth_blocks[0]["text"] + _CYBER_RUNTIME_NOTE
+        msgs = [{"role": "user", "content": _to_openai_content(base_text, images)}]
+
+    cfg = RunConfig(
+        model=args.model,
+        temperature=0.3,
+        max_iterations=args.max_turns,
+        agent_type="main",
+        name=f"procedure-dsl case-{args.case_id} (cyber)",
+        tool_groups=["core"],
+        parallel_tool_execution=True,   # Phase 2 同轮发两个 agent() 即并行
+        enable_prompt_caching=False,    # 非 Claude 模型无效, 关掉省得干扰
+        goal_compression="on_overflow",
+        trace_id=resume_tid,
+    )
+
+    print(f"[setup] engine     = Cyber AgentRunner (OpenRouter)")
+    print(f"[setup] model      = {args.model}")
+    print(f"[setup] source     = {source_path}")
+    print(f"[setup] case_id    = {args.case_id}")
+    print(f"[setup] out_dir    = {out_dir}")
+    print(f"[setup] images     = {len(images)}")
+    print(f"[setup] max_iter   = {args.max_turns}")
+    print(f"[setup] resume     = {resume_tid[:8] + '...' if resume_tid else 'no'}")
+    print(flush=True)
+
+    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    _trace_append(trace_path, f"\n\n---\n\n## ▶ {'Resume' if resume_tid else 'Fresh'} @ {now}\n"
+                              f"- model: `{args.model}` · case: `{args.case_id}` · images: `{len(images)}`\n")
+
+    runner = AgentRunner(
+        llm_call=create_openrouter_llm_call(model=args.model),
+        trace_store=FileSystemTraceStore(base_path=str(out_dir / ".trace")),
+    )
+
+    turn = 0
+    t0 = time.time()
+    status = "unknown"
+    try:
+        async for item in runner.run(messages=msgs, config=cfg):
+            if isinstance(item, Trace):
+                status = item.status
+                if item.trace_id:
+                    trace_id_file.write_text(item.trace_id, encoding="utf-8")
+                print(f"[trace] {item.trace_id} status={item.status}", flush=True)
+            elif isinstance(item, Message):
+                role = getattr(item, "role", "?")
+                content = getattr(item, "content", "") or ""
+                tool_calls = getattr(item, "tool_calls", None)
+                if role == "assistant":
+                    turn += 1
+                    if content:
+                        print(f"\n[turn {turn} · text]\n{content}\n", flush=True)
+                        _trace_append(trace_path, f"\n### Turn {turn}\n> {content[:2000]}\n")
+                    for tc in (tool_calls or []):
+                        fn = (tc.get("function") or {}) if isinstance(tc, dict) else {}
+                        nm = fn.get("name", tc.get("name", "?") if isinstance(tc, dict) else "?")
+                        ar = str(fn.get("arguments", ""))[:200]
+                        print(f"[turn {turn} · tool] {nm}({ar})", flush=True)
+                        _trace_append(trace_path, f"- `{nm}` — `{ar}`\n")
+                elif role == "tool":
+                    preview = str(content)[:300]
+                    print(f"  ↳ [tool result] {preview}", flush=True)
+    except KeyboardInterrupt:
+        print(f"\n⚠️ 中断. {out_dir}/ 产物已保留. 续跑: --resume", file=sys.stderr)
+        return 130
+    except Exception as e:
+        logging.exception("cyber run failed")
+        print(f"❌ {type(e).__name__}: {e}", file=sys.stderr)
+        return 1
+
+    elapsed = time.time() - t0
+    print(f"\n[done] status={status} turns={turn} wall={elapsed:.1f}s", flush=True)
+    _trace_append(trace_path, f"\n### ◀ done · status={status} · turns={turn} · {elapsed:.1f}s\n")
+    return 0 if status in ("completed", "unknown") else 2
+
+
+def _parse_args() -> argparse.Namespace:
+    p = argparse.ArgumentParser(
+        description="跑 procedure-dsl 提取流程 (Cyber AgentRunner + OpenRouter)",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=__doc__,
+    )
+    p.add_argument("source", help="原始 post 文件 (input/case-N-raw.json)")
+    p.add_argument("--out-dir", required=True,
+                   help="输出目录名, 落在 outputs/ 下. case_id 自动从 basename 推。")
+    p.add_argument("--extra-image", action="append", default=[],
+                   help="额外配图 (本地路径 or URL), 可多次。")
+    p.add_argument("--model", default="openai/gpt-4o",
+                   help="OpenRouter 模型名 (default: openai/gpt-4o). "
+                        "如 google/gemini-2.5-pro / qwen/qwen-max / anthropic/claude-sonnet-4.5")
+    p.add_argument("--max-turns", type=int, default=300, help="最大迭代轮数 (default: 300)")
+    p.add_argument("--resume", action="store_true",
+                   help="从 outputs/<out-dir>/.trace_id 读 trace 续跑")
+    return p.parse_args()
+
+
+def main() -> None:
+    for stream in (sys.stdout, sys.stderr):
+        if hasattr(stream, "reconfigure"):
+            stream.reconfigure(encoding="utf-8", errors="replace")
+    logging.basicConfig(level=logging.WARNING)
+    args = _parse_args()
+    args.case_id = _derive_case_id(args.out_dir)
+    sys.exit(asyncio.run(run(args)))
+
+
+if __name__ == "__main__":
+    main()