""" Browser-Use 原生工具适配器 Native Browser-Use Tools Adapter 直接使用 browser-use 的原生类(BrowserSession, Tools)实现所有浏览器操作工具。 不依赖 Playwright,完全基于 CDP 协议。 核心特性: 1. 浏览器会话持久化 - 只启动一次浏览器 2. 状态自动保持 - 登录状态、Cookie、LocalStorage 等 3. 完整的底层访问 - 可以直接使用 CDP 协议 4. 性能优异 - 避免频繁创建/销毁浏览器实例 5. 多种浏览器类型 - 支持 local、cloud、container 三种模式 支持的浏览器类型: 1. Local (本地浏览器): - 在本地运行 Chrome - 支持可视化调试 - 速度最快 - 示例: init_browser_session(browser_type="local") 2. Cloud (云浏览器): - 在云端运行 - 不占用本地资源 - 适合生产环境 - 示例: init_browser_session(browser_type="cloud") 3. Container (容器浏览器): - 在独立容器中运行 - 隔离性好 - 支持预配置账户 - 示例: init_browser_session(browser_type="container", container_url="https://example.com") 使用方法: 1. 在 Agent 初始化时调用 init_browser_session() 并指定 browser_type 2. 使用各个工具函数执行浏览器操作 3. 任务结束时调用 cleanup_browser_session() 文件操作说明: - 浏览器专用文件目录:.cache/.browser_use_files/ (在当前工作目录下) 用于存储浏览器会话产生的临时文件(下载、上传、截图等) - 一般文件操作:请使用 agent.tools.builtin 中的文件工具 (read_file, write_file, edit_file) 这些工具功能更完善,支持diff预览、智能匹配、分页读取等 """ import logging import sys import os import json import httpx import asyncio import aiohttp import re import base64 from urllib.parse import urlparse, parse_qs, unquote from typing import Optional, List, Dict, Any, Tuple, Union from pathlib import Path from langchain_core.runnables import RunnableLambda from argparse import Namespace # 使用 Namespace 快速构造带属性的对象 from langchain_core.messages import AIMessage from ....llm.openrouter import openrouter_llm_call # 将项目根目录添加到 Python 路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # 导入框架的工具装饰器和结果类 from agent.tools import tool, ToolResult from agent.tools.builtin.browser.sync_mysql_help import mysql # 导入 browser-use 的核心类 from browser_use import BrowserSession, BrowserProfile from browser_use.tools.service import Tools try: from browser_use.tools.views import ReadContentAction # type: ignore except Exception: from pydantic import BaseModel class ReadContentAction(BaseModel): goal: str source: str = "page" context: str = "" from browser_use.agent.views import ActionResult from browser_use.filesystem.file_system import FileSystem # ============================================================ # 无需注册的内部辅助函数 # ============================================================ # ============================================================ # 全局浏览器会话管理 # ============================================================ # 全局变量:浏览器会话和工具实例 _browser_session: Optional[BrowserSession] = None _browser_tools: Optional[Tools] = None _file_system: Optional[FileSystem] = None async def create_container(url: str, account_name: str = "liuwenwu") -> Dict[str, Any]: """ 创建浏览器容器并导航到指定URL 按照 test.md 的要求: 1.1 调用接口创建容器 1.2 调用接口创建窗口并导航到URL Args: url: 要导航的URL地址 account_name: 账户名称 Returns: 包含容器信息的字典: - success: 是否成功 - container_id: 容器ID - vnc: VNC访问URL - cdp: CDP协议URL(用于浏览器连接) - connection_id: 窗口连接ID - error: 错误信息(如果失败) """ result = { "success": False, "container_id": None, "vnc": None, "cdp": None, "connection_id": None, "error": None } try: async with aiohttp.ClientSession() as session: # 步骤1.1: 创建容器 print("📦 步骤1.1: 创建容器...") create_url = "http://47.84.182.56:8200/api/v1/container/create" create_payload = { "auto_remove": True, "need_port_binding": True, "max_lifetime_seconds": 900 } async with session.post(create_url, json=create_payload) as resp: if resp.status != 200: raise RuntimeError(f"创建容器失败: HTTP {resp.status}") create_result = await resp.json() if create_result.get("code") != 0: raise RuntimeError(f"创建容器失败: {create_result.get('msg')}") data = create_result.get("data", {}) result["container_id"] = data.get("container_id") result["vnc"] = data.get("vnc") result["cdp"] = data.get("cdp") print(f"✅ 容器创建成功") print(f" Container ID: {result['container_id']}") print(f" VNC: {result['vnc']}") print(f" CDP: {result['cdp']}") # 等待容器内的浏览器启动 print(f"\n⏳ 等待容器内浏览器启动...") await asyncio.sleep(5) # 步骤1.2: 创建页面并导航 print(f"\n📱 步骤1.2: 创建页面并导航到 {url}...") page_create_url = "http://47.84.182.56:8200/api/v1/browser/page/create" page_payload = { "container_id": result["container_id"], "url": url, "account_name": account_name, "need_wait": True, "timeout": 30 } # 重试机制:最多尝试3次 max_retries = 3 page_created = False last_error = None for attempt in range(max_retries): try: if attempt > 0: print(f" 重试 {attempt + 1}/{max_retries}...") await asyncio.sleep(3) # 重试前等待 async with session.post(page_create_url, json=page_payload, timeout=aiohttp.ClientTimeout(total=60)) as resp: if resp.status != 200: response_text = await resp.text() last_error = f"HTTP {resp.status}: {response_text[:200]}" continue page_result = await resp.json() if page_result.get("code") != 0: last_error = f"{page_result.get('msg')}" continue page_data = page_result.get("data", {}) result["connection_id"] = page_data.get("connection_id") result["success"] = True page_created = True print(f"✅ 页面创建成功") print(f" Connection ID: {result['connection_id']}") break except asyncio.TimeoutError: last_error = "请求超时" continue except aiohttp.ClientError as e: last_error = f"网络错误: {str(e)}" continue except Exception as e: last_error = f"未知错误: {str(e)}" continue if not page_created: raise RuntimeError(f"创建页面失败(尝试{max_retries}次后): {last_error}") except Exception as e: result["error"] = str(e) print(f"❌ 错误: {str(e)}") return result async def init_browser_session( browser_type: str = "local", headless: bool = False, url: Optional[str] = None, profile_name: str = "default", user_data_dir: Optional[str] = None, browser_profile: Optional[BrowserProfile] = None, **kwargs ) -> tuple[BrowserSession, Tools]: global _browser_session, _browser_tools, _file_system if _browser_session is not None: return _browser_session, _browser_tools valid_types = ["local", "cloud", "container"] if browser_type not in valid_types: raise ValueError(f"无效的 browser_type: {browser_type}") # --- 核心:定义本地统一存储路径 --- save_dir = Path.cwd() / ".cache/.browser_use_files" save_dir.mkdir(parents=True, exist_ok=True) # 基础参数配置 session_params = { "headless": headless, # 告诉 Playwright 所有的下载临时流先存入此本地目录 "downloads_path": str(save_dir), } if browser_type == "container": print("🐳 使用容器浏览器模式") if not url: url = "about:blank" container_info = await create_container(url=url, account_name=profile_name) if not container_info["success"]: raise RuntimeError(f"容器创建失败: {container_info['error']}") session_params["cdp_url"] = container_info["cdp"] await asyncio.sleep(3) elif browser_type == "cloud": print("🌐 使用云浏览器模式") session_params["use_cloud"] = True if profile_name and profile_name != "default": session_params["cloud_profile_id"] = profile_name else: # local print("💻 使用本地浏览器模式") session_params["is_local"] = True if user_data_dir is None and profile_name: user_data_dir = str(Path.home() / ".browser_use" / "profiles" / profile_name) Path(user_data_dir).mkdir(parents=True, exist_ok=True) session_params["user_data_dir"] = user_data_dir # macOS 路径兼容 import platform if platform.system() == "Darwin": chrome_path = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" if Path(chrome_path).exists(): session_params["executable_path"] = chrome_path if browser_profile: session_params["browser_profile"] = browser_profile session_params.update(kwargs) # 创建会话 _browser_session = BrowserSession(**session_params) await _browser_session.start() _browser_tools = Tools() _file_system = FileSystem(base_dir=str(save_dir)) print(f"✅ 浏览器会话初始化成功 | 默认下载路径: {save_dir}") if browser_type in ["local", "cloud"] and url: await _browser_tools.navigate(url=url, browser_session=_browser_session) return _browser_session, _browser_tools async def get_browser_session() -> tuple[BrowserSession, Tools]: """ 获取当前浏览器会话,如果不存在或连接已断开则自动重新创建 Returns: (BrowserSession, Tools) 元组 """ global _browser_session, _browser_tools, _file_system if _browser_session is not None: # 检查底层 CDP 连接是否仍然存活 # 当 runner.stop() 暂停后用户在菜单停留较久,WebSocket 可能超时断开, # 但 _browser_session 对象仍然存在,导致后续操作抛出 ConnectionClosedError alive = False try: cdp_root = getattr(_browser_session, '_cdp_client_root', None) sess_mgr = getattr(_browser_session, 'session_manager', None) if cdp_root is not None and sess_mgr is not None: cdp_session = await _browser_session.get_or_create_cdp_session() await asyncio.wait_for( cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': '1+1'}, session_id=cdp_session.session_id ), timeout=3.0, ) alive = True except Exception: pass if not alive: print("⚠️ 浏览器会话连接已断开,正在重新初始化...") try: await cleanup_browser_session() except Exception: _browser_session = None _browser_tools = None _file_system = None if _browser_session is None: await init_browser_session() return _browser_session, _browser_tools async def cleanup_browser_session(): """ 清理浏览器会话 优雅地停止浏览器但保留会话状态 """ global _browser_session, _browser_tools, _file_system if _browser_session is not None: await _browser_session.stop() _browser_session = None _browser_tools = None _file_system = None async def kill_browser_session(): """ 强制终止浏览器会话 完全关闭浏览器进程 """ global _browser_session, _browser_tools, _file_system if _browser_session is not None: await _browser_session.kill() _browser_session = None _browser_tools = None _file_system = None # ============================================================ # 辅助函数:ActionResult 转 ToolResult # ============================================================ def action_result_to_tool_result(result: ActionResult, title: str = None) -> ToolResult: """ 将 browser-use 的 ActionResult 转换为框架的 ToolResult Args: result: browser-use 的 ActionResult title: 可选的标题(如果不提供则从 result 推断) Returns: ToolResult """ if result.error: return ToolResult( title=title or "操作失败", output="", error=result.error, long_term_memory=result.long_term_memory or result.error ) return ToolResult( title=title or "操作成功", output=result.extracted_content or "", long_term_memory=result.long_term_memory or result.extracted_content or "", metadata=result.metadata or {} ) def _cookie_domain_for_type(cookie_type: str, url: str) -> Tuple[str, str]: if cookie_type: key = cookie_type.lower() if key in {"xiaohongshu", "xhs"}: return ".xiaohongshu.com", "https://www.xiaohongshu.com" parsed = urlparse(url or "") domain = parsed.netloc or "" domain = domain.replace("www.", "") if domain: domain = f".{domain}" base_url = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme and parsed.netloc else url return domain, base_url def _parse_cookie_string(cookie_str: str, domain: str, url: str) -> List[Dict[str, Any]]: cookies: List[Dict[str, Any]] = [] if not cookie_str: return cookies parts = cookie_str.split(";") for part in parts: if not part: continue if "=" not in part: continue name, value = part.split("=", 1) cookie = { "name": str(name).strip(), "value": str(value).strip(), "domain": domain, "path": "/", "expires": -1, "httpOnly": False, "secure": True, "sameSite": "None" } if url: cookie["url"] = url cookies.append(cookie) return cookies def _normalize_cookies(cookie_value: Any, domain: str, url: str) -> List[Dict[str, Any]]: if cookie_value is None: return [] if isinstance(cookie_value, list): return cookie_value if isinstance(cookie_value, dict): if "cookies" in cookie_value: return _normalize_cookies(cookie_value.get("cookies"), domain, url) if "name" in cookie_value and "value" in cookie_value: return [cookie_value] return [] if isinstance(cookie_value, (bytes, bytearray)): cookie_value = cookie_value.decode("utf-8", errors="ignore") if isinstance(cookie_value, str): text = cookie_value.strip() if not text: return [] try: parsed = json.loads(text) except Exception: parsed = None if parsed is not None: return _normalize_cookies(parsed, domain, url) return _parse_cookie_string(text, domain, url) return [] def _extract_cookie_value(row: Optional[Dict[str, Any]]) -> Any: if not row: return None # 优先使用 cookies 字段 if "cookies" in row: return row["cookies"] # 兼容其他可能的字段名 for key, value in row.items(): if "cookie" in key.lower(): return value return None def _fetch_cookie_row(cookie_type: str) -> Optional[Dict[str, Any]]: if not cookie_type: return None try: return mysql.fetchone( "select * from agent_channel_cookies where type=%s limit 1", (cookie_type,) ) except Exception: return None def _fetch_profile_id(cookie_type: str) -> Optional[str]: """从数据库获取 cloud_profile_id""" if not cookie_type: return None try: row = mysql.fetchone( "select profileId from agent_channel_cookies where type=%s limit 1", (cookie_type,) ) if row and "profileId" in row: return row["profileId"] return None except Exception: return None # ============================================================ # 需要注册的工具 # ============================================================ # ============================================================ # 导航类工具 (Navigation Tools) # ============================================================ @tool() async def browser_navigate_to_url(url: str, new_tab: bool = False) -> ToolResult: """ 导航到指定的 URL Navigate to a specific URL 使用 browser-use 的原生导航功能,支持在新标签页打开。 Args: url: 要访问的 URL 地址 new_tab: 是否在新标签页中打开(默认 False) Returns: ToolResult: 包含导航结果的工具返回对象 Example: navigate_to_url("https://www.baidu.com") navigate_to_url("https://www.google.com", new_tab=True) """ try: browser, tools = await get_browser_session() # 使用 browser-use 的 navigate 工具 result = await tools.navigate( url=url, new_tab=new_tab, browser_session=browser ) return action_result_to_tool_result(result, f"导航到 {url}") except Exception as e: return ToolResult( title="导航失败", output="", error=f"Failed to navigate to {url}: {str(e)}", long_term_memory=f"导航到 {url} 失败" ) @tool() async def browser_search_web(query: str, engine: str = "bing") -> ToolResult: """ 使用搜索引擎搜索 Search the web using a search engine Args: query: 搜索关键词 engine: 搜索引擎 (google, duckduckgo, bing) - 默认: google Returns: ToolResult: 搜索结果 Example: search_web("Python async programming", engine="google") """ try: browser, tools = await get_browser_session() # 使用 browser-use 的 search 工具 result = await tools.search( query=query, engine=engine, browser_session=browser ) return action_result_to_tool_result(result, f"搜索: {query}") except Exception as e: return ToolResult( title="搜索失败", output="", error=f"Search failed: {str(e)}", long_term_memory=f"搜索 '{query}' 失败" ) @tool() async def browser_go_back() -> ToolResult: """ 返回到上一个页面 Go back to the previous page 模拟浏览器的"后退"按钮功能。 Returns: ToolResult: 包含返回操作结果的工具返回对象 """ try: browser, tools = await get_browser_session() result = await tools.go_back(browser_session=browser) return action_result_to_tool_result(result, "返回上一页") except Exception as e: return ToolResult( title="返回失败", output="", error=f"Failed to go back: {str(e)}", long_term_memory="返回上一页失败" ) @tool() async def browser_wait(seconds: int = 3) -> ToolResult: """ 等待指定的秒数 Wait for a specified number of seconds 用于等待页面加载、动画完成或其他异步操作。 Args: seconds: 等待时间(秒),最大30秒 Returns: ToolResult: 包含等待操作结果的工具返回对象 Example: wait(5) # 等待5秒 """ try: browser, tools = await get_browser_session() result = await tools.wait(seconds=seconds, browser_session=browser) return action_result_to_tool_result(result, f"等待 {seconds} 秒") except Exception as e: return ToolResult( title="等待失败", output="", error=f"Failed to wait: {str(e)}", long_term_memory="等待失败" ) # ============================================================ # 元素交互工具 (Element Interaction Tools) # ============================================================ # 定义一个专门捕获下载链接的 Handler class DownloadLinkCaptureHandler(logging.Handler): def __init__(self): super().__init__() self.captured_url = None def emit(self, record): # 如果已经捕获到了(通常第一条是最完整的),就不再处理后续日志 if self.captured_url: return message = record.getMessage() # 寻找包含下载信息的日志 if "redirection?filename=" in message or "Failed to download" in message: # 使用更严格的正则,确保不抓取带省略号(...)的截断链接 # 排除掉末尾带有三个点的干扰 match = re.search(r"https?://[^\s]+(?!\.\.\.)", message) if match: url = match.group(0) # 再次过滤:如果发现提取出的 URL 确实包含三个点,说明依然抓到了截断版,跳过 if "..." not in url: self.captured_url = url # print(f"🎯 成功锁定完整直链: {url[:50]}...") # 调试用 @tool() async def browser_download_direct_url(url: str, save_name: str = "book.epub") -> ToolResult: save_dir = Path.cwd() / ".cache/.browser_use_files" save_dir.mkdir(parents=True, exist_ok=True) # 提取域名作为 Referer,这能骗过 90% 的防盗链校验 from urllib.parse import urlparse parsed_url = urlparse(url) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/" # 如果没传 save_name,自动从 URL 获取 if not save_name: import unquote # 尝试从 URL 路径获取文件名并解码(处理中文) save_name = Path(urlparse(url).path).name or f"download_{int(time.time())}" save_name = unquote(save_name) target_path = save_dir / save_name headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "*/*", "Referer": base_url, # 动态设置 Referer "Range": "bytes=0-", # 有时对大文件下载有奇效 } try: print(f"🚀 开始下载: {url[:60]}...") # 使用 follow_redirects=True 处理链接中的 redirection async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=60.0) as client: async with client.stream("GET", url) as response: if response.status_code != 200: print(f"❌ 下载失败,HTTP 状态码: {response.status_code}") return # 获取实际文件名(如果服务器提供了) # 这里会优先使用你指定的 save_name with open(target_path, "wb") as f: downloaded_bytes = 0 async for chunk in response.aiter_bytes(): f.write(chunk) downloaded_bytes += len(chunk) if downloaded_bytes % (1024 * 1024) == 0: # 每下载 1MB 打印一次 print(f"📥 已下载: {downloaded_bytes // (1024 * 1024)} MB") print(f"✅ 下载完成!文件已存至: {target_path}") success_msg = f"✅ 下载完成!文件已存至: {target_path}" return ToolResult( title="直链下载成功", output=success_msg, long_term_memory=success_msg, metadata={"path": str(target_path)} ) except Exception as e: # 异常捕获返回 return ToolResult( title="下载异常", output="", error=f"💥 发生错误: {str(e)}", long_term_memory=f"下载任务由于异常中断: {str(e)}" ) @tool() async def browser_click_element(index: int) -> ToolResult: """ 点击页面元素,并自动通过拦截内部日志获取下载直链。 """ # 1. 挂载日志窃听器 capture_handler = DownloadLinkCaptureHandler() logger = logging.getLogger("browser_use") # 拦截整个 browser_use 命名空间 logger.addHandler(capture_handler) try: browser, tools = await get_browser_session() # 2. 执行原生的点击动作 result = await tools.click( index=index, browser_session=browser ) # 3. 检查是否有“意外收获” download_msg = "" if capture_handler.captured_url: captured_url = capture_handler.captured_url download_msg = f"\n\n⚠️ 系统检测到浏览器下载被拦截,已自动捕获准确直链:\n{captured_url}\n\n建议:你可以直接使用 browser_download_direct_url 工具下载此链接。" # 如果你想更激进一点,甚至可以在这里直接自动触发本地下载逻辑 # await auto_download_file(captured_url) # 4. 转换结果并附加捕获的信息 tool_result = action_result_to_tool_result(result, f"点击元素 {index}") if download_msg: # 关键:把日志里的信息塞进 output,这样 LLM 就能看到了! tool_result.output = (tool_result.output or "") + download_msg tool_result.long_term_memory = (tool_result.long_term_memory or "") + f" 捕获下载链接: {captured_url}" return tool_result except Exception as e: return ToolResult( title="点击失败", output="", error=f"Failed to click element {index}: {str(e)}", long_term_memory=f"点击元素 {index} 失败" ) finally: # 5. 务必移除监听器,防止内存泄漏和日志污染 logger.removeHandler(capture_handler) @tool() async def browser_input_text(index: int, text: str, clear: bool = True) -> ToolResult: """ 在指定元素中输入文本 Input text into an element Args: index: 元素索引(从浏览器状态中获取) text: 要输入的文本内容 clear: 是否先清除现有文本(默认 True) Returns: ToolResult: 包含输入操作结果的工具返回对象 Example: input_text(index=0, text="Hello World", clear=True) """ try: browser, tools = await get_browser_session() result = await tools.input( index=index, text=text, clear=clear, browser_session=browser ) return action_result_to_tool_result(result, f"输入文本到元素 {index}") except Exception as e: return ToolResult( title="输入失败", output="", error=f"Failed to input text into element {index}: {str(e)}", long_term_memory=f"输入文本失败" ) @tool() async def browser_send_keys(keys: str) -> ToolResult: """ 发送键盘按键或快捷键 Send keyboard keys or shortcuts 支持发送单个按键、组合键和快捷键。 Args: keys: 要发送的按键字符串 - 单个按键: "Enter", "Escape", "PageDown", "Tab" - 组合键: "Control+o", "Shift+Tab", "Alt+F4" - 功能键: "F1", "F2", ..., "F12" Returns: ToolResult: 包含按键操作结果的工具返回对象 Example: send_keys("Enter") send_keys("Control+A") """ try: browser, tools = await get_browser_session() result = await tools.send_keys( keys=keys, browser_session=browser ) return action_result_to_tool_result(result, f"发送按键: {keys}") except Exception as e: return ToolResult( title="发送按键失败", output="", error=f"Failed to send keys: {str(e)}", long_term_memory="发送按键失败" ) @tool() async def browser_upload_file(index: int, path: str) -> ToolResult: """ 上传文件到文件输入元素 Upload a file to a file input element Args: index: 文件输入框的元素索引 path: 要上传的文件路径(绝对路径) Returns: ToolResult: 包含上传操作结果的工具返回对象 Example: upload_file(index=7, path="/path/to/file.pdf") Note: 文件必须存在且路径必须是绝对路径 """ try: browser, tools = await get_browser_session() result = await tools.upload_file( index=index, path=path, browser_session=browser, available_file_paths=[path], file_system=_file_system ) return action_result_to_tool_result(result, f"上传文件: {path}") except Exception as e: return ToolResult( title="上传失败", output="", error=f"Failed to upload file: {str(e)}", long_term_memory=f"上传文件 {path} 失败" ) # ============================================================ # 滚动和视图工具 (Scroll & View Tools) # ============================================================ @tool() async def browser_scroll_page(down: bool = True, pages: float = 1.0, index: Optional[int] = None) -> ToolResult: try: # 限制单次滚动幅度,避免 agent 一次滚 100 页 MAX_PAGES = 10 if pages > MAX_PAGES: pages = MAX_PAGES browser, tools = await get_browser_session() cdp_session = await browser.get_or_create_cdp_session() before_y_result = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': 'window.scrollY'}, session_id=cdp_session.session_id ) before_y = before_y_result.get('result', {}).get('value', 0) # 执行滚动 result = await tools.scroll(down=down, pages=pages, index=index, browser_session=browser) # 等待渲染(懒加载页面需要更长时间) await asyncio.sleep(2) after_y_result = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': 'window.scrollY'}, session_id=cdp_session.session_id ) after_y = after_y_result.get('result', {}).get('value', 0) # 如果第一次检测没动,再等一轮(应对懒加载触发后的延迟滚动) if before_y == after_y and index is None: await asyncio.sleep(2) retry_result = await cdp_session.cdp_client.send.Runtime.evaluate( params={'expression': 'window.scrollY'}, session_id=cdp_session.session_id ) after_y = retry_result.get('result', {}).get('value', 0) if before_y == after_y and index is None: direction = "下" if down else "上" return ToolResult( title="滚动无效", output=f"页面已到达{direction}边界,无法继续滚动", error="No movement detected" ) delta = abs(after_y - before_y) direction = "下" if down else "上" return action_result_to_tool_result(result, f"已向{direction}滚动 {delta}px") except Exception as e: # --- 核心修复 2: 必须补全 output 参数,否则框架会报错 --- return ToolResult( title="滚动失败", output="", # 补全这个缺失的必填参数 error=str(e) ) @tool() async def browser_find_text(text: str) -> ToolResult: """ 查找页面中的文本并滚动到该位置 Find text on the page and scroll to it 在页面中搜索指定的文本,找到后自动滚动到该位置。 Args: text: 要查找的文本内容 Returns: ToolResult: 包含查找结果的工具返回对象 Example: find_text("Privacy Policy") """ try: browser, tools = await get_browser_session() result = await tools.find_text( text=text, browser_session=browser ) return action_result_to_tool_result(result, f"查找文本: {text}") except Exception as e: return ToolResult( title="查找失败", output="", error=f"Failed to find text: {str(e)}", long_term_memory=f"查找文本 '{text}' 失败" ) @tool() async def browser_get_visual_selector_map() -> ToolResult: """ 获取当前页面的视觉快照和交互元素索引映射。 Get visual snapshot and selector map of interactive elements. 该工具会同时执行两个操作: 1. 捕捉当前页面的截图,并用 browser-use 内置方法在截图上标注元素索引号。 2. 生成页面所有可交互元素的索引字典(含 href、type 等属性信息)。 Returns: ToolResult: 包含高亮截图(在 images 中)和元素列表的工具返回对象。 """ try: browser, _ = await get_browser_session() # 1. 构造同时包含 DOM 和 截图 的请求 from browser_use.browser.events import BrowserStateRequestEvent from browser_use.browser.python_highlights import create_highlighted_screenshot_async event = browser.event_bus.dispatch( BrowserStateRequestEvent( include_dom=True, include_screenshot=True, include_recent_events=False ) ) # 2. 等待浏览器返回完整状态 browser_state = await event.event_result(raise_if_none=True, raise_if_any=True) # 3. 提取 Selector Map selector_map = browser_state.dom_state.selector_map if browser_state.dom_state else {} # 4. 提取截图并生成带索引标注的高亮截图(通过 CDP 获取精确 DPI 和滚动偏移) screenshot_b64 = browser_state.screenshot or "" highlighted_b64 = "" if screenshot_b64 and selector_map: try: cdp_session = await browser.get_or_create_cdp_session() highlighted_b64 = await create_highlighted_screenshot_async( screenshot_b64, selector_map, cdp_session=cdp_session, filter_highlight_ids=False ) except Exception: highlighted_b64 = screenshot_b64 # fallback to raw screenshot else: highlighted_b64 = screenshot_b64 # 5. 构建供 Agent 阅读的完整元素列表,包含丰富的属性信息 elements_info = [] for index, node in selector_map.items(): tag = node.tag_name attrs = node.attributes or {} desc = attrs.get('aria-label') or attrs.get('placeholder') or attrs.get('title') or node.get_all_children_text(max_depth=1) or "" # 收集有用的属性片段 extra_parts = [] if attrs.get('href'): extra_parts.append(f"href={attrs['href'][:60]}") if attrs.get('type'): extra_parts.append(f"type={attrs['type']}") if attrs.get('role'): extra_parts.append(f"role={attrs['role']}") if attrs.get('name'): extra_parts.append(f"name={attrs['name']}") extra = f" ({', '.join(extra_parts)})" if extra_parts else "" elements_info.append(f"Index {index}: <{tag}> \"{desc[:50]}\"{extra}") output = f"页面截图已捕获(含元素索引标注)\n找到 {len(selector_map)} 个交互元素\n\n" output += "元素列表:\n" + "\n".join(elements_info) # 6. 将高亮截图存入 images 字段,metadata 保留结构化数据 images = [] if highlighted_b64: images.append({"type": "base64", "media_type": "image/png", "data": highlighted_b64}) return ToolResult( title="视觉元素观察", output=output, long_term_memory=f"在页面观察到 {len(selector_map)} 个元素并保存了截图", images=images, metadata={ "selector_map": {k: str(v) for k, v in list(selector_map.items())[:100]}, "url": browser_state.url, "title": browser_state.title } ) except Exception as e: return ToolResult( title="视觉观察失败", output="", error=f"Failed to get visual selector map: {str(e)}", long_term_memory="获取视觉元素映射失败" ) @tool() async def browser_screenshot() -> ToolResult: """ 请求在下次观察中包含页面截图 Request a screenshot to be included in the next observation 用于视觉检查页面状态,帮助理解页面布局和内容。 Returns: ToolResult: 包含截图请求结果的工具返回对象 Example: screenshot() Note: 截图会在下次页面观察时自动包含在结果中。 """ try: browser, tools = await get_browser_session() result = await tools.screenshot(browser_session=browser) return action_result_to_tool_result(result, "截图请求") except Exception as e: return ToolResult( title="截图失败", output="", error=f"Failed to capture screenshot: {str(e)}", long_term_memory="截图失败" ) # ============================================================ # 标签页管理工具 (Tab Management Tools) # ============================================================ @tool() async def browser_switch_tab(tab_id: str) -> ToolResult: """ 切换到指定标签页 Switch to a different browser tab Args: tab_id: 4字符标签ID(target_id 的最后4位) Returns: ToolResult: 切换结果 Example: switch_tab(tab_id="a3f2") """ try: browser, tools = await get_browser_session() normalized_tab_id = tab_id[-4:] if tab_id else tab_id result = await tools.switch( tab_id=normalized_tab_id, browser_session=browser ) return action_result_to_tool_result(result, f"切换到标签页 {normalized_tab_id}") except Exception as e: return ToolResult( title="切换标签页失败", output="", error=f"Failed to switch tab: {str(e)}", long_term_memory=f"切换到标签页 {tab_id} 失败" ) @tool() async def browser_close_tab(tab_id: str) -> ToolResult: """ 关闭指定标签页 Close a browser tab Args: tab_id: 4字符标签ID Returns: ToolResult: 关闭结果 Example: close_tab(tab_id="a3f2") """ try: browser, tools = await get_browser_session() normalized_tab_id = tab_id[-4:] if tab_id else tab_id result = await tools.close( tab_id=normalized_tab_id, browser_session=browser ) return action_result_to_tool_result(result, f"关闭标签页 {normalized_tab_id}") except Exception as e: return ToolResult( title="关闭标签页失败", output="", error=f"Failed to close tab: {str(e)}", long_term_memory=f"关闭标签页 {tab_id} 失败" ) # ============================================================ # 下拉框工具 (Dropdown Tools) # ============================================================ @tool() async def browser_get_dropdown_options(index: int) -> ToolResult: """ 获取下拉框的所有选项 Get options from a dropdown element Args: index: 下拉框的元素索引 Returns: ToolResult: 包含所有选项的结果 Example: get_dropdown_options(index=8) """ try: browser, tools = await get_browser_session() result = await tools.dropdown_options( index=index, browser_session=browser ) return action_result_to_tool_result(result, f"获取下拉框选项: {index}") except Exception as e: return ToolResult( title="获取下拉框选项失败", output="", error=f"Failed to get dropdown options: {str(e)}", long_term_memory=f"获取下拉框 {index} 选项失败" ) @tool() async def browser_select_dropdown_option(index: int, text: str) -> ToolResult: """ 选择下拉框选项 Select an option from a dropdown Args: index: 下拉框的元素索引 text: 要选择的选项文本(精确匹配) Returns: ToolResult: 选择结果 Example: select_dropdown_option(index=8, text="Option 2") """ try: browser, tools = await get_browser_session() result = await tools.select_dropdown( index=index, text=text, browser_session=browser ) return action_result_to_tool_result(result, f"选择下拉框选项: {text}") except Exception as e: return ToolResult( title="选择下拉框选项失败", output="", error=f"Failed to select dropdown option: {str(e)}", long_term_memory=f"选择选项 '{text}' 失败" ) # ============================================================ # 内容提取工具 (Content Extraction Tools) # ============================================================ def scrub_search_redirect_url(url: str) -> str: """ 自动检测并解析 Bing/Google 等搜索引擎的重定向链接,提取真实目标 URL。 """ if not url or not isinstance(url, str): return url try: parsed = urlparse(url) # 1. 处理 Bing 重定向 (特征:u 参数带 Base64) # 示例:...&u=a1aHR0cHM6Ly96aHVhbmxhbi56aGlodS5jb20vcC8zODYxMjgwOQ&... if "bing.com" in parsed.netloc: u_param = parse_qs(parsed.query).get('u', [None])[0] if u_param: # 移除开头的 'a1', 'a0' 等标识符 b64_str = u_param[2:] # 补齐 Base64 填充符 padding = '=' * (4 - len(b64_str) % 4) decoded = base64.b64decode(b64_str + padding).decode('utf-8', errors='ignore') if decoded.startswith('http'): return decoded # 2. 处理 Google 重定向 (特征:url 参数) if "google.com" in parsed.netloc: url_param = parse_qs(parsed.query).get('url', [None])[0] if url_param: return unquote(url_param) # 3. 兜底:处理常见的跳转参数 for param in ['target', 'dest', 'destination', 'link']: found = parse_qs(parsed.query).get(param, [None])[0] if found and found.startswith('http'): return unquote(found) except Exception: pass # 解析失败则返回原链接 return url async def extraction_adapter(input_data): # 提取字符串 if isinstance(input_data, list): prompt = input_data[-1].content if hasattr(input_data[-1], 'content') else str(input_data[-1]) else: prompt = str(input_data) response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}] ) content = response["content"] # --- 核心改进:URL 自动修复 --- # 使用正则表达式匹配内容中的所有 URL,并尝试进行洗涤 urls = re.findall(r'https?://[^\s<>"\']+', content) for original_url in urls: clean_url = scrub_search_redirect_url(original_url) if clean_url != original_url: content = content.replace(original_url, clean_url) from argparse import Namespace return Namespace(completion=content) @tool() async def browser_extract_content(query: str, extract_links: bool = False, start_from_char: int = 0) -> ToolResult: """ 使用 LLM 从页面提取结构化数据 Extract content from the current page using LLM Args: query: 提取查询(告诉 LLM 要提取什么内容) extract_links: 是否提取链接(默认 False,节省 token) start_from_char: 从哪个字符开始提取(用于分页提取大内容) Returns: ToolResult: 提取的内容 Example: extract_content(query="提取页面上所有产品的名称和价格", extract_links=True) Note: 需要配置 page_extraction_llm,否则会失败 支持分页提取,最大100k字符 """ try: browser, tools = await get_browser_session() # 注意:extract 需要 page_extraction_llm 参数 # 这里我们假设用户会在初始化时配置 LLM # 如果没有配置,会抛出异常 result = await tools.extract( query=query, extract_links=extract_links, start_from_char=start_from_char, browser_session=browser, page_extraction_llm=RunnableLambda(extraction_adapter), # 需要用户配置 file_system=_file_system ) return action_result_to_tool_result(result, f"提取内容: {query}") except Exception as e: return ToolResult( title="内容提取失败", output="", error=f"Failed to extract content: {str(e)}", long_term_memory=f"提取内容失败: {query}" ) async def _detect_and_download_pdf_via_cdp(browser) -> Optional[str]: """ 检测当前页面是否为 PDF,如果是则通过 CDP(浏览器内 fetch)下载到本地。 优势:自动携带浏览器的 cookies/session,可访问需要登录的 PDF。 返回本地文件路径,非 PDF 页面返回 None。 """ try: current_url = await browser.get_current_page_url() if not current_url: return None parsed = urlparse(current_url) is_pdf = parsed.path.lower().endswith('.pdf') # URL 不明显是 PDF 时,通过 CDP 检查 content-type if not is_pdf: try: cdp = await browser.get_or_create_cdp_session() ct_result = await cdp.cdp_client.send.Runtime.evaluate( params={'expression': 'document.contentType'}, session_id=cdp.session_id ) content_type = ct_result.get('result', {}).get('value', '') is_pdf = 'pdf' in content_type.lower() except Exception: pass if not is_pdf: return None # 通过浏览器内 fetch API 下载 PDF(自动携带 cookies) cdp = await browser.get_or_create_cdp_session() js_code = """ (async () => { try { const resp = await fetch(window.location.href); if (!resp.ok) return JSON.stringify({error: 'HTTP ' + resp.status}); const blob = await resp.blob(); return new Promise((resolve, reject) => { const reader = new FileReader(); reader.onloadend = () => resolve(JSON.stringify({data: reader.result})); reader.onerror = () => resolve(JSON.stringify({error: 'FileReader failed'})); reader.readAsDataURL(blob); }); } catch(e) { return JSON.stringify({error: e.message}); } })() """ result = await cdp.cdp_client.send.Runtime.evaluate( params={ 'expression': js_code, 'awaitPromise': True, 'returnByValue': True, 'timeout': 60000 }, session_id=cdp.session_id ) value = result.get('result', {}).get('value', '') if not value: print("⚠️ CDP fetch PDF: 无返回值") return None data = json.loads(value) if 'error' in data: print(f"⚠️ CDP fetch PDF 失败: {data['error']}") return None # 从 data URL 中提取 base64 并解码 data_url = data['data'] # data:application/pdf;base64,JVBERi0... base64_data = data_url.split(',', 1)[1] pdf_bytes = base64.b64decode(base64_data) # 保存到本地 save_dir = Path.cwd() / ".cache/.browser_use_files" save_dir.mkdir(parents=True, exist_ok=True) filename = Path(parsed.path).name if parsed.path else "" if not filename or not filename.lower().endswith('.pdf'): import time filename = f"downloaded_{int(time.time())}.pdf" save_path = str(save_dir / filename) with open(save_path, 'wb') as f: f.write(pdf_bytes) print(f"📄 PDF 已通过 CDP 下载到: {save_path} ({len(pdf_bytes)} bytes)") return save_path except Exception as e: print(f"⚠️ PDF 检测/下载异常: {e}") return None @tool() async def browser_read_long_content( goal: Union[str, dict], source: str = "page", context: str = "", **kwargs ) -> ToolResult: """ 智能读取长内容。支持自动检测并读取网页上的 PDF 文件。 当 source="page" 且当前页面是 PDF 时,会通过 CDP 下载 PDF 并用 pypdf 解析, 而非使用 DOM 提取(DOM 无法读取浏览器内置 PDF Viewer 的内容)。 通过 CDP 下载可自动携带浏览器的 cookies/session,支持需要登录的 PDF。 """ try: browser, tools = await get_browser_session() # 1. 提取目标文本 (针对 GoalTree 字典结构) final_goal_text = "" if isinstance(goal, dict): final_goal_text = goal.get("mission") or goal.get("goal") or str(goal) else: final_goal_text = str(goal) # 2. 清洗业务背景 (过滤框架注入的 dict 类型 context) business_context = context if isinstance(context, str) else "" # 3. PDF 自动检测:当 source="page" 时检查是否为 PDF 页面 available_files = [] if source.lower() == "page": pdf_path = await _detect_and_download_pdf_via_cdp(browser) if pdf_path: source = pdf_path available_files.append(pdf_path) # 4. 验证并实例化 action_params = ReadContentAction( goal=final_goal_text, source=source, context=business_context ) # 5. 解包参数调用底层方法 result = await tools.read_long_content( **action_params.model_dump(), browser_session=browser, page_extraction_llm=RunnableLambda(extraction_adapter), available_file_paths=available_files ) return action_result_to_tool_result(result, f"深度读取: {source}") except Exception as e: return ToolResult( title="深度读取失败", output="", error=f"Read long content failed: {str(e)}", long_term_memory="参数解析或校验失败,请检查输入" ) @tool() async def browser_get_page_html() -> ToolResult: """ 获取当前页面的完整 HTML Get the full HTML of the current page 返回当前页面的完整 HTML 源代码。 Returns: ToolResult: 包含页面 HTML 的工具返回对象 Example: get_page_html() Note: - 返回的是完整的 HTML 源代码 - 输出会被限制在 10000 字符以内(完整内容保存在 metadata 中) """ try: browser, tools = await get_browser_session() # 使用 CDP 获取页面 HTML cdp = await browser.get_or_create_cdp_session() # 获取页面内容 result = await cdp.cdp_client.send.Runtime.evaluate( params={'expression': 'document.documentElement.outerHTML'}, session_id=cdp.session_id ) html = result.get('result', {}).get('value', '') # 获取 URL 和标题 url = await browser.get_current_page_url() title_result = await cdp.cdp_client.send.Runtime.evaluate( params={'expression': 'document.title'}, session_id=cdp.session_id ) title = title_result.get('result', {}).get('value', '') # 限制输出大小 output_html = html if len(html) > 10000: output_html = html[:10000] + "... (truncated)" return ToolResult( title=f"获取 HTML: {url}", output=f"页面: {title}\nURL: {url}\n\nHTML:\n{output_html}", long_term_memory=f"获取 HTML: {url}", metadata={"url": url, "title": title, "html": html} ) except Exception as e: return ToolResult( title="获取 HTML 失败", output="", error=f"Failed to get page HTML: {str(e)}", long_term_memory="获取 HTML 失败" ) @tool() async def browser_get_selector_map() -> ToolResult: """ 获取当前页面的元素索引映射 Get the selector map of interactive elements on the current page 返回页面所有可交互元素的索引字典,用于后续的元素操作。 Returns: ToolResult: 包含元素映射的工具返回对象 Example: get_selector_map() Note: 返回的索引可以用于 click_element, input_text 等操作 """ try: browser, tools = await get_browser_session() # 关键修复:先触发 BrowserStateRequestEvent 来更新 DOM 状态 # 这会触发 DOM watchdog 重新构建 DOM 树并更新 selector_map from browser_use.browser.events import BrowserStateRequestEvent # 触发事件并等待结果 event = browser.event_bus.dispatch( BrowserStateRequestEvent( include_dom=True, include_screenshot=False, # 不需要截图,节省时间 include_recent_events=False ) ) # 等待 DOM 更新完成 browser_state = await event.event_result(raise_if_none=True, raise_if_any=True) # 从更新后的状态中获取 selector_map selector_map = browser_state.dom_state.selector_map if browser_state.dom_state else {} # 构建输出信息 elements_info = [] for index, node in list(selector_map.items())[:20]: # 只显示前20个 tag = node.tag_name attrs = node.attributes or {} text = attrs.get('aria-label') or attrs.get('placeholder') or attrs.get('value', '') elements_info.append(f"索引 {index}: <{tag}> {text[:50]}") output = f"找到 {len(selector_map)} 个交互元素\n\n" output += "\n".join(elements_info) if len(selector_map) > 20: output += f"\n... 还有 {len(selector_map) - 20} 个元素" return ToolResult( title="获取元素映射", output=output, long_term_memory=f"获取到 {len(selector_map)} 个交互元素", metadata={"selector_map": {k: str(v) for k, v in list(selector_map.items())[:100]}} ) except Exception as e: return ToolResult( title="获取元素映射失败", output="", error=f"Failed to get selector map: {str(e)}", long_term_memory="获取元素映射失败" ) # ============================================================ # JavaScript 执行工具 (JavaScript Tools) # ============================================================ @tool() async def browser_evaluate(code: str) -> ToolResult: """ 在页面中执行 JavaScript 代码 Execute JavaScript code in the page context 允许在当前页面中执行任意 JavaScript 代码,用于复杂的页面操作或数据提取。 Args: code: 要执行的 JavaScript 代码字符串 Returns: ToolResult: 包含执行结果的工具返回对象 Example: evaluate("document.title") evaluate("document.querySelectorAll('a').length") Note: - 代码在页面上下文中执行,可以访问 DOM 和全局变量 - 返回值会被自动序列化为字符串 - 执行结果限制在 20k 字符以内 """ try: browser, tools = await get_browser_session() result = await tools.evaluate( code=code, browser_session=browser ) return action_result_to_tool_result(result, "执行 JavaScript") except Exception as e: return ToolResult( title="JavaScript 执行失败", output="", error=f"Failed to execute JavaScript: {str(e)}", long_term_memory="JavaScript 执行失败" ) @tool() async def browser_ensure_login_with_cookies(cookie_type: str, url: str = "https://www.xiaohongshu.com") -> ToolResult: """ 检查登录状态并在需要时注入 cookies """ try: browser, tools = await get_browser_session() if url: await tools.navigate(url=url, browser_session=browser) await tools.wait(seconds=2, browser_session=browser) check_login_js = """ (function() { const loginBtn = document.querySelector('[class*="login"]') || document.querySelector('[href*="login"]') || Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录')); const userInfo = document.querySelector('[class*="user"]') || document.querySelector('[class*="avatar"]'); return { needLogin: !!loginBtn && !userInfo, hasLoginBtn: !!loginBtn, hasUserInfo: !!userInfo }; })() """ result = await tools.evaluate(code=check_login_js, browser_session=browser) status_output = result.extracted_content if isinstance(status_output, str) and status_output.startswith("Result: "): status_output = status_output[8:] login_info: Dict[str, Any] = {} if isinstance(status_output, str): try: login_info = json.loads(status_output) except Exception: login_info = {} elif isinstance(status_output, dict): login_info = status_output if not login_info.get("needLogin"): output = json.dumps({"need_login": False}, ensure_ascii=False) return ToolResult( title="已登录", output=output, long_term_memory=output ) row = _fetch_cookie_row(cookie_type) cookie_value = _extract_cookie_value(row) if not cookie_value: output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False) return ToolResult( title="未找到 cookies", output=output, error="未找到 cookies", long_term_memory=output ) domain, base_url = _cookie_domain_for_type(cookie_type, url) cookies = _normalize_cookies(cookie_value, domain, base_url) if not cookies: output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False) return ToolResult( title="cookies 解析失败", output=output, error="cookies 解析失败", long_term_memory=output ) await browser._cdp_set_cookies(cookies) if url: await tools.navigate(url=url, browser_session=browser) await tools.wait(seconds=2, browser_session=browser) output = json.dumps({"need_login": True, "cookies_count": len(cookies)}, ensure_ascii=False) return ToolResult( title="已注入 cookies", output=output, long_term_memory=output ) except Exception as e: return ToolResult( title="登录检查失败", output="", error=str(e), long_term_memory="登录检查失败" ) # ============================================================ # 等待用户操作工具 (Wait for User Action) # ============================================================ @tool() async def browser_wait_for_user_action(message: str = "Please complete the action in browser", timeout: int = 300) -> ToolResult: """ 等待用户在浏览器中完成操作(如登录) Wait for user to complete an action in the browser (e.g., login) 暂停自动化流程,等待用户手动完成某些操作(如登录、验证码等)。 Args: message: 提示用户需要完成的操作 timeout: 最大等待时间(秒),默认 300 秒(5 分钟) Returns: ToolResult: 包含等待结果的工具返回对象 Example: wait_for_user_action("Please login to Xiaohongshu", timeout=180) wait_for_user_action("Please complete the CAPTCHA", timeout=60) Note: - 用户需要在浏览器窗口中手动完成操作 - 完成后按回车键继续 - 超时后会自动继续执行 """ try: import asyncio print(f"\n{'='*60}") print(f"⏸️ WAITING FOR USER ACTION") print(f"{'='*60}") print(f"📝 {message}") print(f"⏱️ Timeout: {timeout} seconds") print(f"\n👉 Please complete the action in the browser window") print(f"👉 Press ENTER when done, or wait for timeout") print(f"{'='*60}\n") # Wait for user input or timeout try: loop = asyncio.get_event_loop() # Wait for either user input or timeout await asyncio.wait_for( loop.run_in_executor(None, input), timeout=timeout ) return ToolResult( title="用户操作完成", output=f"User completed: {message}", long_term_memory=f"用户完成操作: {message}" ) except asyncio.TimeoutError: return ToolResult( title="用户操作超时", output=f"Timeout waiting for: {message}", long_term_memory=f"等待用户操作超时: {message}" ) except Exception as e: return ToolResult( title="等待用户操作失败", output="", error=f"Failed to wait for user action: {str(e)}", long_term_memory="等待用户操作失败" ) # ============================================================ # 任务完成工具 (Task Completion) # ============================================================ @tool() async def browser_done(text: str, success: bool = True, files_to_display: Optional[List[str]] = None) -> ToolResult: """ 标记任务完成并返回最终消息 Mark the task as complete and return final message to user Args: text: 给用户的最终消息 success: 任务是否成功完成 files_to_display: 可选的要显示的文件路径列表 Returns: ToolResult: 完成结果 Example: done("任务已完成,提取了10个产品信息", success=True) """ try: browser, tools = await get_browser_session() result = await tools.done( text=text, success=success, files_to_display=files_to_display, file_system=_file_system ) return action_result_to_tool_result(result, "任务完成") except Exception as e: return ToolResult( title="标记任务完成失败", output="", error=f"Failed to complete task: {str(e)}", long_term_memory="标记任务完成失败" ) # ============================================================ # Cookie 持久化工具 # ============================================================ _COOKIES_DIR = Path(__file__).parent.parent.parent.parent.parent / ".cache/.cookies" @tool() async def browser_export_cookies(name: str = "", account: str = "") -> ToolResult: """ 导出当前浏览器的所有 Cookie 到本地 .cookies/ 目录。 文件命名格式:{域名}_{账号名}.json,如 bilibili.com_zhangsan.json 登录成功后调用此工具,下次可通过 browser_load_cookies 恢复登录态。 Args: name: 自定义文件名(可选,提供则忽略自动命名) account: 账号名称(可选,用于区分同一网站的不同账号) """ try: browser, _ = await get_browser_session() # 获取所有 Cookie(CDP 格式) all_cookies = await browser._cdp_get_cookies() if not all_cookies: return ToolResult(title="Cookie 导出", output="当前浏览器没有 Cookie", long_term_memory="无 Cookie 可导出") # 获取当前域名,用于过滤和命名 from urllib.parse import urlparse current_url = await browser.get_current_page_url() or '' domain = urlparse(current_url).netloc.replace("www.", "") or "default" if not name: name = f"{domain}_{account}" if account else domain # 只保留当前域名的 cookie(过滤第三方) cookies = [c for c in all_cookies if domain in c.get("domain", "").lstrip(".")] # 保存 _COOKIES_DIR.mkdir(parents=True, exist_ok=True) cookie_file = _COOKIES_DIR / f"{name}.json" cookie_file.write_text(json.dumps(cookies, ensure_ascii=False, indent=2), encoding="utf-8") return ToolResult( title="Cookie 已导出", output=f"已保存 {len(cookies)} 条 Cookie 到 .cookies/{name}.json(从 {len(all_cookies)} 条中过滤当前域名)", long_term_memory=f"导出 {len(cookies)} 条 Cookie 到 .cookies/{name}.json" ) except Exception as e: return ToolResult(title="Cookie 导出失败", output="", error=str(e), long_term_memory="导出 Cookie 失败") @tool() async def browser_load_cookies(url: str, name: str = "") -> ToolResult: """ 根据目标 URL 自动查找本地 Cookie 文件,注入浏览器并导航到目标页面恢复登录态。 重要:此工具会自动完成导航,调用前不需要先调用 browser_navigate_to_url。 Args: url: 目标 URL(必须提供,同时用于自动匹配 Cookie 文件) name: Cookie 文件名(可选,不传则根据 URL 域名自动查找) """ try: browser, tools = await get_browser_session() if not url.startswith("http"): url = f"https://{url}" # 根据域名自动查找 Cookie 文件 if not name: from urllib.parse import urlparse domain = urlparse(url).netloc.replace("www.", "") if _COOKIES_DIR.exists(): matches = list(_COOKIES_DIR.glob(f"{domain}*.json")) if matches: cookie_file = matches[0] # 取第一个匹配的 else: available = [f.stem for f in _COOKIES_DIR.glob("*.json")] return ToolResult(title="未找到 Cookie", output=f"没有匹配 {domain} 的文件,可用: {available}", error=f"无 {domain} 的 Cookie 文件") else: return ToolResult(title="未找到 Cookie", output=".cookies 目录不存在", error="Cookie 目录不存在") else: cookie_file = _COOKIES_DIR / f"{name}.json" if not cookie_file.exists(): available = [f.stem for f in _COOKIES_DIR.glob("*.json")] if _COOKIES_DIR.exists() else [] return ToolResult(title="文件不存在", output=f"可用: {available}", error=f"未找到 .cookies/{name}.json") cookies = json.loads(cookie_file.read_text(encoding="utf-8")) # 直接注入(export 和 load 使用相同的 CDP 格式,无需标准化) await browser._cdp_set_cookies(cookies) # 导航到目标页面(带上刚注入的 Cookie) if url: if not url.startswith("http"): url = f"https://{url}" await tools.navigate(url=url, browser_session=browser) await tools.wait(seconds=3, browser_session=browser) return ToolResult( title="Cookie 注入并导航完成", output=f"从 {cookie_file.name} 注入 {len(cookies)} 条 Cookie,已导航到 {url}", long_term_memory=f"已从 {cookie_file.name} 注入 Cookie 并导航到 {url},登录态已恢复" ) except Exception as e: return ToolResult(title="Cookie 加载失败", output="", error=str(e), long_term_memory="加载 Cookie 失败") # ============================================================ # 导出所有工具函数(供外部使用) # ============================================================ __all__ = [ # 会话管理 'init_browser_session', 'get_browser_session', 'cleanup_browser_session', 'kill_browser_session', # 导航类工具 'browser_navigate_to_url', 'browser_search_web', 'browser_go_back', 'browser_wait', # 元素交互工具 'browser_click_element', 'browser_input_text', 'browser_send_keys', 'browser_upload_file', # 滚动和视图工具 'browser_scroll_page', 'browser_find_text', 'browser_screenshot', # 标签页管理工具 'browser_switch_tab', 'browser_close_tab', # 下拉框工具 'browser_get_dropdown_options', 'browser_select_dropdown_option', # 内容提取工具 'browser_extract_content', 'browser_get_page_html', 'browser_read_long_content', 'browser_download_direct_url', 'browser_get_selector_map', 'browser_get_visual_selector_map', # JavaScript 执行工具 'browser_evaluate', 'browser_ensure_login_with_cookies', # 等待用户操作 'browser_wait_for_user_action', # 任务完成 'browser_done', # Cookie 持久化 'browser_export_cookies', 'browser_load_cookies', ]