""" 生产级 Agent 模板 这是一个可直接用于生产环境的 Agent 模板,包含: - 完整的错误处理 - 日志记录 - 配置管理 - 工具注册 - Skills 加载 - 结果输出 """ import asyncio import logging import sys import os from pathlib import Path from typing import Optional, List, Dict, Any from datetime import datetime # 添加项目根目录到 Python 路径 sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from dotenv import load_dotenv load_dotenv() from agent import ( AgentRunner, RunConfig, FileSystemTraceStore, Trace, Message, tool, ToolResult, ToolContext, ) from agent.llm import create_openrouter_llm_call # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('.cache/agent.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # ===== 配置管理 ===== class AgentConfig: """Agent 配置""" def __init__(self): # LLM 配置 self.model = os.getenv("MODEL", "anthropic/claude-sonnet-4.5") self.api_key = os.getenv("OPENROUTER_API_KEY") self.temperature = float(os.getenv("TEMPERATURE", "0.3")) self.max_iterations = int(os.getenv("MAX_ITERATIONS", "30")) # 存储配置 self.trace_dir = os.getenv("TRACE_DIR", ".cache/traces") self.output_dir = os.getenv("OUTPUT_DIR", ".cache/output") # Skills 配置 self.skills_dir = os.getenv("SKILLS_DIR", "./skills") self.enabled_skills = os.getenv("ENABLED_SKILLS", "").split(",") if os.getenv("ENABLED_SKILLS") else None # 验证配置 self._validate() def _validate(self): """验证配置""" if not self.api_key: raise ValueError("OPENROUTER_API_KEY 未设置,请在 .env 文件中配置") # 创建必要的目录 Path(self.trace_dir).mkdir(parents=True, exist_ok=True) Path(self.output_dir).mkdir(parents=True, exist_ok=True) # ===== 自定义工具 ===== @tool(description="示例工具:处理数据") async def process_data( data: str, operation: str = "transform", ctx: ToolContext = None, ) -> ToolResult: """ 处理数据的示例工具 Args: data: 要处理的数据 operation: 操作类型(transform/validate/analyze) ctx: 工具上下文 """ try: logger.info(f"处理数据: operation={operation}, data_length={len(data)}") # 你的业务逻辑 if operation == "transform": result = data.upper() elif operation == "validate": result = f"数据有效: {len(data)} 字符" elif operation == "analyze": result = f"分析结果: 包含 {len(data.split())} 个单词" else: return ToolResult( title="操作失败", output=f"不支持的操作: {operation}", error=True, ) return ToolResult( title="处理成功", output=result, data={"operation": operation, "result": result}, ) except Exception as e: logger.error(f"处理数据失败: {e}", exc_info=True) return ToolResult( title="处理失败", output=str(e), error=True, ) @tool(description="示例工具:查询外部 API") async def query_api( endpoint: str, params: Optional[Dict[str, Any]] = None, ctx: ToolContext = None, ) -> ToolResult: """ 查询外部 API 的示例工具 Args: endpoint: API 端点 params: 查询参数 ctx: 工具上下文 """ try: logger.info(f"查询 API: endpoint={endpoint}, params={params}") # 模拟 API 调用 # 实际使用时替换为真实的 API 调用 await asyncio.sleep(0.5) result = { "endpoint": endpoint, "params": params, "data": {"status": "success", "message": "API 调用成功"}, } return ToolResult( title="API 查询成功", output=f"成功查询 {endpoint}", data=result, ) except Exception as e: logger.error(f"API 查询失败: {e}", exc_info=True) return ToolResult( title="API 查询失败", output=str(e), error=True, ) # ===== Agent 类 ===== class ProductionAgent: """生产级 Agent""" def __init__(self, config: AgentConfig): self.config = config self.runner = self._create_runner() def _create_runner(self) -> AgentRunner: """创建 AgentRunner""" # 初始化存储 trace_store = FileSystemTraceStore(base_path=self.config.trace_dir) # 初始化 LLM llm_call = create_openrouter_llm_call( model=self.config.model, api_key=self.config.api_key, ) # 创建 Runner runner = AgentRunner( llm_call=llm_call, trace_store=trace_store, skills_dir=self.config.skills_dir if Path(self.config.skills_dir).exists() else None, ) logger.info(f"AgentRunner 已创建: model={self.config.model}") return runner async def run( self, task: str, trace_id: Optional[str] = None, ) -> Dict[str, Any]: """ 运行 Agent Args: task: 任务描述 trace_id: Trace ID(用于续跑) Returns: 执行结果 """ logger.info(f"开始执行任务: {task[:100]}...") # 构建运行配置 run_config = RunConfig( model=self.config.model, temperature=self.config.temperature, max_iterations=self.config.max_iterations, trace_id=trace_id, skills=self.config.enabled_skills, ) # 执行 Agent final_response = None current_trace_id = None messages_count = 0 try: async for item in self.runner.run( messages=[{"role": "user", "content": task}], config=run_config, ): if isinstance(item, Trace): current_trace_id = item.trace_id logger.info(f"Trace ID: {current_trace_id}") elif isinstance(item, Message): messages_count += 1 if item.role == "assistant" and item.content: final_response = item.content logger.info(f"收到 Assistant 消息 #{messages_count}") logger.info(f"任务执行完成: trace_id={current_trace_id}, messages={messages_count}") # 保存结果 result = { "trace_id": current_trace_id, "task": task, "response": final_response, "messages_count": messages_count, "timestamp": datetime.now().isoformat(), } self._save_result(result) return result except Exception as e: logger.error(f"任务执行失败: {e}", exc_info=True) return { "error": str(e), "trace_id": current_trace_id, "task": task, } def _save_result(self, result: Dict[str, Any]): """保存执行结果""" output_file = Path(self.config.output_dir) / f"{result['trace_id']}.txt" with open(output_file, 'w', encoding='utf-8') as f: f.write(f"Trace ID: {result['trace_id']}\n") f.write(f"时间: {result['timestamp']}\n") f.write(f"任务: {result['task']}\n") f.write(f"\n{'='*60}\n\n") f.write(f"结果:\n{result['response']}\n") logger.info(f"结果已保存: {output_file}") # ===== 主函数 ===== async def main(): """主函数""" try: # 加载配置 config = AgentConfig() # 创建 Agent agent = ProductionAgent(config) # 执行任务 task = """ 请帮我完成以下任务: 1. 使用 process_data 工具处理文本 "hello world" 2. 使用 query_api 工具查询 /api/users 端点 3. 总结执行结果 """ result = await agent.run(task) # 输出结果 print("\n" + "="*60) print("执行结果:") print("="*60) print(f"Trace ID: {result.get('trace_id')}") print(f"消息数: {result.get('messages_count')}") print("\n响应:") print(result.get('response', result.get('error'))) print("="*60) except Exception as e: logger.error(f"程序执行失败: {e}", exc_info=True) sys.exit(1) if __name__ == "__main__": asyncio.run(main())