"""
Agent utility functions.

Convenience wrappers that simplify running an Agent.
"""

import json
from typing import Dict, Any, Optional
from pathlib import Path

from agent.core.runner import AgentRunner, RunConfig
from agent.llm.openrouter import create_openrouter_llm_call
from agent.trace.models import Trace, Message
from agent.trace.store import FileSystemTraceStore

DEFAULT_MODEL = "google/gemini-3-flash-preview"

# Horizontal rules used to frame the console output.
_HEAVY_RULE = "=" * 60
_LIGHT_RULE = "-" * 60

# Returned when the run yields no assistant text and no error message.
_NO_RESULT_MESSAGE = "Agent 没有产生 assistant 文本结果"


def _print_trace_status(trace: Trace) -> None:
    """Print a banner describing the trace's current status."""
    status = trace.status
    if status == "running":
        print(f"\n{_HEAVY_RULE}")
        print("[Trace] 🚀 开始执行")
        print(f" Trace ID: {trace.trace_id[:8]}...")
        print(_HEAVY_RULE)
    elif status == "completed":
        print(f"\n{_HEAVY_RULE}")
        print("[Trace] ✅ 执行完成")
        print(f" - 总消息数: {trace.total_messages}")
        print(f" - 总 Token 数: {trace.total_tokens}")
        print(f" - 总成本: ${trace.total_cost:.4f}")
        print(_HEAVY_RULE)
    elif status == "failed":
        print(f"\n{_HEAVY_RULE}")
        print("[Trace] ❌ 执行失败")
        print(f" 错误信息: {trace.error_message}")
        print(_HEAVY_RULE)
    elif status == "stopped":
        print(f"\n{_HEAVY_RULE}")
        print("[Trace] ⏸️ 执行已停止")
        print(_HEAVY_RULE)


def _format_tool_args(tool_args: Any) -> str:
    """Render tool-call arguments as (possibly truncated) display text.

    Arguments given as a string are parsed as JSON and pretty-printed; if
    parsing or serialisation fails, the raw ``str()`` form is used instead.
    """
    try:
        parsed = json.loads(tool_args) if isinstance(tool_args, str) else tool_args
        rendered = json.dumps(parsed, ensure_ascii=False, indent=2)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass. Catching only these
        # lets cancellation and KeyboardInterrupt propagate.
        raw = str(tool_args)
        return raw[:200] + "..." if len(raw) > 200 else raw
    if len(rendered) > 500:
        rendered = rendered[:500] + "..."
    return rendered


def _print_tool_calls(tool_calls: Any, seq_info: str) -> None:
    """Print name, id and arguments for each tool call in an assistant message."""
    print(f"\n{seq_info} [工具调用] 🛠️ 调用 {len(tool_calls)} 个工具:")
    for idx, tc in enumerate(tool_calls, 1):
        func_info = tc.get("function", {})
        tool_name = func_info.get("name", "unknown")
        # str() guards against an id that is present but not a string.
        tool_id = str(tc.get("id", "unknown"))
        tool_args = func_info.get("arguments", "")
        print(f" [{idx}] 工具名称: {tool_name}")
        print(f" 调用 ID: {tool_id[:16]}...")
        print(f" 参数: {_format_tool_args(tool_args)}")
        print()


def _print_assistant_message(item: Message, seq_info: str) -> Optional[str]:
    """Print an assistant message.

    Returns:
        The text that should become the latest assistant reply, or ``None``
        when this message must not replace the previously recorded reply
        (for example, reasoning text that accompanies tool calls).
    """
    content = item.content

    if isinstance(content, str):
        # Plain-text content is always treated as the latest reply.
        print(f"\n{seq_info} [思考过程] 🤔 Agent 回复:")
        print(_LIGHT_RULE)
        print(content)
        print(_LIGHT_RULE)
        return content

    if not isinstance(content, dict):
        return None

    text = content.get("text", "")
    tool_calls = content.get("tool_calls")
    reply: Optional[str] = None

    if text:
        if not tool_calls:
            # Text without tool calls is a candidate final answer.
            reply = text
            print(f"\n{seq_info} [思考过程] 🤔 Agent 思考与回复:")
        else:
            # Text alongside tool calls is intermediate reasoning.
            print(f"\n{seq_info} [思考过程] 🤔 Agent 思考:")
        print(_LIGHT_RULE)
        print(text)
        print(_LIGHT_RULE)

    if tool_calls:
        _print_tool_calls(tool_calls, seq_info)

    return reply


