| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- """
- 浏览器调研示例 (交互增强版)
- 功能:
- 1. Agent 模式自动化调研
- 2. 手动接管:随时按 [Enter] 键暂停 Agent 并手动操作浏览器
- 3. 自动清理:无论成功或崩溃,均安全关闭浏览器进程
- """
- import os
- import sys
- import asyncio
- import logging
- import re
- import uuid
- from pathlib import Path
- from datetime import datetime
- from argparse import Namespace
# Add the project root directory (three levels above this file) to the Python
# path. This must run before the `agent.*` imports further down.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from dotenv import load_dotenv
load_dotenv()
# --- Logging configuration ---
# Root logger at WARNING; the two loggers below are raised to INFO.
logging.basicConfig(level=logging.WARNING)
logging.getLogger("agent.core.message_manager").setLevel(logging.INFO)
logging.getLogger("tools").setLevel(logging.INFO)
from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner, RunConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_openrouter_llm_call
from agent.tools.builtin.browser.baseClass import kill_browser_session
# ===== Global interaction control =====
# Pause flag shared between the stdin listener and the agent loop:
# listen_for_interrupt() sets it when the user presses Enter, and main()
# checks it between stream items and clears it once the takeover is over.
# NOTE(review): the Event is created at import time, before asyncio.run()
# starts a loop; only set()/is_set()/clear() are used in this file, never
# wait() -- confirm that stays true on Python versions older than 3.10.
pause_event = asyncio.Event()
async def listen_for_interrupt():
    """Background coroutine: request a pause each time the user presses Enter.

    Each iteration waits for one line from ``sys.stdin`` (read in the event
    loop's default executor so the loop itself is never blocked). When a line
    arrives and the module-level ``pause_event`` is not already set, a banner
    is printed and the event is set.

    Returns:
        None. The coroutine runs until it is cancelled or until stdin reaches
        end-of-file.
    """
    loop = asyncio.get_running_loop()
    while True:
        # Run the synchronous readline in the executor to avoid blocking the
        # event loop.
        line = await loop.run_in_executor(None, sys.stdin.readline)
        if not line:
            # readline() returns "" only at EOF (stdin closed or redirected
            # from an exhausted source). Without this exit the loop would
            # spin and keep re-triggering the pause.
            return
        if pause_event.is_set():
            # A pause is already pending; swallow the extra Enter.
            continue
        print("\n" + "!" * 40)
        print("🛑 检测到手动干预请求!")
        print("Agent 将在完成当前动作后暂停,请准备接管浏览器。")
        print("!" * 40 + "\n")
        pause_event.set()
- # ===== 核心逻辑 =====
async def _run_manual_takeover(runner):
    """Let the human operate the browser, then hand control back to the agent.

    Prints the takeover banner, awaits the runner's
    ``browser_wait_for_user_action`` tool, and finally clears ``pause_event``.
    A failure of the tool call is reported on stdout and otherwise ignored.

    Args:
        runner: The AgentRunner whose ``tools.execute`` is used for the wait.
    """
    print("\n" + "🛠️" * 20)
    print(">>> 人工接管模式激活 <<<")
    print("1. 请在浏览器窗口进行必要操作(登录、过验证码等)")
    print("2. 操作完成后,请在终端按 [Enter] 或在页面点击交互按钮继续")

    try:
        # Call the built-in "wait for user action" tool.
        await runner.tools.execute(
            "browser_wait_for_user_action",
            {"message": "人工干预中,请完成操作后恢复 Agent"},
            uid="human_admin",
            context={"runner": runner},
        )
    except Exception as e:
        # Best effort: a broken tool must not abort the agent run.
        print(f"⚠️ 交互工具调用失败: {e}")

    print(">>> 交互结束,交还控制权给 Agent <<<")
    print("🛠️" * 20 + "\n")
    pause_event.clear()


def _print_stream_item(item):
    """Print a one-line progress summary for an item yielded by the agent run.

    Trace items report start/completion; assistant messages print their text
    (truncated to 200 characters) and the names of requested tools; tool
    messages print the tool name. Anything else is ignored.
    """
    if isinstance(item, Trace):
        if item.status == "running":
            print(f"[{datetime.now().strftime('%H:%M:%S')}] 🛰️ Trace 启动: {item.trace_id[:8]}")
        elif item.status == "completed":
            print(f"\n✅ 任务圆满完成!Cost: ${item.total_cost:.4f}")
        return

    if not isinstance(item, Message):
        return

    payload = item.content
    if item.role == "assistant":
        if not isinstance(payload, dict):
            return
        text = payload.get("text", "")
        tool_calls = payload.get("tool_calls")
        if text:
            # Print a summary only; long replies are cut at 200 characters.
            print(f"\n🤖 Agent: {text[:200]}..." if len(text) > 200 else f"\n🤖 Agent: {text}")
        if tool_calls:
            for tc in tool_calls:
                t_name = tc.get("function", {}).get("name", "unknown")
                print(f"   🛠️ 执行工具: {t_name}")
    elif item.role == "tool":
        if isinstance(payload, dict):
            t_name = payload.get("tool_name", "unknown")
            print(f"   ✅ 工具返回: {t_name}")


