"""Browser-Use tools adapter.

Exposes browser-use style actions as tools of the agent framework. Every
public coroutine in this module is registered through the ``@tool()``
decorator and returns a ``ToolResult``.

Tool groups:

* Navigation: ``navigate_to_url``, ``go_back``, ``search_web``
* Element interaction: ``click_element``, ``input_text``, ``send_keys``
* Waiting: ``wait_for_user_action``, ``wait``
* Content extraction: ``get_page_html``, ``extract_content``
* Scroll and view: ``scroll_page``, ``find_text``, ``screenshot``
* JavaScript: ``evaluate``
* Files: ``write_file``, ``read_file``, ``replace_file``, ``upload_file``
* Tabs: ``switch_tab``, ``close_tab``
* Dropdowns: ``get_dropdown_options``, ``select_dropdown_option``
* Task completion: ``done``

NOTE(review): every browser tool launches its own Chromium instance and tears
it down before returning, so page state does not carry over from one tool
call to the next. Index-based element tools are placeholders until a shared
session / DOM-state mapping exists.
"""

import asyncio
import base64
import os
import sys
from contextlib import asynccontextmanager
from typing import List, Optional
from urllib.parse import quote_plus

# Put the project root on sys.path so that the ``agent`` package resolves.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# tool: decorator that registers a coroutine as a framework tool.
# ToolResult: standard result object returned by every tool.
from agent.tools import tool, ToolResult

# Search URL templates keyed by engine name; the placeholder receives the
# URL-encoded query. Unknown engines fall back to DuckDuckGo.
_SEARCH_URLS = {
    "google": "https://www.google.com/search?q={}",
    "bing": "https://www.bing.com/search?q={}",
}
_DEFAULT_SEARCH_URL = "https://duckduckgo.com/?q={}"


def _failure(title: str, error: str, memory: str) -> ToolResult:
    """Build the error result shape shared by all tools (empty output)."""
    return ToolResult(title=title, output="", error=error, long_term_memory=memory)


@asynccontextmanager
async def _browser_page(new_tab: bool = False):
    """Launch a visible Chromium browser and yield a page from a new context.

    Args:
        new_tab: When True always open a new page; otherwise reuse the
            context's first page if one exists, else open a new page.

    Yields:
        A Playwright ``Page``. The browser is closed when the block exits.
    """
    # Imported lazily so that a missing Playwright install surfaces as a
    # tool error result (inside the caller's try) instead of breaking the
    # import of this module.
    from playwright.async_api import async_playwright

    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=False)
        try:
            context = await browser.new_context()
            # ``BrowserContext.pages`` is a property returning a list; the
            # previous code called and awaited it, which raised TypeError.
            if new_tab or not context.pages:
                page = await context.new_page()
            else:
                page = context.pages[0]
            yield page
        finally:
            await browser.close()


# ============================================================
# Core browser navigation tools
# ============================================================

@tool()
async def navigate_to_url(url: str, new_tab: bool = False, uid: str = "") -> ToolResult:
    """Navigate to a specific URL.

    Launches a browser, opens ``url`` and waits for the network to go idle.

    Args:
        url: Address to open.
        new_tab: Whether to open the URL in a new tab (default False).
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult whose output contains the page title and whose metadata
        holds ``url``, ``title`` and ``new_tab``; an error result on failure.

    Example:
        navigate_to_url("https://www.baidu.com")
        navigate_to_url("https://www.google.com", new_tab=True)
    """
    try:
        async with _browser_page(new_tab=new_tab) as page:
            await page.goto(url)
            await page.wait_for_load_state("networkidle")
            title = await page.title()
            return ToolResult(
                title=f"Navigated to {url}",
                output=f"Successfully opened page: {title}",
                long_term_memory=f"Navigated to {url}",  # kept short to save tokens
                metadata={"url": url, "title": title, "new_tab": new_tab},
            )
    except Exception as e:
        return _failure(
            "Navigation failed",
            f"Failed to navigate to {url}: {e}",
            f"Navigation to {url} failed",
        )


