"""
Native Browser-Use tools adapter.

Implements every browser tool directly on top of browser-use's native
classes (BrowserSession, Tools). Does not depend on Playwright; everything
goes through the CDP protocol.

Key features:
1. Persistent browser session - the browser is started only once.
2. State is kept automatically - login state, cookies, LocalStorage, etc.
3. Full low-level access - the CDP protocol can be used directly.
4. Good performance - avoids repeatedly creating/destroying browser instances.

Usage:
1. Call init_browser_session() when the agent is initialised.
2. Use the individual tool functions to drive the browser.
3. Call cleanup_browser_session() when the task is finished.
"""

import sys
import os
import json
import asyncio
import logging
from typing import Optional, List, Dict, Any, Tuple
from pathlib import Path
from urllib.parse import urlparse

# Put the directory two levels above this file on the import path
# (the original comment describes it as the project root).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Framework tool decorator and result class
from agent.tools import tool, ToolResult
from agent.tools.builtin.browser.sync_mysql_help import mysql

# Core browser-use classes
from browser_use import BrowserSession, BrowserProfile
from browser_use.tools.service import Tools
from browser_use.agent.views import ActionResult
from browser_use.filesystem.file_system import FileSystem

logger = logging.getLogger(__name__)


# ============================================================
# Global browser session management
# ============================================================

# Module-level singletons: the browser session, the tools instance and the
# file system handed to the file-related tools.
_browser_session: Optional[BrowserSession] = None
_browser_tools: Optional[Tools] = None
_file_system: Optional[FileSystem] = None


async def init_browser_session(
    headless: bool = False,
    user_data_dir: Optional[str] = None,
    profile_name: str = "default",
    browser_profile: Optional[BrowserProfile] = None,
    use_cloud: bool = False,
    **kwargs
) -> tuple[BrowserSession, Tools]:
    """
    Initialise the global browser session (no-op if one already exists).

    Args:
        headless: Whether to run the browser headless.
        user_data_dir: User data directory (used to persist login state).
            When omitted for a local browser, defaults to
            ``~/.browser_use/profiles/<profile_name>`` and is created.
        profile_name: Profile name used to build the default user data dir.
        browser_profile: Optional BrowserProfile (e.g. to preset cookies).
        use_cloud: Use a cloud browser instead of a local one.
        **kwargs: Extra BrowserSession parameters; they override the
            parameters computed here.

    Returns:
        A ``(BrowserSession, Tools)`` tuple.

    Raises:
        Whatever ``BrowserSession(...)`` / ``BrowserSession.start()`` raise.
        In that case no global state is changed, so a later call retries
        from scratch instead of returning a session that never started.
    """
    global _browser_session, _browser_tools, _file_system

    if _browser_session is not None:
        return _browser_session, _browser_tools

    # Default profile directory so login state persists between runs.
    if user_data_dir is None and profile_name and not use_cloud:
        user_data_dir = str(Path.home() / ".browser_use" / "profiles" / profile_name)
        Path(user_data_dir).mkdir(parents=True, exist_ok=True)

    session_params = {
        "headless": headless,
    }

    if use_cloud:
        # Cloud browser mode
        session_params["use_cloud"] = True
        print("🌐 使用云浏览器模式")
    else:
        # Local browser mode
        session_params["is_local"] = True

        # On macOS point explicitly at the installed Chrome, if present.
        import platform
        if platform.system() == "Darwin":
            chrome_path = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
            if Path(chrome_path).exists():
                session_params["executable_path"] = chrome_path

    # Only pass these through when they actually have a value.
    if user_data_dir:
        session_params["user_data_dir"] = user_data_dir
    if browser_profile:
        session_params["browser_profile"] = browser_profile

    # Caller-supplied parameters win over the computed ones.
    session_params.update(kwargs)

    # Build everything in locals first: the globals are only published once
    # the browser has really started, otherwise a failed start() would leave
    # a dead session cached for every later call.
    session = BrowserSession(**session_params)
    await session.start()

    tools = Tools()

    # File system instance used by the file-related tools.
    base_dir = Path.cwd() / ".browser_use_files"
    base_dir.mkdir(parents=True, exist_ok=True)
    file_system = FileSystem(base_dir=str(base_dir))

    _browser_session = session
    _browser_tools = tools
    _file_system = file_system

    return _browser_session, _browser_tools


async def get_browser_session() -> tuple[BrowserSession, Tools]:
    """
    Return the current browser session, creating one with defaults if needed.

    Returns:
        A ``(BrowserSession, Tools)`` tuple.
    """
    global _browser_session, _browser_tools

    if _browser_session is None:
        await init_browser_session()

    return _browser_session, _browser_tools


async def cleanup_browser_session():
    """
    Stop the browser session via ``BrowserSession.stop()`` and forget it.

    The module-level references are cleared even when ``stop()`` raises, so
    the next tool call starts a fresh session; the exception still propagates.
    """
    global _browser_session, _browser_tools, _file_system

    if _browser_session is None:
        return

    try:
        await _browser_session.stop()
    finally:
        _browser_session = None
        _browser_tools = None
        _file_system = None


async def kill_browser_session():
    """
    Terminate the browser session via ``BrowserSession.kill()`` and forget it.

    The module-level references are cleared even when ``kill()`` raises; the
    exception still propagates.
    """
    global _browser_session, _browser_tools, _file_system

    if _browser_session is None:
        return

    try:
        await _browser_session.kill()
    finally:
        _browser_session = None
        _browser_tools = None
        _file_system = None