async def main():
    """Run the interactive browser-research agent from start to cleanup.

    Steps:
        1. Resolve paths next to this file and create the ``output`` directory.
        2. Load ``test.prompt`` and read system prompt, user task, model and
           temperature from it.
        3. Build the AgentRunner.
        4. Start the stdin listener and consume the agent stream, switching to
           manual takeover whenever ``pause_event`` is set.
        5. Always cancel the listener and kill the browser session.
        6. Print the trace id of the run, if one was seen.

    Exceptions raised while streaming are printed with a traceback and are
    not re-raised.
    """
    # 1. Environment setup
    base_dir = Path(__file__).parent
    project_root = base_dir.parent.parent
    trace_dir = project_root / ".trace"
    prompt_path = base_dir / "test.prompt"
    output_dir = base_dir / "output"
    output_dir.mkdir(exist_ok=True)

    print("=" * 60)
    print("🚀 交互式浏览器调研 Agent")
    print("👉 操作指南:")
    print(" - 运行中随时按下 [Enter] 键进入手动接管模式")
    print(" - 在浏览器完成操作后,点击页面上的 'Done' 或回车返回")
    print("=" * 60 + "\n")

    # 2. Load the task
    prompt = SimplePrompt(prompt_path)
    # NOTE(review): reads SimplePrompt's private `_messages` mapping; confirm
    # whether a public accessor exists.
    system_prompt = prompt._messages.get("system", "")
    user_task = prompt._messages.get("user", "")
    # Default to a cheap model for research, e.g. gemini-3-flash-preview.
    model_name = prompt.config.get('model', 'gemini-3-flash-preview')
    temperature = float(prompt.config.get('temperature', 0.3))
    messages = prompt.build_messages()
    # Built once so the LLM call and the run config cannot drift apart.
    model_id = f"google/{model_name}"

    # 3. Initialise the runner
    # Note: make sure the openrouter configuration is correct.
    runner = AgentRunner(
        trace_store=FileSystemTraceStore(base_path=str(trace_dir)),
        llm_call=create_openrouter_llm_call(model=model_id),
        skills_dir=None,
        debug=True,
    )

    # 4. Start the stdin listener
    interrupt_task = asyncio.create_task(listen_for_interrupt())

    current_trace_id = None
    try:
        # Start the agent iteration
        agent_stream = runner.run(
            messages=messages,
            config=RunConfig(
                system_prompt=system_prompt,
                model=model_id,
                temperature=temperature,
                max_iterations=30,
                name=user_task[:50],
            ),
        )
        async for item in agent_stream:
            # The pause flag is only honoured between stream items, i.e.
            # after the agent has finished its current action.
            if pause_event.is_set():
                await _run_manual_takeover(runner)

            # Remember the trace id before printing anything about the item.
            if isinstance(item, Trace):
                current_trace_id = item.trace_id
            _print_stream_item(item)
    except Exception as e:
        print(f"\n🔥 发生严重错误: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Stop the listener coroutine
        interrupt_task.cancel()

        # 5. Force-clean the browser environment
        print("\n" + "·" * 40)
        print("🧹 正在执行环境清理...")
        try:
            await kill_browser_session()
            print("✨ 浏览器进程已安全终止。")
        except Exception as err:
            print(f"❌ 清理失败: {err}")
        print("·" * 40 + "\n")

        # Reap the listener after the browser cleanup so that its
        # cancellation (or any exception it died with) is consumed here
        # instead of being reported as never retrieved.
        await asyncio.gather(interrupt_task, return_exceptions=True)

    # 6. Show the result
    if current_trace_id:
        print(f"🔍 任务 Trace ID: {current_trace_id}")
        print("📊 访问可视化面板查看详情。")
if __name__ == "__main__":
    # Script entry point: drive main() on a fresh event loop and turn a
    # Ctrl+C into a short goodbye message instead of a traceback.
    farewell = "\n👋 收到退出信号,程序已停止。"
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print(farewell)
|