| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- """
- 浏览器调研示例 (增强版)
- 功能:
- 1. 使用 Agent 模式进行网络调研
- 2. 任务结束自动关闭浏览器并杀掉进程
- 3. 异常安全:即使程序崩溃也能清理环境
- """
- import os
- import sys
- import asyncio
- from pathlib import Path
- # 添加项目根目录到 Python 路径
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- import logging
- # 配置感知日志
- logging.basicConfig(level=logging.WARNING) # 默认 WARNING
- logging.getLogger("agent.core.message_manager").setLevel(logging.INFO) # 开启感知日志
- logging.getLogger("tools").setLevel(logging.INFO) # 开启工具日志
- from agent.llm.prompts import SimplePrompt
- from agent.core.runner import AgentRunner, RunConfig
- from agent.trace import (
- FileSystemTraceStore,
- Trace,
- Message,
- )
- from agent.llm import create_openrouter_llm_call
- # 导入浏览器清理工具
- from agent.tools.builtin.browser.baseClass import get_browser_session,kill_browser_session,init_browser_session
- async def main():
- # 路径配置
- base_dir = Path(__file__).parent
- project_root = base_dir.parent.parent
- trace_dir = project_root / ".trace"
- prompt_path = base_dir / "test.prompt"
- output_dir = base_dir / "output"
- output_dir.mkdir(exist_ok=True)
- # Skills 目录
- skills_dir = None
- print("=" * 60)
- print("🚀 浏览器调研任务 (Agent 模式)")
- print("=" * 60)
- print()
- # 1. 加载 prompt
- print("1. 加载 prompt...")
- prompt = SimplePrompt(prompt_path)
- # 提取配置
- system_prompt = prompt._messages.get("system", "")
- user_task = prompt._messages.get("user", "")
- model_name = prompt.config.get('model', 'gemini-2.5-flash')
- temperature = float(prompt.config.get('temperature', 0.3))
- print(f" - 任务: {user_task[:80]}...")
- print(f" - 模型: {model_name}")
-
- # 2. 构建消息
- print("2. 构建任务消息...")
- messages = prompt.build_messages()
- # 3. 创建 Agent Runner
- print("3. 创建 Agent Runner...")
- runner = AgentRunner(
- trace_store=FileSystemTraceStore(base_path=str(trace_dir)),
- llm_call=create_openrouter_llm_call(model=f"google/{model_name}"),
- skills_dir=skills_dir,
- debug=True
- )
- final_response = ""
- current_trace_id = None
- # 4. Agent 模式执行(使用 try...finally 确保清理)
- try:
- print(f"4. 初始化云浏览器...")
- await init_browser_session(browser_type="cloud", headless=True)
- print(f"5. 启动 Agent 模式执行...")
- print()
- async for item in runner.run(
- messages=messages,
- config=RunConfig(
- system_prompt=system_prompt,
- model=f"google/{model_name}",
- temperature=temperature,
- max_iterations=20,
- name=user_task[:50],
- ),
- ):
- # 处理 Trace 对象(整体状态变化)
- if isinstance(item, Trace):
- current_trace_id = item.trace_id
- if item.status == "running":
- print(f"[Trace] 开始: {item.trace_id[:8]}")
- elif item.status == "completed":
- print(f"[Trace] 完成")
- print(f" - Total tokens: {item.total_tokens}")
- print(f" - Total cost: ${item.total_cost:.4f}")
- elif item.status == "failed":
- print(f"[Trace] 失败: {item.error_message}")
- # 处理 Message 对象(执行过程)
- elif isinstance(item, Message):
- if item.role == "assistant":
- content = item.content
- if isinstance(content, dict):
- text = content.get("text", "")
- tool_calls = content.get("tool_calls")
- if text and not tool_calls:
- final_response = text
- print(f"[Response] Agent 给出最终回复")
- elif text:
- # 增加打印长度到 300,方便观察
- print(f"[Assistant] {text[:300]}...")
- if tool_calls:
- for tc in tool_calls:
- tool_name = tc.get("function", {}).get("name", "unknown")
- print(f"[Tool Call] 🛠️ {tool_name}")
- elif item.role == "tool":
- content = item.content
- if isinstance(content, dict):
- tool_name = content.get("tool_name", "unknown")
- print(f"[Tool Result] ✅ {tool_name}")
- if item.description:
- desc = item.description[:80] if len(item.description) > 80 else item.description
- print(f" {desc}...")
- # 5. 输出结果
- print()
- print("=" * 60)
- print("Final Agent Response:")
- print("=" * 60)
- print(final_response)
- print("=" * 60)
- print()
- # 6. 保存结果
- output_file = output_dir / "research_result.txt"
- with open(output_file, 'w', encoding='utf-8') as f:
- f.write(final_response)
- print(f"✓ 结果已保存到: {output_file}")
- except Exception as e:
- print(f"\n❌ 程序运行崩溃: {str(e)}")
- import traceback
- traceback.print_exc()
- finally:
- # --- 核心逻辑:无论成功失败,必须关闭浏览器进程 ---
- print("\n" + "·" * 40)
- print("🧹 正在清理浏览器环境,关闭 CDP 会话并终止进程...")
- try:
- # 强制杀掉浏览器进程,释放容器或本地端口
- await kill_browser_session()
- print("✅ 浏览器已安全关闭。")
- except Exception as cleanup_err:
- print(f"⚠️ 清理浏览器时出现错误: {cleanup_err}")
- print("·" * 40 + "\n")
- # 7. 可视化提示
- if current_trace_id:
- print("=" * 60)
- print("可视化 Step Tree:")
- print("=" * 60)
- print("1. 启动 API Server: python3 api_server.py")
- print(f"2. 访问: http://localhost:8000/api/traces")
- print(f"3. Trace ID: {current_trace_id}")
- print("=" * 60)
- if __name__ == "__main__":
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- print("\n🛑 用户手动终止 (KeyboardInterrupt),正在强制退出...")
|