# ============================================================
# Helpers: ActionResult -> ToolResult
# ============================================================

def action_result_to_tool_result(result: ActionResult, title: Optional[str] = None) -> ToolResult:
    """
    Convert a browser-use ActionResult into the framework's ToolResult.

    Args:
        result: The browser-use ActionResult.
        title: Optional title; a generic success/failure title is used when
            it is not provided.

    Returns:
        A ToolResult. When ``result.error`` is set the output is empty and
        the error is carried over; otherwise the extracted content becomes
        the output.
    """
    if result.error:
        return ToolResult(
            title=title or "操作失败",
            output="",
            error=result.error,
            long_term_memory=result.long_term_memory or result.error
        )

    return ToolResult(
        title=title or "操作成功",
        output=result.extracted_content or "",
        long_term_memory=result.long_term_memory or result.extracted_content or "",
        metadata=result.metadata or {}
    )


def _cookie_domain_for_type(cookie_type: str, url: str) -> Tuple[str, str]:
    """
    Work out the cookie domain and base URL for a cookie type / target URL.

    Args:
        cookie_type: Channel identifier; ``"xiaohongshu"`` / ``"xhs"``
            (case-insensitive) map to fixed values.
        url: Target URL used for every other cookie type.

    Returns:
        ``(domain, base_url)``. ``domain`` is the URL's host name without a
        leading ``www.`` and prefixed with a dot (empty string when the URL
        has no host). ``base_url`` is ``scheme://netloc`` when both parts are
        present, otherwise ``url`` unchanged.
    """
    if cookie_type and cookie_type.lower() in {"xiaohongshu", "xhs"}:
        return ".xiaohongshu.com", "https://www.xiaohongshu.com"

    parsed = urlparse(url or "")

    # Use the host name rather than netloc so a port or user-info part never
    # ends up in the cookie domain, and strip only a *leading* "www."
    # (a blanket replace would also mangle hosts such as "awww.example.com").
    host = (parsed.hostname or "").removeprefix("www.")
    domain = f".{host}" if host else ""

    if parsed.scheme and parsed.netloc:
        base_url = f"{parsed.scheme}://{parsed.netloc}"
    else:
        base_url = url
    return domain, base_url


def _parse_cookie_string(cookie_str: str, domain: str, url: str) -> List[Dict[str, Any]]:
    """
    Parse a ``name=value; name2=value2`` cookie header string.

    Args:
        cookie_str: Raw cookie string; segments without ``=`` are skipped.
        domain: Domain assigned to every cookie.
        url: When truthy, stored under the ``"url"`` key of every cookie.

    Returns:
        A list of cookie dicts (session cookies, path ``/``, secure,
        ``sameSite`` ``"None"``); empty when ``cookie_str`` is empty.
    """
    cookies: List[Dict[str, Any]] = []
    if not cookie_str:
        return cookies

    for part in cookie_str.split(";"):
        if not part or "=" not in part:
            continue
        # Split on the first "=" only: cookie values may contain "=".
        name, value = part.split("=", 1)
        cookie = {
            "name": str(name).strip(),
            "value": str(value).strip(),
            "domain": domain,
            "path": "/",
            "expires": -1,
            "httpOnly": False,
            "secure": True,
            "sameSite": "None"
        }
        if url:
            cookie["url"] = url
        cookies.append(cookie)
    return cookies


def _normalize_cookies(cookie_value: Any, domain: str, url: str) -> List[Dict[str, Any]]:
    """
    Turn any supported cookie representation into a list of cookie dicts.

    Supported inputs: a list (returned as-is), a dict wrapping a
    ``"cookies"`` entry, a single cookie dict with ``name``/``value``,
    bytes (decoded as UTF-8), a JSON string of any of the above, or a raw
    ``name=value; ...`` cookie string.

    Args:
        cookie_value: The stored cookie value.
        domain: Domain used when a raw cookie string has to be parsed.
        url: URL used when a raw cookie string has to be parsed.

    Returns:
        A list of cookie dicts; empty for ``None`` or unsupported input.
    """
    if cookie_value is None:
        return []

    if isinstance(cookie_value, list):
        return cookie_value

    if isinstance(cookie_value, dict):
        if "cookies" in cookie_value:
            return _normalize_cookies(cookie_value.get("cookies"), domain, url)
        if "name" in cookie_value and "value" in cookie_value:
            return [cookie_value]
        return []

    if isinstance(cookie_value, (bytes, bytearray)):
        cookie_value = cookie_value.decode("utf-8", errors="ignore")

    if isinstance(cookie_value, str):
        text = cookie_value.strip()
        if not text:
            return []
        # Try JSON first; anything that is not valid JSON is treated as a
        # raw cookie header string.
        try:
            parsed = json.loads(text)
        except Exception:
            parsed = None
        if parsed is not None:
            return _normalize_cookies(parsed, domain, url)
        return _parse_cookie_string(text, domain, url)

    return []


def _extract_cookie_value(row: Optional[Dict[str, Any]]) -> Any:
    """
    Pull the cookie payload out of a database row.

    Args:
        row: Row dict, or ``None``.

    Returns:
        ``row["cookies"]`` when that column exists, otherwise the value of
        the first column whose name contains ``"cookie"`` (case-insensitive),
        otherwise ``None``.
    """
    if not row:
        return None

    # Prefer the "cookies" column.
    if "cookies" in row:
        return row["cookies"]

    # Fall back to any other column that looks like a cookie field.
    for key, value in row.items():
        if "cookie" in key.lower():
            return value

    return None