@tool()
async def go_back(uid: str = "") -> ToolResult:
    """Go back to the previous page (browser "back" button).

    Args:
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult describing the outcome; an error result on failure.
    """
    try:
        async with _browser_page() as page:
            await page.go_back()
            await page.wait_for_load_state("networkidle")
            return ToolResult(
                title="Went back",
                output="Successfully navigated back",
                long_term_memory="Navigated back to previous page",
            )
    except Exception as e:
        return _failure("Go back failed", f"Failed to go back: {e}", "Go back failed")


# ============================================================
# Element interaction tools
# ============================================================

@tool()
async def click_element(index: Optional[int] = None, coordinate_x: Optional[int] = None,
                        coordinate_y: Optional[int] = None, uid: str = "") -> ToolResult:
    """Click an element by viewport coordinates or by element index.

    Coordinates take precedence when both ``coordinate_x`` and
    ``coordinate_y`` are given; otherwise ``index`` is used.

    Args:
        index: Element index taken from the browser state.
        coordinate_x: Horizontal viewport coordinate in pixels.
        coordinate_y: Vertical viewport coordinate in pixels.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult describing the click; an error result when neither a
        full coordinate pair nor an index is supplied, or on failure.

    Example:
        click_element(coordinate_x=100, coordinate_y=200)
        click_element(index=5)
    """
    has_coordinates = coordinate_x is not None and coordinate_y is not None
    # Validate before launching a browser: nothing can be clicked without
    # either a full coordinate pair or an index.
    if not has_coordinates and index is None:
        return _failure(
            "Invalid parameters",
            "Must provide either index or coordinates",
            "Click failed: invalid parameters",
        )

    try:
        async with _browser_page() as page:
            if has_coordinates:
                await page.mouse.click(coordinate_x, coordinate_y)
                return ToolResult(
                    title="Clicked coordinate",
                    output=f"Clicked at ({coordinate_x}, {coordinate_y})",
                    long_term_memory=f"Clicked coordinate ({coordinate_x}, {coordinate_y})",
                )

            # NOTE(review): placeholder. Mapping an index to a selector needs
            # DOM state that is not available here, so no click is performed
            # even though a success result is returned.
            return ToolResult(
                title="Click by index",
                output=f"Clicked element at index {index}",
                long_term_memory=f"Clicked element {index}",
            )
    except Exception as e:
        return _failure("Click failed", f"Failed to click: {e}", "Click failed")


@tool()
async def input_text(index: int, text: str, clear: bool = True, uid: str = "") -> ToolResult:
    """Type text into the page.

    Args:
        index: Element index taken from the browser state.
        text: Text to type.
        clear: Press Control+A first so the typed text replaces the current
            selection (default True).
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult with ``index`` and ``clear`` in metadata; an error result
        on failure.

    Example:
        input_text(index=0, text="Hello World", clear=True)
        input_text(index=0, text=" More text", clear=False)
    """
    try:
        async with _browser_page() as page:
            # NOTE(review): ``index`` is not mapped to an element yet; the
            # keystrokes go to whatever currently has keyboard focus.
            if clear:
                await page.keyboard.press("Control+A")
            await page.keyboard.type(text)
            return ToolResult(
                title="Input text",
                output=f"Input text into element {index}",
                long_term_memory=f"Input text into element {index}",
                metadata={"index": index, "clear": clear},
            )
    except Exception as e:
        return _failure("Input failed", f"Failed to input text: {e}", "Input text failed")


@tool()
async def send_keys(keys: str, uid: str = "") -> ToolResult:
    """Send a key or key combination to the page.

    Args:
        keys: Key string passed to Playwright's ``keyboard.press``, e.g.
            "Enter", "Escape", "PageDown", "Control+o", "Shift+Tab", "F1".
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult describing the keys sent; an error result on failure.

    Example:
        send_keys("Enter")
        send_keys("Control+o")
    """
    try:
        async with _browser_page() as page:
            await page.keyboard.press(keys)
            return ToolResult(
                title="Sent keys",
                output=f"Sent keys: {keys}",
                long_term_memory=f"Sent keys: {keys}",
            )
    except Exception as e:
        return _failure("Send keys failed", f"Failed to send keys: {e}", "Send keys failed")


