""" Agent SDK 入口 —— 统一调用 remote / 本地 Agent 的公开 API。 使用方式(任何进程,只要装了 cyber-agent 包): import asyncio from agent import invoke_agent # 远端(HTTP 调用 KnowHub 服务器) result = asyncio.run(invoke_agent( agent_type="remote_librarian", task="ControlNet 相关的工具知识", skills=["ask_strategy"], )) # 本地(在当前进程起 AgentRunner) result = asyncio.run(invoke_agent( agent_type="deconstruct", task="...", project_root="./examples/production_plan", )) skill 脚本只需要 `from agent import invoke_agent` 然后透传命令行参数即可—— 不依赖仓库相对路径,也不会触发 `agent.tools.builtin` 的 eager tool registry 加载。 """ import importlib import importlib.util import logging import os import sys from pathlib import Path from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) # 模块加载时就 load .env,保证后续任何 module-level `os.getenv(...)` 读取都能拿到值。 # 查找顺序:cwd / cyber-agent 仓库根(editable install 定位)。 def _load_default_env() -> None: try: from dotenv import load_dotenv except ImportError: return # 先试 cwd 向上查找 load_dotenv() # 再试 cyber-agent 仓库根(编辑安装时 agent/__init__.py 所在仓库的 .env) repo_env = Path(__file__).resolve().parent.parent / ".env" if repo_env.exists(): load_dotenv(repo_env, override=False) _load_default_env() async def invoke_agent( agent_type: str, task: str, skills: Optional[List[str]] = None, continue_from: Optional[str] = None, messages: Optional[List[Dict[str, Any]]] = None, project_root: Optional[str] = None, ) -> Dict[str, Any]: """ 统一调用远端或本地 Agent。 Args: agent_type: Agent 类型。以 "remote_" 开头 → HTTP 调用 KnowHub;否则本地执行。 task: 任务描述 skills: 指定 skill 列表(远端由服务器白名单过滤;本地覆盖项目 RUN_CONFIG.skills) continue_from: 已有 sub_trace_id,传入则续跑 messages: 预置 OpenAI 格式消息(远端 1D、本地 1D) project_root: 本地 agent 必填——项目目录(含 config.py / presets.json / tools/) Returns: {"mode", "agent_type", "sub_trace_id", "status", "summary", "stats", "error"?} """ if agent_type.startswith("remote_"): # 懒 import,避免加载整个 tool registry(远端调用只需要 httpx) from agent.tools.builtin.subagent import _run_remote_agent return await _run_remote_agent( agent_type=agent_type, task=task, messages=messages, continue_from=continue_from, skills=skills, ) if not project_root: return { "mode": "local", "agent_type": agent_type, "status": "failed", "error": "本地 agent 需要 project_root 指定项目目录(含 config.py)", } return await _run_local_agent( agent_type=agent_type, task=task, skills=skills, continue_from=continue_from, messages=messages, project_root=project_root, ) async def _run_local_agent( agent_type: str, task: str, skills: Optional[List[str]], continue_from: Optional[str], messages: Optional[List[Dict[str, Any]]], project_root: str, ) -> Dict[str, Any]: """ 在当前进程中起 AgentRunner 跑本地 agent。 项目目录约定: project_root/ ├── config.py # 必需:定义 RUN_CONFIG(RunConfig 实例),可选 SKILLS_DIR / TRACE_STORE_PATH ├── presets.json # 可选:agent_type preset └── tools/ # 可选:项目自定义工具(有 __init__.py 则 import 触发 @tool 注册) .env 会自动从 project_root 或上两级目录查找并加载。 """ root = Path(project_root).resolve() if not root.is_dir(): return {"mode": "local", "agent_type": agent_type, "status": "failed", "error": f"project_root 不存在: {root}"} # 1. 把项目根加入 sys.path(让 `import config` / `import tools` 能找到) if str(root) not in sys.path: sys.path.insert(0, str(root)) # 2. 加载 .env:依次查项目根、两级父目录(兼容 monorepo)、cyber-agent 仓库根 try: from dotenv import load_dotenv import agent as _agent_pkg agent_repo_root = Path(_agent_pkg.__file__).parent.parent for candidate in ( root / ".env", root.parent / ".env", root.parent.parent / ".env", agent_repo_root / ".env", ): if candidate.exists(): load_dotenv(candidate) break except ImportError: pass # 3. 加载项目 config config_path = root / "config.py" if not config_path.exists(): return {"mode": "local", "agent_type": agent_type, "status": "failed", "error": f"缺少 {config_path}"} try: spec = importlib.util.spec_from_file_location("_project_config", config_path) cfg_mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(cfg_mod) except Exception as e: return {"mode": "local", "agent_type": agent_type, "status": "failed", "error": f"加载 {config_path} 失败: {e}"} run_config = getattr(cfg_mod, "RUN_CONFIG", None) if run_config is None: return {"mode": "local", "agent_type": agent_type, "status": "failed", "error": f"{config_path} 未定义 RUN_CONFIG"} # 覆盖 agent_type / skills / continue_from run_config.agent_type = agent_type if skills is not None: run_config.skills = skills if continue_from: run_config.trace_id = continue_from # 4. 加载项目 presets.json(如果有) presets_path = root / "presets.json" if presets_path.exists(): try: from agent.core.presets import load_presets_from_json load_presets_from_json(str(presets_path)) except Exception as e: logger.warning(f"加载 presets.json 失败: {e}") # 5. 触发项目自定义工具注册(约定:project_root/tools/__init__.py) if (root / "tools" / "__init__.py").exists(): try: importlib.import_module("tools") except Exception as e: logger.warning(f"加载 tools 包失败: {e}") # 6. 创建 AgentRunner from agent.core.runner import AgentRunner from agent.trace import FileSystemTraceStore from agent.llm import create_qwen_llm_call trace_store_path = getattr(cfg_mod, "TRACE_STORE_PATH", ".trace") skills_dir = getattr(cfg_mod, "SKILLS_DIR", "./skills") # 相对路径以 project_root 为基准 if not os.path.isabs(trace_store_path): trace_store_path = str(root / trace_store_path) if not os.path.isabs(skills_dir): skills_dir = str(root / skills_dir) runner = AgentRunner( trace_store=FileSystemTraceStore(base_path=trace_store_path), llm_call=create_qwen_llm_call(model=run_config.model), skills_dir=skills_dir, ) # 7. 构建消息 msgs = list(messages) if messages else [] msgs.append({"role": "user", "content": task}) # 8. 运行 try: result = await runner.run_result(messages=msgs, config=run_config) except Exception as e: logger.exception("本地 agent 运行失败") return {"mode": "local", "agent_type": agent_type, "status": "failed", "error": f"{type(e).__name__}: {e}"} return { "mode": "local", "agent_type": agent_type, "sub_trace_id": result.get("trace_id"), "status": result.get("status", "unknown"), "summary": result.get("summary", ""), "stats": result.get("stats", {}), "error": result.get("error"), }