def _fetch_cookie_row(cookie_type: str) -> Optional[Dict[str, Any]]:
    """
    Fetch one ``agent_channel_cookies`` row for the given type.

    Best-effort: database errors are logged and reported as ``None``.

    Args:
        cookie_type: Value matched against the ``type`` column.

    Returns:
        The row returned by ``mysql.fetchone``, or ``None`` when
        ``cookie_type`` is empty or the query fails.
    """
    if not cookie_type:
        return None
    try:
        return mysql.fetchone(
            "select * from agent_channel_cookies where type=%s limit 1",
            (cookie_type,)
        )
    except Exception:
        logger.warning("Failed to fetch cookie row for type %r", cookie_type, exc_info=True)
        return None


def _fetch_profile_id(cookie_type: str) -> Optional[str]:
    """
    Fetch the ``profileId`` stored for the given cookie type.

    Best-effort: database errors are logged and reported as ``None``.

    Args:
        cookie_type: Value matched against the ``type`` column.

    Returns:
        The ``profileId`` value, or ``None`` when ``cookie_type`` is empty,
        no row / column is found, or the query fails.
    """
    if not cookie_type:
        return None
    try:
        row = mysql.fetchone(
            "select profileId from agent_channel_cookies where type=%s limit 1",
            (cookie_type,)
        )
        if row and "profileId" in row:
            return row["profileId"]
        return None
    except Exception:
        logger.warning("Failed to fetch profileId for type %r", cookie_type, exc_info=True)
        return None


# ============================================================
# Navigation tools
# ============================================================

@tool()
async def navigate_to_url(url: str, new_tab: bool = False, uid: str = "") -> ToolResult:
    """
    Navigate to a specific URL.

    Uses browser-use's native navigation and can open the URL in a new tab.

    Args:
        url: The URL to visit.
        new_tab: Whether to open the URL in a new tab (default False).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The navigation result; failures are reported through
        the ``error`` field rather than raised.

    Example:
        navigate_to_url("https://www.baidu.com")
        navigate_to_url("https://www.google.com", new_tab=True)
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.navigate(
            url=url,
            new_tab=new_tab,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"导航到 {url}")

    except Exception as e:
        return ToolResult(
            title="导航失败",
            output="",
            error=f"Failed to navigate to {url}: {e}",
            long_term_memory=f"导航到 {url} 失败"
        )


@tool()
async def search_web(query: str, engine: str = "google", uid: str = "") -> ToolResult:
    """
    Search the web using a search engine.

    Args:
        query: Search keywords.
        engine: Search engine (google, duckduckgo, bing) - default: google.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The search result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        search_web("Python async programming", engine="google")
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.search(
            query=query,
            engine=engine,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"搜索: {query}")

    except Exception as e:
        return ToolResult(
            title="搜索失败",
            output="",
            error=f"Search failed: {e}",
            long_term_memory=f"搜索 '{query}' 失败"
        )


@tool()
async def go_back(uid: str = "") -> ToolResult:
    """
    Go back to the previous page (the browser's "Back" button).

    Args:
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The result of the back navigation; failures are reported
        through the ``error`` field rather than raised.
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.go_back(browser_session=browser)

        return action_result_to_tool_result(result, "返回上一页")

    except Exception as e:
        return ToolResult(
            title="返回失败",
            output="",
            error=f"Failed to go back: {e}",
            long_term_memory="返回上一页失败"
        )


@tool()
async def wait(seconds: int = 3, uid: str = "") -> ToolResult:
    """
    Wait for a specified number of seconds.

    Useful for waiting on page loads, animations or other asynchronous work.

    Args:
        seconds: How long to wait, in seconds. The original documentation
            states a 30 second maximum; this wrapper does not enforce it.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The result of the wait; failures are reported through
        the ``error`` field rather than raised.

    Example:
        wait(5)  # wait for 5 seconds
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.wait(seconds=seconds, browser_session=browser)

        return action_result_to_tool_result(result, f"等待 {seconds} 秒")

    except Exception as e:
        return ToolResult(
            title="等待失败",
            output="",
            error=f"Failed to wait: {e}",
            long_term_memory="等待失败"
        )


# ============================================================
# Element interaction tools
# ============================================================

@tool()
async def click_element(index: int, uid: str = "") -> ToolResult:
    """
    Click an element by index.

    Args:
        index: Element index (taken from the browser state).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The result of the click; failures are reported through
        the ``error`` field rather than raised.

    Example:
        click_element(index=5)

    Note:
        Obtain the element indexes first, e.g. via get_selector_map.
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.click(
            index=index,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"点击元素 {index}")

    except Exception as e:
        return ToolResult(
            title="点击失败",
            output="",
            error=f"Failed to click element {index}: {e}",
            long_term_memory=f"点击元素 {index} 失败"
        )


@tool()
async def input_text(index: int, text: str, clear: bool = True, uid: str = "") -> ToolResult:
    """
    Input text into an element.

    Args:
        index: Element index (taken from the browser state).
        text: The text to type.
        clear: Whether to clear the existing text first (default True).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The result of the input; failures are reported through
        the ``error`` field rather than raised.

    Example:
        input_text(index=0, text="Hello World", clear=True)
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.input(
            index=index,
            text=text,
            clear=clear,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"输入文本到元素 {index}")

    except Exception as e:
        return ToolResult(
            title="输入失败",
            output="",
            error=f"Failed to input text into element {index}: {e}",
            long_term_memory="输入文本失败"
        )