# ============================================================
# Wait tools
# ============================================================

@tool()
async def wait_for_user_action(message: str = "Please complete the action in browser",
                               timeout: int = 300, uid: str = "") -> ToolResult:
    """Pause until the user presses ENTER on stdin or the timeout expires.

    Intended for manual steps such as logging in or solving a CAPTCHA.

    Args:
        message: Prompt telling the user what to do.
        timeout: Maximum wait in seconds (default 300).
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult reporting completion or timeout; an error result on
        failure.

    Example:
        wait_for_user_action("Please login to Xiaohongshu", timeout=180)
    """
    try:
        banner = "=" * 60
        print(f"\n{banner}")
        print("⏸️ WAITING FOR USER ACTION")
        print(banner)
        print(f"📝 {message}")
        print(f"⏱️ Timeout: {timeout} seconds")
        print("\n👉 Please complete the action in the browser window")
        print("👉 Press ENTER when done, or wait for timeout")
        print(f"{banner}\n")

        # ``input`` blocks, so it runs in the default executor while the
        # event loop enforces the timeout.
        # NOTE(review): on timeout the executor thread stays blocked in
        # ``input`` until the next line arrives on stdin.
        loop = asyncio.get_running_loop()
        await asyncio.wait_for(loop.run_in_executor(None, input), timeout=timeout)
        return ToolResult(
            title="User action completed",
            output=f"User completed: {message}",
            long_term_memory=f"User completed action: {message}",
        )
    except asyncio.TimeoutError:
        return ToolResult(
            title="User action timeout",
            output=f"Timeout waiting for: {message}",
            long_term_memory=f"Timeout on user action: {message}",
        )
    except Exception as e:
        return _failure(
            "Wait for user action failed",
            f"Failed to wait for user action: {e}",
            "Wait for user action failed",
        )


@tool()
async def wait(seconds: int = 3, uid: str = "") -> ToolResult:
    """Sleep for a number of seconds, clamped to the range 1-30.

    Args:
        seconds: Requested wait time in seconds.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult reporting the time actually waited; an error result on
        failure.

    Example:
        wait(5)
    """
    try:
        wait_time = max(1, min(seconds, 30))
        await asyncio.sleep(wait_time)
        return ToolResult(
            title=f"Waited {wait_time} seconds",
            output=f"Waited for {wait_time} seconds",
            long_term_memory=f"Waited {wait_time}s",
        )
    except Exception as e:
        return _failure("Wait failed", f"Failed to wait: {e}", "Wait failed")


# ============================================================
# Content extraction tools
# ============================================================

@tool()
async def get_page_html(uid: str = "") -> ToolResult:
    """Return the HTML of the page.

    Args:
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult whose output shows at most the first 10000 characters of
        HTML; the full HTML is stored in ``metadata["html"]``. An error
        result on failure.
    """
    try:
        async with _browser_page() as page:
            html = await page.content()
            url = page.url
            title = await page.title()

            # Cap what is shown to the model; metadata keeps the full text.
            output_html = html
            if len(html) > 10000:
                output_html = html[:10000] + "... (truncated)"

            return ToolResult(
                title=f"Got HTML from {url}",
                output=f"Page: {title}\nURL: {url}\n\nHTML:\n{output_html}",
                long_term_memory=f"Got HTML from {url}",
                metadata={"url": url, "title": title, "html": html},
            )
    except Exception as e:
        return _failure("Get HTML failed", f"Failed to get page HTML: {e}", "Get HTML failed")


