| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
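"""
Production-grade Agent template.

This is an Agent template that can be used directly in a production
environment. It includes:
- Complete error handling
- Logging
- Configuration management
- Tool registration
- Skills loading
- Result output
"""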
- import asyncio
- import logging
- import sys
- import os
- from pathlib import Path
- from typing import Optional, List, Dict, Any
- from datetime import datetime
- # 添加项目根目录到 Python 路径
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent import (
- AgentRunner,
- RunConfig,
- FileSystemTraceStore,
- Trace,
- Message,
- tool,
- ToolResult,
- ToolContext,
- )
- from agent.llm import create_openrouter_llm_call
# Configure logging: every record goes to both .cache/agent.log and the console.
# FileHandler opens its file as soon as it is constructed, so the directory
# must exist before the handler is created.
Path('.cache').mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Log messages contain non-ASCII text; do not depend on the locale encoding.
        logging.FileHandler('.cache/agent.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# ===== Configuration management =====
class AgentConfig:
    """Agent settings read from environment variables.

    Attributes:
        model: Model identifier (``MODEL``, default
            ``anthropic/claude-sonnet-4.5``).
        api_key: OpenRouter API key (``OPENROUTER_API_KEY``, required).
        temperature: Sampling temperature (``TEMPERATURE``, default ``0.3``).
        max_iterations: Iteration limit (``MAX_ITERATIONS``, default ``30``).
        trace_dir: Directory for traces (``TRACE_DIR``, default
            ``.cache/traces``).
        output_dir: Directory for result files (``OUTPUT_DIR``, default
            ``.cache/output``).
        skills_dir: Directory containing skills (``SKILLS_DIR``, default
            ``./skills``).
        enabled_skills: Skill names from the comma-separated
            ``ENABLED_SKILLS`` variable, or ``None`` when the variable is
            unset or names no skill.

    Raises:
        ValueError: If ``OPENROUTER_API_KEY`` is not set, or if
            ``TEMPERATURE`` / ``MAX_ITERATIONS`` cannot be parsed as numbers.
    """

    def __init__(self):
        # LLM settings
        self.model = os.getenv("MODEL", "anthropic/claude-sonnet-4.5")
        self.api_key = os.getenv("OPENROUTER_API_KEY")
        self.temperature = float(os.getenv("TEMPERATURE", "0.3"))
        self.max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))

        # Storage settings
        self.trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
        self.output_dir = os.getenv("OUTPUT_DIR", ".cache/output")

        # Skills settings
        self.skills_dir = os.getenv("SKILLS_DIR", "./skills")
        # Trim whitespace and drop empty entries so a value such as "a, b,"
        # yields ["a", "b"] rather than ["a", " b", ""].
        skill_names = [
            name.strip()
            for name in os.getenv("ENABLED_SKILLS", "").split(",")
            if name.strip()
        ]
        self.enabled_skills = skill_names or None

        # Validate the settings
        self._validate()

    def _validate(self):
        """Check required settings and create the storage directories."""
        if not self.api_key:
            raise ValueError("OPENROUTER_API_KEY 未设置,请在 .env 文件中配置")

        # Create the directories the agent writes to
        Path(self.trace_dir).mkdir(parents=True, exist_ok=True)
        Path(self.output_dir).mkdir(parents=True, exist_ok=True)
# ===== Custom tools =====
@tool(description="示例工具:处理数据")
async def process_data(
    data: str,
    operation: str = "transform",
    ctx: ToolContext = None,
) -> ToolResult:
    """Example tool: apply a simple text operation to ``data``.

    Args:
        data: The text to process.
        operation: ``transform`` upper-cases the text, ``validate`` reports
            its character count, ``analyze`` reports its whitespace-separated
            word count.
        ctx: Tool context; not used by this example.

    Returns:
        A ``ToolResult``. ``error=True`` is set when the operation is not
        supported or when processing raises.
    """
    try:
        logger.info(f"处理数据: operation={operation}, data_length={len(data)}")

        # Business logic: supported operations, checked in this order.
        actions = (
            ("transform", lambda text: text.upper()),
            ("validate", lambda text: f"数据有效: {len(text)} 字符"),
            ("analyze", lambda text: f"分析结果: 包含 {len(text.split())} 个单词"),
        )
        for name, action in actions:
            if operation == name:
                outcome = action(data)
                break
        else:
            # No entry matched the requested operation.
            return ToolResult(
                title="操作失败",
                output=f"不支持的操作: {operation}",
                error=True,
            )

        return ToolResult(
            title="处理成功",
            output=outcome,
            data={"operation": operation, "result": outcome},
        )
    except Exception as exc:
        logger.error(f"处理数据失败: {exc}", exc_info=True)
        return ToolResult(
            title="处理失败",
            output=str(exc),
            error=True,
        )
@tool(description="示例工具:查询外部 API")
async def query_api(
    endpoint: str,
    params: Optional[Dict[str, Any]] = None,
    ctx: ToolContext = None,
) -> ToolResult:
    """Example tool: simulate a query against an external API.

    Args:
        endpoint: The API endpoint.
        params: Query parameters.
        ctx: Tool context; not used by this example.

    Returns:
        A ``ToolResult`` whose ``data`` echoes the endpoint and parameters
        together with a canned success payload, or one with ``error=True``
        when an exception is raised.
    """
    try:
        logger.info(f"查询 API: endpoint={endpoint}, params={params}")

        # Simulated API call; replace with a real request in actual use.
        await asyncio.sleep(0.5)

        payload = dict(
            endpoint=endpoint,
            params=params,
            data=dict(status="success", message="API 调用成功"),
        )
        return ToolResult(
            title="API 查询成功",
            output=f"成功查询 {endpoint}",
            data=payload,
        )
    except Exception as exc:
        logger.error(f"API 查询失败: {exc}", exc_info=True)
        return ToolResult(
            title="API 查询失败",
            output=str(exc),
            error=True,
        )
# ===== Agent class =====
class ProductionAgent:
    """Production-oriented agent built around an ``AgentRunner``.

    The runner is created once from the supplied configuration. ``run``
    executes a single task, keeps the last non-empty assistant message as the
    response and writes a plain-text summary of each completed run to
    ``config.output_dir``.
    """

    def __init__(self, config: AgentConfig):
        """Store ``config`` and build the runner from it."""
        self.config = config
        self.runner = self._create_runner()

    def _create_runner(self) -> AgentRunner:
        """Create the ``AgentRunner`` with its trace store and LLM callable."""
        # Trace storage
        trace_store = FileSystemTraceStore(base_path=self.config.trace_dir)

        # LLM callable
        llm_call = create_openrouter_llm_call(
            model=self.config.model,
            api_key=self.config.api_key,
        )

        # The skills directory is optional: pass None when it does not exist.
        runner = AgentRunner(
            llm_call=llm_call,
            trace_store=trace_store,
            skills_dir=self.config.skills_dir if Path(self.config.skills_dir).exists() else None,
        )

        logger.info(f"AgentRunner 已创建: model={self.config.model}")
        return runner

    async def run(
        self,
        task: str,
        trace_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Run the agent on ``task`` and return a summary of the run.

        Args:
            task: Task description, sent as the single user message.
            trace_id: Trace ID, used to resume an existing run.

        Returns:
            On success, a dict with ``trace_id``, ``task``, ``response``,
            ``messages_count`` and ``timestamp``. If the run raises, a dict
            with ``error``, ``trace_id`` and ``task``.
        """
        logger.info(f"开始执行任务: {task[:100]}...")

        # Build the run configuration
        run_config = RunConfig(
            model=self.config.model,
            temperature=self.config.temperature,
            max_iterations=self.config.max_iterations,
            trace_id=trace_id,
            skills=self.config.enabled_skills,
        )

        final_response = None
        current_trace_id = None
        messages_count = 0

        # Execute the agent. Only this part decides success or failure.
        try:
            async for item in self.runner.run(
                messages=[{"role": "user", "content": task}],
                config=run_config,
            ):
                if isinstance(item, Trace):
                    current_trace_id = item.trace_id
                    logger.info(f"Trace ID: {current_trace_id}")
                elif isinstance(item, Message):
                    messages_count += 1
                    # The last non-empty assistant message is the response.
                    if item.role == "assistant" and item.content:
                        final_response = item.content
                        logger.info(f"收到 Assistant 消息 #{messages_count}")
        except Exception as e:
            logger.error(f"任务执行失败: {e}", exc_info=True)
            return {
                "error": str(e),
                "trace_id": current_trace_id,
                "task": task,
            }

        logger.info(f"任务执行完成: trace_id={current_trace_id}, messages={messages_count}")

        result = {
            "trace_id": current_trace_id,
            "task": task,
            "response": final_response,
            "messages_count": messages_count,
            "timestamp": datetime.now().isoformat(),
        }

        # Persisting the summary is best-effort: a failed write must not
        # discard a response that was produced successfully.
        try:
            self._save_result(result)
        except (OSError, UnicodeError) as e:
            logger.error(f"结果保存失败: {e}", exc_info=True)

        return result

    def _save_result(self, result: Dict[str, Any]):
        """Write a plain-text summary of ``result`` to the output directory.

        Args:
            result: Run summary with the keys ``trace_id``, ``timestamp``,
                ``task`` and ``response``.

        Raises:
            OSError: If the directory or the file cannot be written.
        """
        # Without a trace id every run would overwrite the same "None.txt";
        # fall back to a timestamped name so each run keeps its own file.
        stem = result['trace_id'] or f"run_{datetime.now():%Y%m%d_%H%M%S_%f}"

        # Do not rely on the directory having been created elsewhere.
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / f"{stem}.txt"

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"Trace ID: {result['trace_id']}\n")
            f.write(f"时间: {result['timestamp']}\n")
            f.write(f"任务: {result['task']}\n")
            f.write(f"\n{'='*60}\n\n")
            f.write(f"结果:\n{result['response']}\n")

        logger.info(f"结果已保存: {output_file}")
# ===== Main function =====
async def main():
    """Run the sample task end to end and print a summary.

    Loads the configuration, builds a ``ProductionAgent``, runs one sample
    task and prints the trace id, the message count and the response (or the
    error text) to stdout.

    The process exits with status 1 when start-up or execution raises, and
    also when the agent reports a failed run, so that shells and schedulers
    can detect the failure.
    """
    try:
        # Load the configuration
        config = AgentConfig()

        # Create the agent
        agent = ProductionAgent(config)

        # Run the task
        task = """
请帮我完成以下任务:
1. 使用 process_data 工具处理文本 "hello world"
2. 使用 query_api 工具查询 /api/users 端点
3. 总结执行结果
"""
        result = await agent.run(task)

        # Print the outcome
        print("\n" + "="*60)
        print("执行结果:")
        print("="*60)
        print(f"Trace ID: {result.get('trace_id')}")
        print(f"消息数: {result.get('messages_count')}")
        print("\n响应:")
        print(result.get('response', result.get('error')))
        print("="*60)
    except Exception as e:
        logger.error(f"程序执行失败: {e}", exc_info=True)
        sys.exit(1)

    # agent.run() reports a failed run through an "error" key instead of
    # raising; reflect that in the exit status rather than exiting with 0.
    if "error" in result:
        sys.exit(1)


if __name__ == "__main__":
    # Script entry point: drive the async main() on a fresh event loop.
    asyncio.run(main())
|