@tool()
async def send_keys(keys: str, uid: str = "") -> ToolResult:
    """
    Send keyboard keys or shortcuts.

    Supports single keys, key combinations and shortcuts.

    Args:
        keys: The key string to send.
            - Single keys: "Enter", "Escape", "PageDown", "Tab"
            - Combinations: "Control+o", "Shift+Tab", "Alt+F4"
            - Function keys: "F1", "F2", ..., "F12"
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The result of the key press; failures are reported
        through the ``error`` field rather than raised.

    Example:
        send_keys("Enter")
        send_keys("Control+A")
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.send_keys(
            keys=keys,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"发送按键: {keys}")

    except Exception as e:
        return ToolResult(
            title="发送按键失败",
            output="",
            error=f"Failed to send keys: {e}",
            long_term_memory="发送按键失败"
        )
@tool()
async def upload_file(index: int, path: str, uid: str = "") -> ToolResult:
    """
    Upload a file to a file input element.

    Args:
        index: Element index of the file input.
        path: Path of the file to upload (absolute path).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The result of the upload; failures are reported through
        the ``error`` field rather than raised.

    Example:
        upload_file(index=7, path="/path/to/file.pdf")

    Note:
        The file must exist and the path must be absolute.
    """
    try:
        browser, tools = await get_browser_session()

        # The path is whitelisted explicitly via available_file_paths.
        result = await tools.upload_file(
            index=index,
            path=path,
            browser_session=browser,
            available_file_paths=[path],
            file_system=_file_system
        )

        return action_result_to_tool_result(result, f"上传文件: {path}")

    except Exception as e:
        return ToolResult(
            title="上传失败",
            output="",
            error=f"Failed to upload file: {e}",
            long_term_memory=f"上传文件 {path} 失败"
        )


# ============================================================
# Scroll & view tools
# ============================================================

@tool()
async def scroll_page(down: bool = True, pages: float = 1.0,
                      index: Optional[int] = None, uid: str = "") -> ToolResult:
    """
    Scroll the page or a specific element.

    Args:
        down: True scrolls down, False scrolls up.
        pages: Number of pages to scroll (0.5 = half a page, 1 = a full
            page, 10 = scroll to the bottom/top).
        index: Optional element to scroll (e.g. the inside of a dropdown).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The scroll result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        scroll_page(down=True, pages=2.0)   # scroll down 2 pages
        scroll_page(down=False, pages=1.0)  # scroll up 1 page
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.scroll(
            down=down,
            pages=pages,
            index=index,
            browser_session=browser
        )

        direction = "向下" if down else "向上"
        return action_result_to_tool_result(result, f"{direction}滚动 {pages} 页")

    except Exception as e:
        return ToolResult(
            title="滚动失败",
            output="",
            error=f"Failed to scroll: {e}",
            long_term_memory="滚动失败"
        )


@tool()
async def find_text(text: str, uid: str = "") -> ToolResult:
    """
    Find text on the page and scroll to it.

    Args:
        text: The text to look for.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The search result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        find_text("Privacy Policy")
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.find_text(
            text=text,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"查找文本: {text}")

    except Exception as e:
        return ToolResult(
            title="查找失败",
            output="",
            error=f"Failed to find text: {e}",
            long_term_memory=f"查找文本 '{text}' 失败"
        )


@tool()
async def screenshot(uid: str = "") -> ToolResult:
    """
    Request a screenshot to be included in the next observation.

    Useful for visually checking the page state and layout.

    Args:
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The result of the screenshot request; failures are
        reported through the ``error`` field rather than raised.

    Example:
        screenshot()

    Note:
        Per the original documentation, the screenshot is attached to the
        next page observation.
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.screenshot(browser_session=browser)

        return action_result_to_tool_result(result, "截图请求")

    except Exception as e:
        return ToolResult(
            title="截图失败",
            output="",
            error=f"Failed to capture screenshot: {e}",
            long_term_memory="截图失败"
        )


# ============================================================
# Tab management tools
# ============================================================

@tool()
async def switch_tab(tab_id: str, uid: str = "") -> ToolResult:
    """
    Switch to a different browser tab.

    Args:
        tab_id: 4-character tab ID (the last 4 characters of the target_id).
            Longer values are cut down to their last 4 characters.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The switch result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        switch_tab(tab_id="a3f2")
    """
    try:
        browser, tools = await get_browser_session()

        # Accept a full target id as well: only its last 4 characters count.
        normalized_tab_id = tab_id[-4:] if tab_id else tab_id
        result = await tools.switch(
            tab_id=normalized_tab_id,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"切换到标签页 {normalized_tab_id}")

    except Exception as e:
        return ToolResult(
            title="切换标签页失败",
            output="",
            error=f"Failed to switch tab: {e}",
            long_term_memory=f"切换到标签页 {tab_id} 失败"
        )


@tool()
async def close_tab(tab_id: str, uid: str = "") -> ToolResult:
    """
    Close a browser tab.

    Args:
        tab_id: 4-character tab ID. Longer values are cut down to their
            last 4 characters.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The close result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        close_tab(tab_id="a3f2")
    """
    try:
        browser, tools = await get_browser_session()

        # Accept a full target id as well: only its last 4 characters count.
        normalized_tab_id = tab_id[-4:] if tab_id else tab_id
        result = await tools.close(
            tab_id=normalized_tab_id,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"关闭标签页 {normalized_tab_id}")

    except Exception as e:
        return ToolResult(
            title="关闭标签页失败",
            output="",
            error=f"Failed to close tab: {e}",
            long_term_memory=f"关闭标签页 {tab_id} 失败"
        )