@tool()
async def extract_content(query: str, extract_links: bool = False,
                          start_from_char: int = 0, uid: str = "") -> ToolResult:
    """Extract the visible body text (and optionally links) from the page.

    Args:
        query: Description of what is being extracted; echoed in the result.
        extract_links: Also collect up to 50 ``a[href]`` links (default
            False, which saves tokens).
        start_from_char: Skip this many leading characters of the body text
            (for paging through long content).
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult whose output holds up to 2000 characters of text and the
        number of links found; collected links are in ``metadata["links"]``.
        An error result on failure.
    """
    try:
        async with _browser_page() as page:
            text_content = await page.inner_text("body")
            if start_from_char > 0:
                text_content = text_content[start_from_char:]

            links = []
            if extract_links:
                anchors = await page.query_selector_all("a[href]")
                for anchor in anchors[:50]:  # cap the number of links
                    href = await anchor.get_attribute("href")
                    label = await anchor.inner_text()
                    if href:
                        links.append({"text": label, "href": href})

            output = f"Query: {query}\n\nContent:\n{text_content[:2000]}"
            if links:
                output += f"\n\nLinks found: {len(links)}"

            return ToolResult(
                title=f"Extracted: {query}",
                output=output,
                long_term_memory=f"Extracted content for query: {query}",
                include_output_only_once=True,
                metadata={"query": query, "links": links},
            )
    except Exception as e:
        return _failure(
            "Extraction failed",
            f"Failed to extract content: {e}",
            "Content extraction failed",
        )


# ============================================================
# Search tools
# ============================================================

@tool()
async def search_web(query: str, engine: str = "duckduckgo", uid: str = "") -> ToolResult:
    """Search the web and return the text of the results page.

    Args:
        query: Search query; it is URL-encoded before being sent.
        engine: "google", "bing" or "duckduckgo" (default). Any other value
            falls back to DuckDuckGo.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult whose output holds up to 2000 characters of the results
        page text; an error result on failure.
    """
    try:
        # Encode the query so spaces, '&', '#' and non-ASCII characters do
        # not break or alter the search URL.
        search_url = _SEARCH_URLS.get(engine, _DEFAULT_SEARCH_URL).format(quote_plus(query))
        async with _browser_page(new_tab=True) as page:
            await page.goto(search_url)
            await page.wait_for_load_state("networkidle")
            results_text = await page.inner_text("body")

        return ToolResult(
            title=f"Search: {query}",
            output=f"Search results from {engine}:\n{results_text[:2000]}",
            long_term_memory=f"Searched {engine} for: {query}",
            include_output_only_once=True,
            metadata={"query": query, "engine": engine},
        )
    except Exception as e:
        return _failure("Search failed", f"Search failed: {e}", f"Search for '{query}' failed")


# ============================================================
# Text finding tool
# ============================================================

# Walks the text nodes under <body> and scrolls the parent element of the
# first node containing ``text`` into view. ``text`` arrives as an evaluate
# argument, never spliced into the script source.
_FIND_TEXT_JS = """
(text) => {
    const walker = document.createTreeWalker(
        document.body,
        NodeFilter.SHOW_TEXT,
        null,
        false
    );
    let node;
    while ((node = walker.nextNode())) {
        if (node.textContent.includes(text)) {
            node.parentElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
            return true;
        }
    }
    return false;
}
"""


@tool()
async def find_text(text: str, uid: str = "") -> ToolResult:
    """Find text on the page and scroll the first match into view.

    Args:
        text: Text to look for (substring match on text nodes).
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult saying whether the text was found; an error result on
        failure.

    Example:
        find_text("Privacy Policy")
    """
    try:
        async with _browser_page() as page:
            # Passing ``text`` as an argument keeps quotes and backslashes in
            # it from breaking (or injecting into) the script.
            found = await page.evaluate(_FIND_TEXT_JS, text)
            if found:
                return ToolResult(
                    title=f"Found text: {text}",
                    output=f"Found and scrolled to text: {text}",
                    long_term_memory=f"Found text: {text}",
                )
            return ToolResult(
                title="Text not found",
                output=f"Text '{text}' not found on page",
                long_term_memory=f"Text '{text}' not found",
            )
    except Exception as e:
        return _failure("Find text failed", f"Failed to find text: {e}", "Find text failed")


