| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- """
- Agent SDK 入口 —— 统一调用 remote / 本地 Agent 的公开 API。
- 使用方式(任何进程,只要装了 cyber-agent 包):
- import asyncio
- from agent import invoke_agent
- # 远端(HTTP 调用 KnowHub 服务器)
- result = asyncio.run(invoke_agent(
- agent_type="remote_librarian",
- task="ControlNet 相关的工具知识",
- skills=["ask_strategy"],
- ))
- # 本地(在当前进程起 AgentRunner)
- result = asyncio.run(invoke_agent(
- agent_type="deconstruct",
- task="...",
- project_root="./examples/production_plan",
- ))
- skill 脚本只需要 `from agent import invoke_agent` 然后透传命令行参数即可——
- 不依赖仓库相对路径,也不会触发 `agent.tools.builtin` 的 eager tool registry 加载。
- """
- import importlib
- import importlib.util
- import logging
- import os
- import sys
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- logger = logging.getLogger(__name__)
- # 模块加载时就 load .env,保证后续任何 module-level `os.getenv(...)` 读取都能拿到值。
- # 查找顺序:cwd / cyber-agent 仓库根(editable install 定位)。
- def _load_default_env() -> None:
- try:
- from dotenv import load_dotenv
- except ImportError:
- return
- # 先试 cwd 向上查找
- load_dotenv()
- # 再试 cyber-agent 仓库根(编辑安装时 agent/__init__.py 所在仓库的 .env)
- repo_env = Path(__file__).resolve().parent.parent / ".env"
- if repo_env.exists():
- load_dotenv(repo_env, override=False)
- _load_default_env()
async def invoke_agent(
    agent_type: str,
    task: str,
    skills: Optional[List[str]] = None,
    continue_from: Optional[str] = None,
    messages: Optional[List[Dict[str, Any]]] = None,
    project_root: Optional[str] = None,
) -> Dict[str, Any]:
    """Invoke a remote or a local agent through one entry point.

    Args:
        agent_type: Agent type. A value starting with ``"remote_"`` is
            dispatched to the remote runner; anything else runs locally.
        task: Task description.
        skills: Optional skill list, forwarded unchanged to the selected
            runner.
        continue_from: An existing sub-trace id; pass it to resume that run.
        messages: Optional pre-seeded OpenAI-format messages (a flat list).
        project_root: Required for local agents: the project directory
            (containing ``config.py``, and optionally ``presets.json`` and
            ``tools/``).

    Returns:
        A result dict. A local call without ``project_root`` yields
        ``{"mode", "agent_type", "status": "failed", "error"}``; otherwise
        the selected runner's result is returned as-is.
    """
    is_remote = agent_type.startswith("remote_")

    if not is_remote:
        if project_root:
            return await _run_local_agent(
                agent_type=agent_type,
                task=task,
                skills=skills,
                continue_from=continue_from,
                messages=messages,
                project_root=project_root,
            )
        # A local run cannot proceed without a project directory.
        return dict(
            mode="local",
            agent_type=agent_type,
            status="failed",
            error="本地 agent 需要 project_root 指定项目目录(含 config.py)",
        )

    # Imported lazily so that callers which never go remote do not pay for
    # loading the builtin tools package.
    from agent.tools.builtin.subagent import _run_remote_agent

    remote_kwargs = {
        "agent_type": agent_type,
        "task": task,
        "messages": messages,
        "continue_from": continue_from,
        "skills": skills,
    }
    return await _run_remote_agent(**remote_kwargs)