# ============================================================
# Dropdown tools
# ============================================================

@tool()
async def get_dropdown_options(index: int, uid: str = "") -> ToolResult:
    """
    Get the options of a dropdown element.

    Args:
        index: Element index of the dropdown.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The dropdown options; failures are reported through the
        ``error`` field rather than raised.

    Example:
        get_dropdown_options(index=8)
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.dropdown_options(
            index=index,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"获取下拉框选项: {index}")

    except Exception as e:
        return ToolResult(
            title="获取下拉框选项失败",
            output="",
            error=f"Failed to get dropdown options: {e}",
            long_term_memory=f"获取下拉框 {index} 选项失败"
        )


@tool()
async def select_dropdown_option(index: int, text: str, uid: str = "") -> ToolResult:
    """
    Select an option from a dropdown.

    Args:
        index: Element index of the dropdown.
        text: Text of the option to select (exact match, per the original
            documentation).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The selection result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        select_dropdown_option(index=8, text="Option 2")
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.select_dropdown(
            index=index,
            text=text,
            browser_session=browser
        )

        return action_result_to_tool_result(result, f"选择下拉框选项: {text}")

    except Exception as e:
        return ToolResult(
            title="选择下拉框选项失败",
            output="",
            error=f"Failed to select dropdown option: {e}",
            long_term_memory=f"选择选项 '{text}' 失败"
        )


# ============================================================
# Content extraction tools
# ============================================================

@tool()
async def extract_content(query: str, extract_links: bool = False,
                          start_from_char: int = 0, uid: str = "") -> ToolResult:
    """
    Extract structured data from the current page using an LLM.

    Args:
        query: Extraction query (tells the LLM what to extract).
        extract_links: Whether to extract links (default False, saves tokens).
        start_from_char: Character offset to start from (for paging through
            large content).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The extracted content; failures are reported through the
        ``error`` field rather than raised.

    Example:
        extract_content(query="Extract the name and price of every product",
                        extract_links=True)

    Note:
        A page_extraction_llm has to be configured, otherwise the call
        fails. This wrapper currently always passes ``None``.
        Per the original documentation, paged extraction supports up to
        100k characters.
    """
    try:
        browser, tools = await get_browser_session()

        # NOTE(review): extract needs a page_extraction_llm. None is passed
        # here unconditionally, so the call is expected to end up in the
        # except branch below until an LLM is wired in.
        result = await tools.extract(
            query=query,
            extract_links=extract_links,
            start_from_char=start_from_char,
            browser_session=browser,
            page_extraction_llm=None,
            file_system=_file_system
        )

        return action_result_to_tool_result(result, f"提取内容: {query}")

    except Exception as e:
        return ToolResult(
            title="内容提取失败",
            output="",
            error=f"Failed to extract content: {e}",
            long_term_memory=f"提取内容失败: {query}"
        )


@tool()
async def get_page_html(uid: str = "") -> ToolResult:
    """
    Get the full HTML of the current page.

    Args:
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: Page title, URL and HTML; failures are reported through
        the ``error`` field rather than raised.

    Example:
        get_page_html()

    Note:
        - The HTML in ``output`` is truncated to 10000 characters.
        - The complete HTML is stored in ``metadata["html"]``.
    """
    try:
        browser, tools = await get_browser_session()

        # Read the document through CDP.
        cdp = await browser.get_or_create_cdp_session()

        result = await cdp.cdp_client.send.Runtime.evaluate(
            params={'expression': 'document.documentElement.outerHTML'},
            session_id=cdp.session_id
        )
        html = result.get('result', {}).get('value', '')

        # URL and title of the page
        url = await browser.get_current_page_url()

        title_result = await cdp.cdp_client.send.Runtime.evaluate(
            params={'expression': 'document.title'},
            session_id=cdp.session_id
        )
        title = title_result.get('result', {}).get('value', '')

        # Keep the visible output small; the full HTML goes into metadata.
        output_html = html
        if len(html) > 10000:
            output_html = html[:10000] + "... (truncated)"

        return ToolResult(
            title=f"获取 HTML: {url}",
            output=f"页面: {title}\nURL: {url}\n\nHTML:\n{output_html}",
            long_term_memory=f"获取 HTML: {url}",
            metadata={"url": url, "title": title, "html": html}
        )

    except Exception as e:
        return ToolResult(
            title="获取 HTML 失败",
            output="",
            error=f"Failed to get page HTML: {e}",
            long_term_memory="获取 HTML 失败"
        )


@tool()
async def get_selector_map(uid: str = "") -> ToolResult:
    """
    Get the selector map of interactive elements on the current page.

    Args:
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: ``output`` lists the first 20 elements (index, tag and a
        short label); ``metadata["selector_map"]`` holds the string form of
        the first 100 entries. Failures are reported through the ``error``
        field rather than raised.

    Example:
        get_selector_map()

    Note:
        The returned indexes can be used with click_element, input_text, etc.
    """
    try:
        browser, tools = await get_browser_session()

        selector_map = await browser.get_selector_map()

        # Only the first 20 elements are shown in the output.
        elements_info = []
        for index, node in list(selector_map.items())[:20]:
            tag = node.tag_name
            attrs = node.attributes or {}
            # Any of the attributes may be present but empty/None, so fall
            # back to '' and coerce to str before slicing.
            label = (
                attrs.get('aria-label')
                or attrs.get('placeholder')
                or attrs.get('value')
                or ''
            )
            elements_info.append(f"索引 {index}: <{tag}> {str(label)[:50]}")

        output = f"找到 {len(selector_map)} 个交互元素\n\n"
        output += "\n".join(elements_info)

        if len(selector_map) > 20:
            output += f"\n... 还有 {len(selector_map) - 20} 个元素"

        return ToolResult(
            title="获取元素映射",
            output=output,
            long_term_memory=f"获取到 {len(selector_map)} 个交互元素",
            metadata={"selector_map": {k: str(v) for k, v in list(selector_map.items())[:100]}}
        )

    except Exception as e:
        return ToolResult(
            title="获取元素映射失败",
            output="",
            error=f"Failed to get selector map: {e}",
            long_term_memory="获取元素映射失败"
        )