# ============================================================
# Screenshot tool
# ============================================================

@tool()
async def screenshot(uid: str = "") -> ToolResult:
    """Capture a screenshot of the visible viewport.

    Args:
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult with the base64-encoded image in
        ``metadata["screenshot"]``; an error result on failure.
    """
    try:
        async with _browser_page() as page:
            screenshot_bytes = await page.screenshot(full_page=False)
            screenshot_b64 = base64.b64encode(screenshot_bytes).decode()
            return ToolResult(
                title="Screenshot captured",
                output=f"Screenshot captured (size: {len(screenshot_bytes)} bytes)",
                long_term_memory="Screenshot captured",
                metadata={"screenshot": screenshot_b64},
            )
    except Exception as e:
        return _failure(
            "Screenshot failed",
            f"Failed to capture screenshot: {e}",
            "Screenshot failed",
        )


# ============================================================
# Scroll tools
# ============================================================

@tool()
async def scroll_page(down: bool = True, pages: float = 1.0,
                      index: Optional[int] = None, uid: str = "") -> ToolResult:
    """Scroll the page with the mouse wheel.

    Args:
        down: True to scroll down, False to scroll up.
        pages: Number of viewport heights to scroll (0.5 = half a page).
        index: Element index to scroll within.
            NOTE(review): currently ignored; the page itself is scrolled.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult describing the scroll; an error result on failure.
    """
    try:
        async with _browser_page() as page:
            # Fall back to 800px when the page reports no viewport size.
            viewport = page.viewport_size
            viewport_height = viewport["height"] if viewport else 800
            scroll_amount = int(viewport_height * pages)

            direction = "down" if down else "up"
            await page.mouse.wheel(0, scroll_amount if down else -scroll_amount)

            return ToolResult(
                title=f"Scrolled {direction}",
                output=f"Scrolled {direction} {pages} pages",
                long_term_memory=f"Scrolled {direction} {pages} pages",
            )
    except Exception as e:
        return _failure("Scroll failed", f"Failed to scroll: {e}", "Scroll failed")


# ============================================================
# JavaScript evaluation tool
# ============================================================

@tool()
async def evaluate(code: str, uid: str = "") -> ToolResult:
    """Execute JavaScript in the page and return its result as a string.

    NOTE(review): ``code`` runs as-is in the page; callers are responsible
    for what they pass in.

    Args:
        code: JavaScript expression or function source to evaluate.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult whose output is ``str(result)`` truncated to 20000
        characters; an error result on failure.

    Example:
        evaluate("document.title")
        evaluate("document.querySelectorAll('a').length")
    """
    try:
        async with _browser_page() as page:
            result = await page.evaluate(code)

            result_str = str(result)
            if len(result_str) > 20000:
                result_str = result_str[:20000] + "... (truncated)"

            return ToolResult(
                title="JavaScript executed",
                output=f"Result: {result_str}",
                long_term_memory="Executed JavaScript code",
                metadata={"code": code, "result": result_str},
            )
    except Exception as e:
        return _failure(
            "JavaScript execution failed",
            f"Failed to execute JavaScript: {e}",
            "JavaScript execution failed",
        )


# ============================================================
# File system tools
# ============================================================

@tool()
async def write_file(file_name: str, content: str, append: bool = False,
                     uid: str = "") -> ToolResult:
    """Write text to a local file as UTF-8.

    Args:
        file_name: Path of the file to write.
        content: Text to write.
        append: Append instead of overwriting (default False).
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult with ``file_name``, ``size`` (bytes on disk after the
        write) and ``append`` in metadata; an error result on failure.

    Example:
        write_file("output.txt", "Hello World")
        write_file("log.txt", "New log entry\\n", append=True)
    """
    try:
        mode = "a" if append else "w"
        with open(file_name, mode, encoding="utf-8") as f:
            f.write(content)

        file_size = os.path.getsize(file_name)
        action = "Appended to" if append else "Wrote"

        return ToolResult(
            title=f"{action} file: {file_name}",
            output=f"{action} {len(content)} characters to {file_name} (size: {file_size} bytes)",
            long_term_memory=f"{action} file {file_name}",
            metadata={"file_name": file_name, "size": file_size, "append": append},
        )
    except Exception as e:
        return _failure(
            "Write file failed",
            f"Failed to write file: {e}",
            f"Write file {file_name} failed",
        )