- async def _run_local_agent(
- agent_type: str,
- task: str,
- skills: Optional[List[str]],
- continue_from: Optional[str],
- messages: Optional[List[Dict[str, Any]]],
- project_root: str,
- ) -> Dict[str, Any]:
- """
- 在当前进程中起 AgentRunner 跑本地 agent。
- 项目目录约定:
- project_root/
- ├── config.py # 必需:定义 RUN_CONFIG(RunConfig 实例),可选 SKILLS_DIR / TRACE_STORE_PATH
- ├── presets.json # 可选:agent_type preset
- └── tools/ # 可选:项目自定义工具(有 __init__.py 则 import 触发 @tool 注册)
- .env 会自动从 project_root 或上两级目录查找并加载。
- """
- root = Path(project_root).resolve()
- if not root.is_dir():
- return {"mode": "local", "agent_type": agent_type, "status": "failed",
- "error": f"project_root 不存在: {root}"}
- # 1. 把项目根加入 sys.path(让 `import config` / `import tools` 能找到)
- if str(root) not in sys.path:
- sys.path.insert(0, str(root))
- # 2. 加载 .env:依次查项目根、两级父目录(兼容 monorepo)、cyber-agent 仓库根
- try:
- from dotenv import load_dotenv
- import agent as _agent_pkg
- agent_repo_root = Path(_agent_pkg.__file__).parent.parent
- for candidate in (
- root / ".env",
- root.parent / ".env",
- root.parent.parent / ".env",
- agent_repo_root / ".env",
- ):
- if candidate.exists():
- load_dotenv(candidate)
- break
- except ImportError:
- pass
- # 3. 加载项目 config
- config_path = root / "config.py"
- if not config_path.exists():
- return {"mode": "local", "agent_type": agent_type, "status": "failed",
- "error": f"缺少 {config_path}"}
- try:
- spec = importlib.util.spec_from_file_location("_project_config", config_path)
- cfg_mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(cfg_mod)
- except Exception as e:
- return {"mode": "local", "agent_type": agent_type, "status": "failed",
- "error": f"加载 {config_path} 失败: {e}"}
- run_config = getattr(cfg_mod, "RUN_CONFIG", None)
- if run_config is None:
- return {"mode": "local", "agent_type": agent_type, "status": "failed",
- "error": f"{config_path} 未定义 RUN_CONFIG"}
- # 覆盖 agent_type / skills / continue_from
- run_config.agent_type = agent_type
- if skills is not None:
- run_config.skills = skills
- if continue_from:
- run_config.trace_id = continue_from
- # 4. 加载项目 presets.json(如果有)
- presets_path = root / "presets.json"
- if presets_path.exists():
- try:
- from agent.core.presets import load_presets_from_json
- load_presets_from_json(str(presets_path))
- except Exception as e:
- logger.warning(f"加载 presets.json 失败: {e}")
- # 5. 触发项目自定义工具注册(约定:project_root/tools/__init__.py)
- if (root / "tools" / "__init__.py").exists():
- try:
- importlib.import_module("tools")
- except Exception as e:
- logger.warning(f"加载 tools 包失败: {e}")
- # 6. 创建 AgentRunner
- from agent.core.runner import AgentRunner
- from agent.trace import FileSystemTraceStore
- from agent.llm import create_qwen_llm_call
- trace_store_path = getattr(cfg_mod, "TRACE_STORE_PATH", ".trace")
- skills_dir = getattr(cfg_mod, "SKILLS_DIR", "./skills")
- # 相对路径以 project_root 为基准
- if not os.path.isabs(trace_store_path):
- trace_store_path = str(root / trace_store_path)
- if not os.path.isabs(skills_dir):
- skills_dir = str(root / skills_dir)
- runner = AgentRunner(
- trace_store=FileSystemTraceStore(base_path=trace_store_path),
- llm_call=create_qwen_llm_call(model=run_config.model),
- skills_dir=skills_dir,
- )
- # 7. 构建消息
- msgs = list(messages) if messages else []
- msgs.append({"role": "user", "content": task})
- # 8. 运行
- try:
- result = await runner.run_result(messages=msgs, config=run_config)
- except Exception as e:
- logger.exception("本地 agent 运行失败")
- return {"mode": "local", "agent_type": agent_type, "status": "failed",
- "error": f"{type(e).__name__}: {e}"}
- return {
- "mode": "local",
- "agent_type": agent_type,
- "sub_trace_id": result.get("trace_id"),
- "status": result.get("status", "unknown"),
- "summary": result.get("summary", ""),
- "stats": result.get("stats", {}),
- "error": result.get("error"),
- }
|