# ============================================================
# JavaScript tools
# ============================================================

@tool()
async def evaluate(code: str, uid: str = "") -> ToolResult:
    """
    Execute JavaScript code in the page context.

    Allows arbitrary JavaScript to run in the current page, for complex page
    manipulation or data extraction.

    Args:
        code: The JavaScript source to execute.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The execution result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        evaluate("document.title")
        evaluate("document.querySelectorAll('a').length")

    Note:
        - The code runs in the page context and can access the DOM and
          global variables.
        - Per the original documentation, the return value is serialised to
          a string and limited to 20k characters.
    """
    try:
        browser, tools = await get_browser_session()

        result = await tools.evaluate(
            code=code,
            browser_session=browser
        )

        return action_result_to_tool_result(result, "执行 JavaScript")

    except Exception as e:
        return ToolResult(
            title="JavaScript 执行失败",
            output="",
            error=f"Failed to execute JavaScript: {e}",
            long_term_memory="JavaScript 执行失败"
        )


@tool()
async def ensure_login_with_cookies(cookie_type: str, url: str = "https://www.xiaohongshu.com",
                                    uid: str = "") -> ToolResult:
    """
    Check the login state and inject stored cookies when a login is needed.

    Steps: navigate to ``url``, run a DOM heuristic that looks for a login
    button and user/avatar elements, and - only when a login appears to be
    needed - load the cookies stored for ``cookie_type`` from the database,
    set them through CDP and reload ``url``.

    Args:
        cookie_type: Channel identifier used to look up the stored cookies.
        url: Page used for the login check and reloaded after injection.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: ``output`` is a JSON object with ``need_login`` and,
        when cookies were looked up, ``cookies_count``. Failures are
        reported through the ``error`` field rather than raised.
    """
    try:
        browser, tools = await get_browser_session()

        if url:
            await tools.navigate(url=url, browser_session=browser)
            await tools.wait(seconds=2, browser_session=browser)

        check_login_js = """
        (function() {
            const loginBtn = document.querySelector('[class*="login"]') ||
                             document.querySelector('[href*="login"]') ||
                             Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录'));
            const userInfo = document.querySelector('[class*="user"]') ||
                             document.querySelector('[class*="avatar"]');
            return {
                needLogin: !!loginBtn && !userInfo,
                hasLoginBtn: !!loginBtn,
                hasUserInfo: !!userInfo
            };
        })()
        """

        result = await tools.evaluate(code=check_login_js, browser_session=browser)

        # A failed evaluation tells us nothing about the login state; report
        # it as a failure instead of silently treating it as "logged in".
        if result.error:
            return ToolResult(
                title="登录检查失败",
                output="",
                error=result.error,
                long_term_memory="登录检查失败"
            )

        status_output = result.extracted_content
        if isinstance(status_output, str):
            # The evaluate tool may prefix its output with "Result: ".
            status_output = status_output.removeprefix("Result: ")

        login_info: Dict[str, Any] = {}
        if isinstance(status_output, str):
            try:
                login_info = json.loads(status_output)
            except Exception:
                login_info = {}
        elif isinstance(status_output, dict):
            login_info = status_output

        # NOTE(review): an unparsable status leaves login_info empty and is
        # therefore treated as "already logged in" - confirm this is intended.
        if not login_info.get("needLogin"):
            output = json.dumps({"need_login": False}, ensure_ascii=False)
            return ToolResult(
                title="已登录",
                output=output,
                long_term_memory=output
            )

        row = _fetch_cookie_row(cookie_type)
        cookie_value = _extract_cookie_value(row)
        if not cookie_value:
            output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
            return ToolResult(
                title="未找到 cookies",
                output=output,
                error="未找到 cookies",
                long_term_memory=output
            )

        domain, base_url = _cookie_domain_for_type(cookie_type, url)
        cookies = _normalize_cookies(cookie_value, domain, base_url)
        if not cookies:
            output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
            return ToolResult(
                title="cookies 解析失败",
                output=output,
                error="cookies 解析失败",
                long_term_memory=output
            )

        # NOTE(review): _cdp_set_cookies is a private BrowserSession method.
        await browser._cdp_set_cookies(cookies)

        # Reload so the page picks up the injected cookies.
        if url:
            await tools.navigate(url=url, browser_session=browser)
            await tools.wait(seconds=2, browser_session=browser)

        output = json.dumps({"need_login": True, "cookies_count": len(cookies)}, ensure_ascii=False)
        return ToolResult(
            title="已注入 cookies",
            output=output,
            long_term_memory=output
        )

    except Exception as e:
        return ToolResult(
            title="登录检查失败",
            output="",
            error=str(e),
            long_term_memory="登录检查失败"
        )


# ============================================================
# File system tools
# ============================================================