@tool()
async def read_file(file_name: str, uid: str = "") -> ToolResult:
    """Read a local file as UTF-8 text.

    Args:
        file_name: Path of the file to read.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult whose output shows at most the first 5000 characters;
        the full text is stored in ``metadata["content"]``. An error result
        when the file does not exist or cannot be read/decoded.

    Example:
        read_file("input.txt")
    """
    try:
        if not os.path.exists(file_name):
            return _failure(
                "File not found",
                f"File not found: {file_name}",
                f"File {file_name} not found",
            )

        with open(file_name, "r", encoding="utf-8") as f:
            content = f.read()
        file_size = os.path.getsize(file_name)

        # Cap what is shown to the model; metadata keeps the full text.
        output_content = content
        if len(content) > 5000:
            output_content = content[:5000] + "... (truncated)"

        return ToolResult(
            title=f"Read file: {file_name}",
            output=f"File content ({file_size} bytes):\n{output_content}",
            long_term_memory=f"Read file {file_name}",
            metadata={"file_name": file_name, "size": file_size, "content": content},
        )
    except Exception as e:
        return _failure(
            "Read file failed",
            f"Failed to read file: {e}",
            f"Read file {file_name} failed",
        )


@tool()
async def replace_file(file_name: str, old_str: str, new_str: str,
                       uid: str = "") -> ToolResult:
    """Replace every occurrence of a string in a UTF-8 text file.

    Args:
        file_name: Path of the file to modify.
        old_str: Text to replace; must not be empty.
        new_str: Replacement text.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult with the replacement count in metadata; a "Text not
        found" result (no error) when ``old_str`` does not occur; an error
        result when the file is missing, ``old_str`` is empty, or on failure.

    Example:
        replace_file("config.txt", "old_value", "new_value")
    """
    try:
        if not os.path.exists(file_name):
            return _failure(
                "File not found",
                f"File not found: {file_name}",
                f"File {file_name} not found",
            )

        # An empty pattern matches between every character, so replacing it
        # would insert ``new_str`` throughout the file.
        if not old_str:
            return _failure(
                "Replace file failed",
                "Failed to replace text in file: old_str must not be empty",
                f"Replace in {file_name} failed",
            )

        with open(file_name, "r", encoding="utf-8") as f:
            content = f.read()

        if old_str not in content:
            return ToolResult(
                title="Text not found",
                output=f"Text '{old_str}' not found in {file_name}",
                long_term_memory=f"Text not found in {file_name}",
                metadata={"file_name": file_name, "old_str": old_str},
            )

        count = content.count(old_str)
        new_content = content.replace(old_str, new_str)

        with open(file_name, "w", encoding="utf-8") as f:
            f.write(new_content)

        return ToolResult(
            title=f"Replaced text in {file_name}",
            output=f"Replaced {count} occurrence(s) of '{old_str}' with '{new_str}' in {file_name}",
            long_term_memory=f"Replaced text in {file_name}",
            metadata={"file_name": file_name, "count": count,
                      "old_str": old_str, "new_str": new_str},
        )
    except Exception as e:
        return _failure(
            "Replace file failed",
            f"Failed to replace text in file: {e}",
            f"Replace in {file_name} failed",
        )


# ============================================================
# Tab management tools
# ============================================================

@tool()
async def switch_tab(tab_id: str, uid: str = "") -> ToolResult:
    """Switch to a different browser tab.

    NOTE(review): placeholder. No browser is touched; a success result is
    returned unconditionally.

    Args:
        tab_id: 4-character tab ID.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult naming the tab.
    """
    try:
        return ToolResult(
            title=f"Switched to tab {tab_id}",
            output=f"Switched to tab {tab_id}",
            long_term_memory=f"Switched to tab {tab_id}",
        )
    except Exception as e:
        return _failure("Switch tab failed", f"Failed to switch tab: {e}", "Switch tab failed")


