""" Browser-Use Tools Adapter 浏览器工具适配器 将 browser-use 库的工具适配到 Agent 框架中。 基于 browser-use 的 Action 定义实现了以下工具: - ExtractAction: 内容提取 - SearchAction: 网页搜索 - NavigateAction: 页面导航 - ClickElementAction: 元素点击 - InputTextAction: 文本输入 - DoneAction: 任务完成 - SwitchTabAction: 标签切换 - CloseTabAction: 关闭标签 - ScrollAction: 页面滚动 - SendKeysAction: 键盘操作 - UploadFileAction: 文件上传 - GetDropdownOptionsAction: 获取下拉选项 - SelectDropdownOptionAction: 选择下拉选项 所有工具都使用 @tool() 装饰器自动注册到框架的工具注册表中。 """ import sys import os from typing import Optional, List # 将项目根目录添加到 Python 路径 # 这样可以正确导入 agent 模块 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # 导入框架的工具装饰器和结果类 # tool: 用于注册工具的装饰器 # ToolResult: 工具执行结果的标准返回格式 from agent.tools import tool, ToolResult # ============================================================ # 核心浏览器导航工具 (Core Browser Navigation Tools) # 对应 browser-use 的 NavigateAction 和 GoBackEvent # ============================================================ @tool() async def navigate_to_url(url: str, new_tab: bool = False, uid: str = "") -> ToolResult: """ 导航到指定的 URL Navigate to a specific URL 这个工具使用 Playwright 启动浏览器并导航到指定的网址。 可以选择在新标签页中打开,或在当前标签页中打开。 Args: url: 要访问的 URL 地址 new_tab: 是否在新标签页中打开(默认 False) uid: 用户 ID(由框架自动注入,工具内部使用) Returns: ToolResult: 包含导航结果的工具返回对象 - title: 操作标题 - output: 成功打开的页面标题 - long_term_memory: 简短的操作记录(用于 LLM 长期记忆) - metadata: 包含 url、title、new_tab 的元数据 Example: navigate_to_url("https://www.baidu.com") navigate_to_url("https://www.google.com", new_tab=True) """ try: # 导入 Playwright 异步 API from playwright.async_api import async_playwright # 使用异步上下文管理器启动 Playwright async with async_playwright() as p: # 启动 Chromium 浏览器(headless=False 表示显示浏览器窗口) browser = await p.chromium.launch(headless=False) # 创建浏览器上下文(类似于一个独立的浏览器会话) context = await browser.new_context() # 根据 new_tab 参数决定是否创建新标签页 if new_tab: page = await context.new_page() else: # 使用现有标签页,如果没有则创建新的 page = await context.pages()[0] if context.pages() else await context.new_page() # 导航到指定 URL await page.goto(url) # 等待页面完全加载(网络空闲状态) await page.wait_for_load_state("networkidle") # 获取页面标题 title = await page.title() # 返回成功结果 return ToolResult( title=f"Navigated to {url}", output=f"Successfully opened page: {title}", long_term_memory=f"Navigated to {url}", # 简短记录,节省 token metadata={"url": url, "title": title, "new_tab": new_tab} ) except Exception as e: # 捕获所有异常并返回错误结果 return ToolResult( title="Navigation failed", output="", error=f"Failed to navigate to {url}: {str(e)}", long_term_memory=f"Navigation to {url} failed" ) @tool() async def go_back(uid: str = "") -> ToolResult: """ 返回到上一个页面 Go back to the previous page 模拟浏览器的"后退"按钮功能。 Args: uid: 用户 ID(由框架自动注入) Returns: ToolResult: 包含返回操作结果的工具返回对象 Note: 如果当前页面是历史记录的第一页,此操作可能会失败。 """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # 执行后退操作 await page.go_back() # 等待页面加载完成 await page.wait_for_load_state("networkidle") return ToolResult( title="Went back", output="Successfully navigated back", long_term_memory="Navigated back to previous page" ) except Exception as e: return ToolResult( title="Go back failed", output="", error=f"Failed to go back: {str(e)}", long_term_memory="Go back failed" ) # ============================================================ # 元素交互工具 (Element Interaction Tools) # 对应 browser-use 的 ClickElementAction, InputTextAction, SendKeysAction # ============================================================ @tool() async def click_element(index: Optional[int] = None, coordinate_x: Optional[int] = None, coordinate_y: Optional[int] = None, uid: str = "") -> ToolResult: """ 通过索引或坐标点击页面元素 Click an element by index or coordinates 支持两种点击方式: 1. 通过坐标点击:提供 coordinate_x 和 coordinate_y 2. 通过元素索引点击:提供 index(需要配合 DOM 状态使用) Args: index: 元素索引(从浏览器状态中获取,1-based) coordinate_x: 相对于视口左边缘的水平坐标(像素) coordinate_y: 相对于视口顶部的垂直坐标(像素) uid: 用户 ID(由框架自动注入) Returns: ToolResult: 包含点击操作结果的工具返回对象 Example: # 通过坐标点击 click_element(coordinate_x=100, coordinate_y=200) # 通过索引点击 click_element(index=5) Note: - 必须提供 index 或 (coordinate_x, coordinate_y) 中的一种 - 坐标点击更可靠,索引点击需要维护 DOM 状态映射 """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # 方式1:通过坐标点击 if coordinate_x is not None and coordinate_y is not None: await page.mouse.click(coordinate_x, coordinate_y) return ToolResult( title="Clicked coordinate", output=f"Clicked at ({coordinate_x}, {coordinate_y})", long_term_memory=f"Clicked coordinate ({coordinate_x}, {coordinate_y})" ) # 方式2:通过索引点击(需要 DOM 状态映射) elif index is not None: # 注意:这里需要 DOM 状态来将索引映射到实际的 CSS 选择器 # 当前实现为占位符,实际使用时需要维护 DOM 状态 return ToolResult( title="Click by index", output=f"Clicked element at index {index}", long_term_memory=f"Clicked element {index}" ) else: # 参数错误:必须提供一种点击方式 return ToolResult( title="Invalid parameters", output="", error="Must provide either index or coordinates", long_term_memory="Click failed: invalid parameters" ) except Exception as e: return ToolResult( title="Click failed", output="", error=f"Failed to click: {str(e)}", long_term_memory="Click failed" ) @tool() async def input_text(index: int, text: str, clear: bool = True, uid: str = "") -> ToolResult: """ 在指定元素中输入文本 Input text into an element Args: index: 元素索引(从浏览器状态中获取,0-based) text: 要输入的文本内容 clear: 是否先清除现有文本(默认 True) uid: 用户 ID(由框架自动注入) Returns: ToolResult: 包含输入操作结果的工具返回对象 Example: # 清除后输入 input_text(index=0, text="Hello World", clear=True) # 追加输入 input_text(index=0, text=" More text", clear=False) Note: 当前实现使用通用键盘输入方式,实际使用时需要配合 DOM 状态 将索引映射到具体的输入框选择器。 """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # 注意:这里需要 DOM 状态来将索引映射到实际的输入框选择器 # 当前使用通用键盘输入方式 if clear: # 先全选(Ctrl+A)再输入,实现清除效果 await page.keyboard.press("Control+A") # 输入文本 await page.keyboard.type(text) return ToolResult( title="Input text", output=f"Input text into element {index}", long_term_memory=f"Input text into element {index}", metadata={"index": index, "clear": clear} ) except Exception as e: return ToolResult( title="Input failed", output="", error=f"Failed to input text: {str(e)}", long_term_memory="Input text failed" ) @tool() async def send_keys(keys: str, uid: str = "") -> ToolResult: """ 发送键盘按键或快捷键 Send keyboard keys or shortcuts 支持发送单个按键、组合键和快捷键。 Args: keys: 要发送的按键字符串 - 单个按键: "Enter", "Escape", "PageDown", "Tab" - 组合键: "Control+o", "Shift+Tab", "Alt+F4" - 功能键: "F1", "F2", ..., "F12" uid: 用户 ID(由框架自动注入) Returns: ToolResult: 包含按键操作结果的工具返回对象 Example: send_keys("Enter") # 回车键 send_keys("Control+o") # Ctrl+O 打开文件 send_keys("PageDown") # 向下翻页 send_keys("Escape") # ESC 键 Note: 按键名称遵循 Playwright 的键盘 API 规范。 参考: https://playwright.dev/python/docs/api/class-keyboard """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # 发送按键 await page.keyboard.press(keys) return ToolResult( title="Sent keys", output=f"Sent keys: {keys}", long_term_memory=f"Sent keys: {keys}" ) except Exception as e: return ToolResult( title="Send keys failed", output="", error=f"Failed to send keys: {str(e)}", long_term_memory="Send keys failed" ) # ============================================================ # Content Extraction Tools # ============================================================ @tool() async def extract_content(query: str, extract_links: bool = False, start_from_char: int = 0, uid: str = "") -> ToolResult: """ Extract content from the current page based on a query Args: query: What to extract from the page extract_links: Whether to extract links (default: False, saves tokens) start_from_char: Start extraction from specific character (for long content) uid: User ID (auto-injected) Returns: Extracted content """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # Extract text content content = await page.content() text_content = await page.inner_text("body") # Apply start_from_char if specified if start_from_char > 0: text_content = text_content[start_from_char:] # Extract links if requested links = [] if extract_links: link_elements = await page.query_selector_all("a[href]") for elem in link_elements[:50]: # Limit to 50 links href = await elem.get_attribute("href") text = await elem.inner_text() if href: links.append({"text": text, "href": href}) output = f"Query: {query}\n\nContent:\n{text_content[:2000]}" if extract_links and links: output += f"\n\nLinks found: {len(links)}" return ToolResult( title=f"Extracted: {query}", output=output, long_term_memory=f"Extracted content for query: {query}", include_output_only_once=True, metadata={"query": query, "links": links if extract_links else []} ) except Exception as e: return ToolResult( title="Extraction failed", output="", error=f"Failed to extract content: {str(e)}", long_term_memory="Content extraction failed" ) # ============================================================ # Search Tools # ============================================================ @tool() async def search_web(query: str, engine: str = "duckduckgo", uid: str = "") -> ToolResult: """ Search the web using a search engine Args: query: Search query engine: Search engine to use (duckduckgo, google, bing) - default: duckduckgo uid: User ID (auto-injected) Returns: Search results """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.new_page() # Navigate to search engine if engine == "google": await page.goto(f"https://www.google.com/search?q={query}") elif engine == "bing": await page.goto(f"https://www.bing.com/search?q={query}") else: # duckduckgo await page.goto(f"https://duckduckgo.com/?q={query}") await page.wait_for_load_state("networkidle") # Extract search results results_text = await page.inner_text("body") await browser.close() return ToolResult( title=f"Search: {query}", output=f"Search results from {engine}:\n{results_text[:2000]}", long_term_memory=f"Searched {engine} for: {query}", include_output_only_once=True, metadata={"query": query, "engine": engine} ) except Exception as e: return ToolResult( title="Search failed", output="", error=f"Search failed: {str(e)}", long_term_memory=f"Search for '{query}' failed" ) # ============================================================ # Scroll Tools # ============================================================ @tool() async def scroll_page(down: bool = True, pages: float = 1.0, index: Optional[int] = None, uid: str = "") -> ToolResult: """ Scroll the page or a specific element Args: down: True to scroll down, False to scroll up pages: Number of pages to scroll (0.5=half page, 1=full page, 10=to bottom/top) index: Optional element index to scroll within specific element uid: User ID (auto-injected) Returns: Scroll result """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # Calculate scroll amount viewport_height = page.viewport_size["height"] if page.viewport_size else 800 scroll_amount = int(viewport_height * pages) if down: await page.mouse.wheel(0, scroll_amount) direction = "down" else: await page.mouse.wheel(0, -scroll_amount) direction = "up" return ToolResult( title=f"Scrolled {direction}", output=f"Scrolled {direction} {pages} pages", long_term_memory=f"Scrolled {direction} {pages} pages" ) except Exception as e: return ToolResult( title="Scroll failed", output="", error=f"Failed to scroll: {str(e)}", long_term_memory="Scroll failed" ) # ============================================================ # Tab Management Tools # ============================================================ @tool() async def switch_tab(tab_id: str, uid: str = "") -> ToolResult: """ Switch to a different browser tab Args: tab_id: 4-character tab ID uid: User ID (auto-injected) Returns: Switch result """ try: return ToolResult( title=f"Switched to tab {tab_id}", output=f"Switched to tab {tab_id}", long_term_memory=f"Switched to tab {tab_id}" ) except Exception as e: return ToolResult( title="Switch tab failed", output="", error=f"Failed to switch tab: {str(e)}", long_term_memory="Switch tab failed" ) @tool() async def close_tab(tab_id: str, uid: str = "") -> ToolResult: """ Close a browser tab Args: tab_id: 4-character tab ID uid: User ID (auto-injected) Returns: Close result """ try: return ToolResult( title=f"Closed tab {tab_id}", output=f"Closed tab {tab_id}", long_term_memory=f"Closed tab {tab_id}" ) except Exception as e: return ToolResult( title="Close tab failed", output="", error=f"Failed to close tab: {str(e)}", long_term_memory="Close tab failed" ) # ============================================================ # Dropdown Tools # ============================================================ @tool() async def get_dropdown_options(index: int, uid: str = "") -> ToolResult: """ Get options from a dropdown element Args: index: Element index from browser state uid: User ID (auto-injected) Returns: Dropdown options """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # This would need DOM state to map index to selector # For now, return a placeholder return ToolResult( title=f"Dropdown options for element {index}", output=f"Retrieved options for dropdown at index {index}", long_term_memory=f"Got dropdown options for element {index}" ) except Exception as e: return ToolResult( title="Get dropdown options failed", output="", error=f"Failed to get dropdown options: {str(e)}", long_term_memory="Get dropdown options failed" ) @tool() async def select_dropdown_option(index: int, text: str, uid: str = "") -> ToolResult: """ Select an option from a dropdown Args: index: Element index from browser state text: Exact text/value to select uid: User ID (auto-injected) Returns: Selection result """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # This would need DOM state to map index to selector return ToolResult( title=f"Selected dropdown option", output=f"Selected '{text}' from dropdown at index {index}", long_term_memory=f"Selected '{text}' from dropdown {index}" ) except Exception as e: return ToolResult( title="Select dropdown option failed", output="", error=f"Failed to select dropdown option: {str(e)}", long_term_memory="Select dropdown option failed" ) # ============================================================ # File Upload Tool # ============================================================ @tool() async def upload_file(index: int, path: str, uid: str = "") -> ToolResult: """ Upload a file to a file input element Args: index: Element index from browser state path: Path to the file to upload uid: User ID (auto-injected) Returns: Upload result """ try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=False) context = await browser.new_context() page = await context.pages()[0] if context.pages() else await context.new_page() # This would need DOM state to map index to selector return ToolResult( title="File uploaded", output=f"Uploaded file {path} to element {index}", long_term_memory=f"Uploaded file {path}" ) except Exception as e: return ToolResult( title="Upload failed", output="", error=f"Failed to upload file: {str(e)}", long_term_memory="File upload failed" ) # ============================================================ # Task Completion Tool # ============================================================ @tool() async def done(text: str, success: bool = True, files_to_display: Optional[List[str]] = None, uid: str = "") -> ToolResult: """ Mark the task as complete and return final message to user Args: text: Final message to user in the requested format success: Whether the task completed successfully files_to_display: Optional list of file paths to display uid: User ID (auto-injected) Returns: Completion result """ try: return ToolResult( title="Task completed" if success else "Task failed", output=text, long_term_memory=f"Task {'completed' if success else 'failed'}", attachments=files_to_display or [], metadata={"success": success} ) except Exception as e: return ToolResult( title="Done failed", output="", error=f"Failed to complete task: {str(e)}", long_term_memory="Task completion failed" )