@tool()
async def write_file(file_name: str, content: str, append: bool = False, uid: str = "") -> ToolResult:
    """
    Write content to a local file.

    Args:
        file_name: File name (including the extension).
        content: The content to write.
        append: Append instead of overwriting (default False).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The write result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        write_file("output.txt", "Hello World")
        write_file("data.json", '{"key": "value"}')

    Note:
        Per the original documentation, supported formats are
        .txt, .md, .json, .jsonl, .csv and .pdf.
    """
    try:
        # Ensures the session (and with it the shared file system) exists.
        _, tools = await get_browser_session()

        result = await tools.write_file(
            file_name=file_name,
            content=content,
            append=append,
            file_system=_file_system
        )

        return action_result_to_tool_result(result, f"写入文件: {file_name}")

    except Exception as e:
        return ToolResult(
            title="写入文件失败",
            output="",
            error=f"Failed to write file: {e}",
            long_term_memory=f"写入文件 {file_name} 失败"
        )


@tool()
async def read_file(file_name: str, uid: str = "") -> ToolResult:
    """
    Read content from a local file.

    Args:
        file_name: File name (including the extension).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The file content; failures are reported through the
        ``error`` field rather than raised.

    Example:
        read_file("input.txt")
        read_file("data.json")

    Note:
        Per the original documentation, text files, PDF, DOCX, images, etc.
        are supported.
    """
    try:
        # Ensures the session (and with it the shared file system) exists.
        _, tools = await get_browser_session()

        result = await tools.read_file(
            file_name=file_name,
            available_file_paths=[],
            file_system=_file_system
        )

        return action_result_to_tool_result(result, f"读取文件: {file_name}")

    except Exception as e:
        return ToolResult(
            title="读取文件失败",
            output="",
            error=f"Failed to read file: {e}",
            long_term_memory=f"读取文件 {file_name} 失败"
        )


@tool()
async def replace_file(file_name: str, old_str: str, new_str: str, uid: str = "") -> ToolResult:
    """
    Replace specific text in a file.

    Args:
        file_name: File name (including the extension).
        old_str: The text to replace.
        new_str: The replacement text.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The replacement result; failures are reported through
        the ``error`` field rather than raised.

    Example:
        replace_file("config.txt", "old_value", "new_value")

    Note:
        Per the original documentation, every occurrence is replaced and a
        warning is returned when the text is not found.
    """
    try:
        # Ensures the session (and with it the shared file system) exists.
        _, tools = await get_browser_session()

        result = await tools.replace_file(
            file_name=file_name,
            old_str=old_str,
            new_str=new_str,
            file_system=_file_system
        )

        return action_result_to_tool_result(result, f"替换文件内容: {file_name}")

    except Exception as e:
        return ToolResult(
            title="替换文件失败",
            output="",
            error=f"Failed to replace file content: {e}",
            long_term_memory=f"替换文件 {file_name} 失败"
        )


# ============================================================
# Wait for user action
# ============================================================

@tool()
async def wait_for_user_action(message: str = "Please complete the action in browser",
                               timeout: int = 300, uid: str = "") -> ToolResult:
    """
    Wait for the user to complete an action in the browser (e.g. login).

    Pauses the automation, prints a prompt to the console and waits until
    the user presses ENTER or the timeout expires.

    Args:
        message: Tells the user which action needs to be completed.
        timeout: Maximum time to wait in seconds (default 300 = 5 minutes).
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: Either the "completed" or the "timed out" result (a
        timeout is not an error); unexpected failures are reported through
        the ``error`` field rather than raised.

    Example:
        wait_for_user_action("Please login to Xiaohongshu", timeout=180)
        wait_for_user_action("Please complete the CAPTCHA", timeout=60)

    Note:
        - The user completes the action manually in the browser window and
          then presses ENTER in the console.
        - After the timeout, execution continues automatically.
    """
    try:
        separator = "=" * 60
        print(f"\n{separator}")
        print("⏸️ WAITING FOR USER ACTION")
        print(separator)
        print(f"📝 {message}")
        print(f"⏱️ Timeout: {timeout} seconds")
        print("\n👉 Please complete the action in the browser window")
        print("👉 Press ENTER when done, or wait for timeout")
        print(f"{separator}\n")

        # input() blocks, so run it in the default executor and race it
        # against the timeout.
        loop = asyncio.get_running_loop()
        await asyncio.wait_for(
            loop.run_in_executor(None, input),
            timeout=timeout
        )

        return ToolResult(
            title="用户操作完成",
            output=f"User completed: {message}",
            long_term_memory=f"用户完成操作: {message}"
        )

    except asyncio.TimeoutError:
        return ToolResult(
            title="用户操作超时",
            output=f"Timeout waiting for: {message}",
            long_term_memory=f"等待用户操作超时: {message}"
        )

    except Exception as e:
        return ToolResult(
            title="等待用户操作失败",
            output="",
            error=f"Failed to wait for user action: {e}",
            long_term_memory="等待用户操作失败"
        )


# ============================================================
# Task completion
# ============================================================

@tool()
async def done(text: str, success: bool = True,
               files_to_display: Optional[List[str]] = None, uid: str = "") -> ToolResult:
    """
    Mark the task as complete and return the final message to the user.

    Args:
        text: Final message for the user.
        success: Whether the task completed successfully.
        files_to_display: Optional list of file paths to display.
        uid: User ID (injected by the framework; not used here).

    Returns:
        ToolResult: The completion result; failures are reported through the
        ``error`` field rather than raised.

    Example:
        done("Task finished, extracted 10 products", success=True)
    """
    try:
        # Ensures the session (and with it the shared file system) exists.
        _, tools = await get_browser_session()

        result = await tools.done(
            text=text,
            success=success,
            files_to_display=files_to_display,
            file_system=_file_system
        )

        return action_result_to_tool_result(result, "任务完成")

    except Exception as e:
        return ToolResult(
            title="标记任务完成失败",
            output="",
            error=f"Failed to complete task: {e}",
            long_term_memory="标记任务完成失败"
        )


