|
- """
- Agent utility functions.
- Convenience wrappers that simplify working with the Agent.
- """
- from typing import Any, Optional
- from pathlib import Path
- from agent.core.runner import AgentRunner, RunConfig
- from agent.llm.openrouter import create_openrouter_llm_call
- from agent.trace.models import Trace, Message
- from agent.trace.store import FileSystemTraceStore
- DEFAULT_MODEL = "google/gemini-3-flash-preview"
- async def run_agent(
-     prompt: str,
-     model: str = DEFAULT_MODEL,
-     **run_config_kwargs: Any,
- ) -> str:
-     """
-     General-purpose Agent entry point.
-
-     Takes a prompt, runs the agent, and returns its final reply (the summary string).
-
-     Args:
-         prompt: The user's prompt.
-         model: Model name; defaults to "google/gemini-3-flash-preview".
-         **run_config_kwargs: Additional RunConfig parameters, for example:
-             - temperature: float = 0.3
-             - max_iterations: int = 200
-             - tools: Optional[List[str]] = None
-             - system_prompt: Optional[str] = None
-             - enable_memory: bool = True
-             - auto_execute_tools: bool = True
-             - name: Optional[str] = None
-             and so on.
-
-     Returns:
-         str: The agent's final reply text (summary). If the agent produced no
-         assistant text, an error message is returned instead.
-
-     Example:
-         ```python
-         result = await run_agent("Analyze this code for me")
-         print(result)
-         ```
-     """
-     # Create the AgentRunner (a trace_store is required for the goal tool)
-     trace_dir = Path(".trace")
-     trace_dir.mkdir(exist_ok=True)
-     trace_store = FileSystemTraceStore(base_path=str(trace_dir))
-
-     runner = AgentRunner(
-         trace_store=trace_store,
-         llm_call=create_openrouter_llm_call(model=model),
-     )
-
-     # Build the message list
-     messages = [{"role": "user", "content": prompt}]
-
-     # Build the RunConfig
-     config = RunConfig(
-         model=model,
-         **run_config_kwargs,
-     )
-
-     # Run the agent and log progress as it goes
-     last_assistant_text = ""
-     final_trace: Optional[Trace] = None
-
-     async for item in runner.run(messages=messages, config=config):
-         if isinstance(item, Trace):
-             final_trace = item
-             if item.status == "running":
-                 print(f"\n{'='*60}")
-                 print("[Trace] 🚀 Execution started")
-                 print(f" Trace ID: {item.trace_id[:8]}...")
-                 print(f"{'='*60}")
-             elif item.status == "completed":
-                 print(f"\n{'='*60}")
-                 print("[Trace] ✅ Execution completed")
-                 print(f" - Total messages: {item.total_messages}")
-                 print(f" - Total tokens: {item.total_tokens}")
-                 print(f" - Total cost: ${item.total_cost:.4f}")
-                 print(f"{'='*60}")
-             elif item.status == "failed":
-                 print(f"\n{'='*60}")
-                 print("[Trace] ❌ Execution failed")
-                 print(f" Error: {item.error_message}")
-                 print(f"{'='*60}")
-             elif item.status == "stopped":
-                 print(f"\n{'='*60}")
-                 print("[Trace] ⏸️ Execution stopped")
-                 print(f"{'='*60}")
-
-         elif isinstance(item, Message):
-             # Label each message with its sequence number (and parent sequence, if any)
-             seq_info = f"[seq {item.sequence}]"
-             if item.parent_sequence:
-                 seq_info += f" (parent seq: {item.parent_sequence})"
-
-             if item.role == "assistant":
-                 content = item.content
-                 if isinstance(content, dict):
-                     text = content.get("text", "")
-                     tool_calls = content.get("tool_calls")
-
-                     # Print the agent's reasoning and text content
-                     if text:
-                         if not tool_calls:
-                             # No tool calls: treat this text as the final reply
-                             last_assistant_text = text
-                             print(f"\n{seq_info} [Reasoning] 🤔 Agent reasoning and reply:")
-                             print(f"{'-'*60}")
-                             print(text)
-                             print(f"{'-'*60}")
-                         else:
-                             # Reasoning that accompanies tool calls
-                             print(f"\n{seq_info} [Reasoning] 🤔 Agent reasoning:")
-                             print(f"{'-'*60}")
-                             print(text)
-                             print(f"{'-'*60}")
-
-                     # Print the details of each tool call
-                     if tool_calls:
-                         print(f"\n{seq_info} [Tool calls] 🛠️ Calling {len(tool_calls)} tool(s):")
-                         for idx, tc in enumerate(tool_calls, 1):
-                             func_info = tc.get("function", {})
-                             tool_name = func_info.get("name", "unknown")
-                             tool_id = tc.get("id", "unknown")
-                             tool_args = func_info.get("arguments", "")
-
-                             print(f" [{idx}] Tool name: {tool_name}")
-                             print(f" Call ID: {tool_id[:16]}...")
-                             try:
-                                 import json
-                                 # Arguments may arrive as a JSON string or as an already-parsed object
-                                 args_dict = json.loads(tool_args) if isinstance(tool_args, str) else tool_args
-                                 args_str = json.dumps(args_dict, ensure_ascii=False, indent=2)
-                                 if len(args_str) > 500:
-                                     args_str = args_str[:500] + "..."
-                                 print(f" Arguments: {args_str}")
-                             except (TypeError, ValueError):
-                                 # Invalid JSON or non-serializable arguments: fall back to a raw preview
-                                 args_preview = str(tool_args)[:200] + "..." if len(str(tool_args)) > 200 else str(tool_args)
-                                 print(f" Arguments: {args_preview}")
-                             print()
-                 elif isinstance(content, str):
-                     # Plain-text content
-                     last_assistant_text = content
-                     print(f"\n{seq_info} [Reasoning] 🤔 Agent reply:")
-                     print(f"{'-'*60}")
-                     print(content)
-                     print(f"{'-'*60}")
-
-             elif item.role == "tool":
-                 # Print the details of the tool execution result
-                 content = item.content
-                 tool_name = "unknown"
-                 tool_result = ""
-
-                 if isinstance(content, dict):
-                     tool_name = content.get("tool_name", "unknown")
-                     result = content.get("result", "")
-                     if isinstance(result, (dict, list)):
-                         import json
-                         tool_result = json.dumps(result, ensure_ascii=False, indent=2)
-                     else:
-                         tool_result = str(result)
-                 else:
-                     tool_result = str(content) if content else ""
-
-                 print(f"\n{seq_info} [Tool result] ✅ {tool_name} finished")
-                 if item.tool_call_id:
-                     print(f" Associated call ID: {item.tool_call_id[:16]}...")
-                 print(f"{'-'*60}")
-                 if tool_result:
-                     if len(tool_result) > 1000:
-                         print(tool_result[:1000])
-                         print(f"\n... (result truncated; {len(tool_result)} characters total)")
-                     else:
-                         print(tool_result)
-                 else:
-                     print(" (no result returned)")
-                 print(f"{'-'*60}")
-
-                 # Print the tool description and metadata, when present
-                 if item.description:
-                     print(f" Description: {item.description}")
-                 if item.duration_ms:
-                     print(f" Duration: {item.duration_ms}ms")
-                 if item.cost:
-                     print(f" Cost: ${item.cost:.6f}")
-
-             elif item.role == "user":
-                 # Print a preview of the user input (optional; useful for debugging)
-                 if isinstance(item.content, str) and len(item.content) > 0:
-                     preview = item.content[:200] + "..." if len(item.content) > 200 else item.content
-                     print(f"\n{seq_info} [User input] 👤 {preview}")
-
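-     # If the run yielded no Trace, try loading it from the store
-     if not final_trace and config.trace_id and runner.trace_store:
-         final_trace = await runner.trace_store.get_trace(config.trace_id)
-
-     # No assistant text was produced: return an error message instead.
-     # Fall back to a default when the trace is missing or carries no error message,
-     # so the function always returns a str.
-     if not last_assistant_text:
-         error_msg = (final_trace.error_message if final_trace else None) or "Agent produced no assistant text"
-         print(f"\n[Error] {error_msg}")
-         return error_msg
-
-     return last_assistant_text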
|
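Because `run_agent` is a coroutine, synchronous callers need an event loop to drive it. The following is a minimal sketch of one way to do that with `asyncio.run`. It assumes nothing about the `agent` package: `run_agent_stub` and `summarize` are hypothetical names introduced here, and the stub only mirrors the `run_agent` signature so the example runs without an LLM call.

```python
import asyncio
from typing import Any

DEFAULT_MODEL = "google/gemini-3-flash-preview"


async def run_agent_stub(
    prompt: str,
    model: str = DEFAULT_MODEL,
    **run_config_kwargs: Any,
) -> str:
    """Hypothetical stand-in with the same signature as run_agent; makes no LLM call."""
    await asyncio.sleep(0)  # yield to the event loop once, as a real awaitable would
    options = ", ".join(f"{k}={v!r}" for k, v in sorted(run_config_kwargs.items()))
    return f"[{model}] {prompt} ({options})"


def summarize(prompt: str) -> str:
    """Synchronous wrapper: start an event loop, run the coroutine, return its text."""
    # Assumption: swap run_agent_stub for the real run_agent once the agent
    # package is importable; extra keyword arguments are forwarded to RunConfig.
    return asyncio.run(run_agent_stub(prompt, temperature=0.3, max_iterations=20))


if __name__ == "__main__":
    print(summarize("Analyze this code for me"))
```

Note that `asyncio.run` raises `RuntimeError` when called from a thread that already has a running event loop (for example inside a notebook or another coroutine); in that situation, `await` the coroutine directly instead.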