@tool()
async def close_tab(tab_id: str, uid: str = "") -> ToolResult:
    """Close a browser tab.

    NOTE(review): placeholder. No browser is touched; a success result is
    returned unconditionally.

    Args:
        tab_id: 4-character tab ID.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult naming the tab.
    """
    try:
        return ToolResult(
            title=f"Closed tab {tab_id}",
            output=f"Closed tab {tab_id}",
            long_term_memory=f"Closed tab {tab_id}",
        )
    except Exception as e:
        return _failure("Close tab failed", f"Failed to close tab: {e}", "Close tab failed")


# ============================================================
# Dropdown tools
# ============================================================

@tool()
async def get_dropdown_options(index: int, uid: str = "") -> ToolResult:
    """Get the options of a dropdown element.

    NOTE(review): placeholder. A browser is launched but no options are
    read, because mapping ``index`` to a selector needs DOM state.

    Args:
        index: Element index from the browser state.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult naming the element; an error result on failure.
    """
    try:
        async with _browser_page():
            return ToolResult(
                title=f"Dropdown options for element {index}",
                output=f"Retrieved options for dropdown at index {index}",
                long_term_memory=f"Got dropdown options for element {index}",
            )
    except Exception as e:
        return _failure(
            "Get dropdown options failed",
            f"Failed to get dropdown options: {e}",
            "Get dropdown options failed",
        )


@tool()
async def select_dropdown_option(index: int, text: str, uid: str = "") -> ToolResult:
    """Select an option from a dropdown.

    NOTE(review): placeholder. A browser is launched but nothing is
    selected, because mapping ``index`` to a selector needs DOM state.

    Args:
        index: Element index from the browser state.
        text: Exact text/value to select.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult naming the option and element; an error result on failure.
    """
    try:
        async with _browser_page():
            return ToolResult(
                title="Selected dropdown option",
                output=f"Selected '{text}' from dropdown at index {index}",
                long_term_memory=f"Selected '{text}' from dropdown {index}",
            )
    except Exception as e:
        return _failure(
            "Select dropdown option failed",
            f"Failed to select dropdown option: {e}",
            "Select dropdown option failed",
        )


# ============================================================
# File upload tool
# ============================================================

@tool()
async def upload_file(index: int, path: str, uid: str = "") -> ToolResult:
    """Upload a file to a file input element.

    NOTE(review): placeholder. A browser is launched but no file is
    uploaded, because mapping ``index`` to a selector needs DOM state.

    Args:
        index: Element index from the browser state.
        path: Path of the file to upload.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult naming the file and element; an error result on failure.
    """
    try:
        async with _browser_page():
            return ToolResult(
                title="File uploaded",
                output=f"Uploaded file {path} to element {index}",
                long_term_memory=f"Uploaded file {path}",
            )
    except Exception as e:
        return _failure("Upload failed", f"Failed to upload file: {e}", "File upload failed")


# ============================================================
# Task completion tool
# ============================================================

@tool()
async def done(text: str, success: bool = True,
               files_to_display: Optional[List[str]] = None, uid: str = "") -> ToolResult:
    """Mark the task as finished and return the final message to the user.

    Args:
        text: Final message to the user in the requested format.
        success: Whether the task completed successfully.
        files_to_display: Optional list of file paths to attach.
        uid: User ID (auto-injected by the framework).

    Returns:
        ToolResult carrying ``text`` as output, the files as attachments and
        ``success`` in metadata; an error result on failure.
    """
    try:
        return ToolResult(
            title="Task completed" if success else "Task failed",
            output=text,
            long_term_memory=f"Task {'completed' if success else 'failed'}",
            attachments=files_to_display or [],
            metadata={"success": success},
        )
    except Exception as e:
        return _failure("Done failed", f"Failed to complete task: {e}", "Task completion failed")