# ============================================================
# Container management tools
# ============================================================

import aiohttp


async def create_container(url: str, account_name: str = "liuwenwu") -> Dict[str, Any]:
    """
    Create a browser container and navigate it to the given URL.

    Two HTTP calls are made against the container service:
    1.1 create the container;
    1.2 create a page in it and navigate to ``url`` (up to 3 attempts).

    Args:
        url: The URL to navigate to.
        account_name: Account name sent with the page-create request.

    Returns:
        A dict describing the container (never raises for request errors):
        - success: Whether both steps succeeded.
        - container_id: Container ID.
        - vnc: VNC access URL.
        - cdp: CDP protocol URL (for connecting a browser).
        - connection_id: Connection ID of the created page.
        - error: Error message when something failed. Fields filled in
          before the failure keep their values.
    """
    result = {
        "success": False,
        "container_id": None,
        "vnc": None,
        "cdp": None,
        "connection_id": None,
        "error": None
    }

    try:
        async with aiohttp.ClientSession() as session:
            # Step 1.1: create the container
            print("📦 步骤1.1: 创建容器...")
            create_url = "http://47.84.182.56:8200/api/v1/container/create"
            create_payload = {
                "auto_remove": True,
                "need_port_binding": True,
                "max_lifetime_seconds": 900
            }

            async with session.post(create_url, json=create_payload) as resp:
                if resp.status != 200:
                    raise RuntimeError(f"创建容器失败: HTTP {resp.status}")

                create_result = await resp.json()
                if create_result.get("code") != 0:
                    raise RuntimeError(f"创建容器失败: {create_result.get('msg')}")

                # "data" may be present but null.
                data = create_result.get("data") or {}
                result["container_id"] = data.get("container_id")
                result["vnc"] = data.get("vnc")
                result["cdp"] = data.get("cdp")

            # Without a container id every page-create attempt is doomed;
            # fail right away instead of burning the retries.
            if not result["container_id"]:
                raise RuntimeError("创建容器失败: 响应中缺少 container_id")

            print("✅ 容器创建成功")
            print(f" Container ID: {result['container_id']}")
            print(f" VNC: {result['vnc']}")
            print(f" CDP: {result['cdp']}")

            # Give the browser inside the container time to start.
            print("\n⏳ 等待容器内浏览器启动...")
            await asyncio.sleep(5)

            # Step 1.2: create a page and navigate
            print(f"\n📱 步骤1.2: 创建页面并导航到 {url}...")
            page_create_url = "http://47.84.182.56:8200/api/v1/browser/page/create"
            page_payload = {
                "container_id": result["container_id"],
                "url": url,
                "account_name": account_name,
                "need_wait": True,
                "timeout": 30
            }

            # Retry: at most 3 attempts, remembering the last failure.
            max_retries = 3
            last_error = None

            for attempt in range(max_retries):
                try:
                    if attempt > 0:
                        print(f" 重试 {attempt + 1}/{max_retries}...")
                        await asyncio.sleep(3)  # back off before retrying

                    async with session.post(
                        page_create_url,
                        json=page_payload,
                        timeout=aiohttp.ClientTimeout(total=60)
                    ) as resp:
                        if resp.status != 200:
                            response_text = await resp.text()
                            last_error = f"HTTP {resp.status}: {response_text[:200]}"
                            continue

                        page_result = await resp.json()
                        if page_result.get("code") != 0:
                            last_error = f"{page_result.get('msg')}"
                            continue

                        page_data = page_result.get("data") or {}
                        result["connection_id"] = page_data.get("connection_id")
                        result["success"] = True

                        print("✅ 页面创建成功")
                        print(f" Connection ID: {result['connection_id']}")
                        break

                except asyncio.TimeoutError:
                    last_error = "请求超时"
                except aiohttp.ClientError as e:
                    last_error = f"网络错误: {e}"
                except Exception as e:
                    last_error = f"未知错误: {e}"
            else:
                # Loop finished without a break: every attempt failed.
                raise RuntimeError(f"创建页面失败(尝试{max_retries}次后): {last_error}")

    except Exception as e:
        result["error"] = str(e)
        print(f"❌ 错误: {e}")

    return result


# ============================================================
# Public API
# ============================================================

__all__ = [
    # Session management
    'init_browser_session',
    'get_browser_session',
    'cleanup_browser_session',
    'kill_browser_session',

    # Navigation tools
    'navigate_to_url',
    'search_web',
    'go_back',
    'wait',

    # Element interaction tools
    'click_element',
    'input_text',
    'send_keys',
    'upload_file',

    # Scroll & view tools
    'scroll_page',
    'find_text',
    'screenshot',

    # Tab management tools
    'switch_tab',
    'close_tab',

    # Dropdown tools
    'get_dropdown_options',
    'select_dropdown_option',

    # Content extraction tools
    'extract_content',
    'get_page_html',
    'get_selector_map',

    # JavaScript tools
    'evaluate',
    'ensure_login_with_cookies',

    # File system tools
    'write_file',
    'read_file',
    'replace_file',

    # Wait for user action
    'wait_for_user_action',

    # Task completion
    'done',

    # Container management
    'create_container',
]