def _print_tool_message(item: Message, seq_info: str) -> None:
    """Print a tool result message, truncating long results to 1000 chars."""
    content = item.content
    tool_name = "unknown"
    if isinstance(content, dict):
        tool_name = content.get("tool_name", "unknown")
        result = content.get("result", "")
        if isinstance(result, (dict, list)):
            try:
                tool_result = json.dumps(result, ensure_ascii=False, indent=2)
            except (TypeError, ValueError):
                # A non-serialisable result must not abort the run; this is
                # display-only output.
                tool_result = str(result)
        else:
            tool_result = str(result)
    else:
        tool_result = str(content) if content else ""

    print(f"\n{seq_info} [工具结果] ✅ {tool_name} 执行完成")
    if item.tool_call_id:
        print(f" 关联调用 ID: {item.tool_call_id[:16]}...")
    print(_LIGHT_RULE)
    if tool_result:
        if len(tool_result) > 1000:
            print(tool_result[:1000])
            print(f"\n... (结果已截断,共 {len(tool_result)} 字符)")
        else:
            print(tool_result)
    else:
        print(" (无返回结果)")
    print(_LIGHT_RULE)

    # Optional metadata, printed only when present and truthy.
    if item.description:
        print(f" 描述: {item.description}")
    if item.duration_ms:
        print(f" 执行耗时: {item.duration_ms}ms")
    if item.cost:
        print(f" 成本: ${item.cost:.6f}")


def _print_user_message(item: Message, seq_info: str) -> None:
    """Print a preview (first 200 chars) of a non-empty string user message."""
    content = item.content
    if isinstance(content, str) and content:
        preview = content[:200] + "..." if len(content) > 200 else content
        print(f"\n{seq_info} [用户输入] 👤 {preview}")


def _print_message(item: Message) -> Optional[str]:
    """Print one message by role; return new assistant reply text, if any."""
    seq_info = f"[序列 {item.sequence}]"
    if item.parent_sequence:
        seq_info += f" (父序列: {item.parent_sequence})"

    if item.role == "assistant":
        return _print_assistant_message(item, seq_info)
    if item.role == "tool":
        _print_tool_message(item, seq_info)
    elif item.role == "user":
        _print_user_message(item, seq_info)
    return None


async def run_agent(
    prompt: str,
    model: str = DEFAULT_MODEL,
    **run_config_kwargs
) -> str:
    """
    Run an Agent on a prompt and return its final reply text.

    Progress (trace status, assistant reasoning, tool calls and tool results)
    is printed to stdout while the run is in progress. Traces are stored via
    a ``FileSystemTraceStore`` rooted at ``.trace`` in the current working
    directory, which is created if it does not exist.

    Args:
        prompt: The user prompt, sent as a single ``user`` message.
        model: Model name passed to both the LLM call factory and
            ``RunConfig``. Defaults to ``DEFAULT_MODEL``.
        **run_config_kwargs: Extra keyword arguments forwarded unchanged to
            ``RunConfig`` (e.g. ``temperature``, ``max_iterations``,
            ``tools``, ``system_prompt``, ``name``).

    Returns:
        The last assistant text produced by the run. If no assistant text
        was produced, the trace's error message when one is available,
        otherwise a default "no result" message. Always a ``str``.

    Example:
        ```python
        result = await run_agent("帮我分析一下这个代码")
        print(result)
        ```
    """
    # A trace store is required so that the goal tool is supported.
    trace_dir = Path(".trace")
    trace_dir.mkdir(exist_ok=True)
    trace_store = FileSystemTraceStore(base_path=str(trace_dir))

    runner = AgentRunner(
        trace_store=trace_store,
        llm_call=create_openrouter_llm_call(model=model),
    )

    messages = [{"role": "user", "content": prompt}]
    config = RunConfig(
        model=model,
        **run_config_kwargs
    )

    # Stream the run, logging each item and tracking the latest reply.
    last_assistant_text = ""
    final_trace: Optional[Trace] = None

    async for item in runner.run(messages=messages, config=config):
        if isinstance(item, Trace):
            final_trace = item
            _print_trace_status(item)
        elif isinstance(item, Message):
            reply = _print_message(item)
            if reply is not None:
                last_assistant_text = reply

    # If the stream never yielded a trace, try to load it from the store.
    if not final_trace and config.trace_id and runner.trace_store:
        final_trace = await runner.trace_store.get_trace(config.trace_id)

    if last_assistant_text:
        return last_assistant_text

    # No assistant text: report the trace error, falling back to a default
    # message so the declared ``str`` return type always holds (the trace's
    # error_message may be None, e.g. on a completed run with no reply).
    trace_error = final_trace.error_message if final_trace else None
    error_msg = trace_error or _NO_RESULT_MESSAGE
    print(f"\n[Error] {error_msg}")
    return error_msg