| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303 |
- """
- Browser-Use Tools Adapter
- 浏览器工具适配器
- 将 browser-use 库的工具适配到 Agent 框架中。
- 基于 browser-use 的 Action 定义实现了以下工具:
- 导航类工具 (Navigation Tools):
- - navigate_to_url: 页面导航 (NavigateAction)
- - go_back: 返回上一页 (GoBackEvent)
- - search_web: 网页搜索 (SearchAction)
- 元素交互工具 (Element Interaction Tools):
- - click_element: 元素点击 (ClickElementAction)
- - input_text: 文本输入 (InputTextAction)
- - send_keys: 键盘操作 (SendKeysAction)
- 内容提取工具 (Content Extraction Tools):
- - extract_content: 内容提取 (ExtractAction)
- 滚动和视图工具 (Scroll & View Tools):
- - scroll_page: 页面滚动 (ScrollAction)
- - find_text: 查找文本并滚动
- - screenshot: 页面截图
- 标签页管理工具 (Tab Management Tools):
- - switch_tab: 标签切换 (SwitchTabAction)
- - close_tab: 关闭标签 (CloseTabAction)
- 下拉框工具 (Dropdown Tools):
- - get_dropdown_options: 获取下拉选项 (GetDropdownOptionsAction)
- - select_dropdown_option: 选择下拉选项 (SelectDropdownOptionAction)
- 文件操作工具 (File Tools):
- - upload_file: 文件上传 (UploadFileAction)
- - write_file: 写入文件
- - read_file: 读取文件
- - replace_file: 替换文件内容
- JavaScript 执行工具 (JavaScript Tools):
- - evaluate: 执行 JavaScript 代码
- 任务完成工具 (Task Completion Tools):
- - done: 任务完成 (DoneAction)
- 等待工具 (Wait Tools):
- - wait: 等待指定秒数
- 所有工具都使用 @tool() 装饰器自动注册到框架的工具注册表中。
- """
- import sys
- import os
- from typing import Optional, List
- # 将项目根目录添加到 Python 路径
- # 这样可以正确导入 agent 模块
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- # 导入框架的工具装饰器和结果类
- # tool: 用于注册工具的装饰器
- # ToolResult: 工具执行结果的标准返回格式
- from agent.tools import tool, ToolResult
- # ============================================================
- # 核心浏览器导航工具 (Core Browser Navigation Tools)
- # 对应 browser-use 的 NavigateAction 和 GoBackEvent
- # ============================================================
@tool()
async def navigate_to_url(url: str, new_tab: bool = False, uid: str = "") -> ToolResult:
    """Launch Chromium with Playwright and navigate to ``url``.

    Args:
        url: Address to open.
        new_tab: When True, always open a new page; otherwise reuse the
            context's first page if one exists.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult whose ``output`` contains the page title and whose
        ``metadata`` holds ``url``, ``title`` and ``new_tab``. On any
        exception, a ToolResult with ``error`` set.

    Note:
        Every call launches its own browser and context, so no page state
        is shared with other tool calls.
        NOTE(review): a brand-new context presumably has no pages, which
        would make ``new_tab`` a no-op in practice -- confirm.
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            # headless=False: a visible browser window is shown.
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()

            # BrowserContext.pages is a list *property*; calling it (as the
            # previous code did) raises TypeError, and a Page is not awaitable.
            if new_tab or not context.pages:
                page = await context.new_page()
            else:
                page = context.pages[0]

            await page.goto(url)
            # Wait until the network is idle before reading the title.
            await page.wait_for_load_state("networkidle")
            title = await page.title()

            return ToolResult(
                title=f"Navigated to {url}",
                output=f"Successfully opened page: {title}",
                long_term_memory=f"Navigated to {url}",  # kept short to save tokens
                metadata={"url": url, "title": title, "new_tab": new_tab},
            )
    except Exception as e:
        # Tool boundary: report every failure as a ToolResult instead of raising.
        return ToolResult(
            title="Navigation failed",
            output="",
            error=f"Failed to navigate to {url}: {str(e)}",
            long_term_memory=f"Navigation to {url} failed",
        )
@tool()
async def go_back(uid: str = "") -> ToolResult:
    """Trigger the browser's "back" navigation on the current page.

    Args:
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult describing success, or one with ``error`` set if any
        exception was raised.

    Note:
        Every call launches its own browser and context, so the page used
        here carries no history from other tool calls.
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            await page.go_back()
            await page.wait_for_load_state("networkidle")

            return ToolResult(
                title="Went back",
                output="Successfully navigated back",
                long_term_memory="Navigated back to previous page",
            )
    except Exception as e:
        return ToolResult(
            title="Go back failed",
            output="",
            error=f"Failed to go back: {str(e)}",
            long_term_memory="Go back failed",
        )
- # ============================================================
- # 元素交互工具 (Element Interaction Tools)
- # 对应 browser-use 的 ClickElementAction, InputTextAction, SendKeysAction
- # ============================================================
@tool()
async def click_element(index: Optional[int] = None, coordinate_x: Optional[int] = None,
                        coordinate_y: Optional[int] = None, uid: str = "") -> ToolResult:
    """Click on the page by viewport coordinates or by element index.

    Coordinates take precedence when both ``coordinate_x`` and
    ``coordinate_y`` are given; otherwise ``index`` is used.

    Args:
        index: Element index taken from the browser state.
        coordinate_x: Horizontal position in pixels passed to the mouse click.
        coordinate_y: Vertical position in pixels passed to the mouse click.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult describing the click, or one with ``error`` set when
        neither addressing mode is supplied or an exception is raised.

    Note:
        The index path is a placeholder: no index-to-element mapping exists
        here, so it reports success WITHOUT clicking anything.
    """
    has_coordinates = coordinate_x is not None and coordinate_y is not None

    # Validate up front so bad arguments never cost a browser launch.
    if not has_coordinates and index is None:
        return ToolResult(
            title="Invalid parameters",
            output="",
            error="Must provide either index or coordinates",
            long_term_memory="Click failed: invalid parameters",
        )

    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            if has_coordinates:
                await page.mouse.click(coordinate_x, coordinate_y)
                return ToolResult(
                    title="Clicked coordinate",
                    output=f"Clicked at ({coordinate_x}, {coordinate_y})",
                    long_term_memory=f"Clicked coordinate ({coordinate_x}, {coordinate_y})",
                )

            # TODO: map `index` to a real element via DOM state; nothing is
            # clicked on this path yet.
            return ToolResult(
                title="Click by index",
                output=f"Clicked element at index {index}",
                long_term_memory=f"Clicked element {index}",
            )
    except Exception as e:
        return ToolResult(
            title="Click failed",
            output="",
            error=f"Failed to click: {str(e)}",
            long_term_memory="Click failed",
        )
@tool()
async def input_text(index: int, text: str, clear: bool = True, uid: str = "") -> ToolResult:
    """Type ``text`` with the page keyboard.

    Args:
        index: Element index from the browser state. It is only echoed in
            the result; no element is focused or looked up from it.
        text: Text to type.
        clear: When True, press Control+A first so the typed text replaces
            whatever is currently selected by that shortcut.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult with ``metadata`` holding ``index`` and ``clear``, or one
        with ``error`` set if any exception was raised.

    Note:
        Keystrokes go to whatever currently has focus; mapping ``index`` to
        a concrete input element still needs DOM state.
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            if clear:
                # Select-all so the following keystrokes overwrite the selection.
                await page.keyboard.press("Control+A")
            await page.keyboard.type(text)

            return ToolResult(
                title="Input text",
                output=f"Input text into element {index}",
                long_term_memory=f"Input text into element {index}",
                metadata={"index": index, "clear": clear},
            )
    except Exception as e:
        return ToolResult(
            title="Input failed",
            output="",
            error=f"Failed to input text: {str(e)}",
            long_term_memory="Input text failed",
        )
@tool()
async def send_keys(keys: str, uid: str = "") -> ToolResult:
    """Press a key or key combination on the page keyboard.

    Args:
        keys: Key string handed to Playwright's ``keyboard.press``, e.g.
            "Enter", "Escape", "PageDown", "Control+o", "Shift+Tab".
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult echoing the keys sent, or one with ``error`` set if any
        exception was raised.

    Note:
        Key names follow the Playwright keyboard API:
        https://playwright.dev/python/docs/api/class-keyboard
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            await page.keyboard.press(keys)

            return ToolResult(
                title="Sent keys",
                output=f"Sent keys: {keys}",
                long_term_memory=f"Sent keys: {keys}",
            )
    except Exception as e:
        return ToolResult(
            title="Send keys failed",
            output="",
            error=f"Failed to send keys: {str(e)}",
            long_term_memory="Send keys failed",
        )
- # ============================================================
- # Wait Tool
- # ============================================================
@tool()
async def wait_for_user_action(message: str = "Please complete the action in browser",
                               timeout: int = 300, uid: str = "") -> ToolResult:
    """Pause until the user presses ENTER on stdin or ``timeout`` elapses.

    Prints a banner with ``message`` and the timeout, then waits for one
    line of console input read in the loop's default executor.

    Args:
        message: Description of what the user should do (e.g. log in).
        timeout: Maximum number of seconds to wait (default 300).
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult titled "User action completed" when input arrives,
        "User action timeout" when the wait times out, or one with
        ``error`` set for any other exception.

    Note:
        On timeout only the await is abandoned; the executor thread running
        ``input`` is not interrupted by this function.
    """
    import asyncio

    banner = "=" * 60
    try:
        print(f"\n{banner}")
        print("⏸️ WAITING FOR USER ACTION")
        print(banner)
        print(f"📝 {message}")
        print(f"⏱️ Timeout: {timeout} seconds")
        print("\n👉 Please complete the action in the browser window")
        print("👉 Press ENTER when done, or wait for timeout")
        print(f"{banner}\n")

        # We are inside a coroutine, so the running loop is the right handle
        # (get_event_loop() is the legacy spelling).
        loop = asyncio.get_running_loop()
        # input() blocks, so it runs in the default executor while we await it.
        await asyncio.wait_for(loop.run_in_executor(None, input), timeout=timeout)

        return ToolResult(
            title="User action completed",
            output=f"User completed: {message}",
            long_term_memory=f"User completed action: {message}",
        )
    except asyncio.TimeoutError:
        return ToolResult(
            title="User action timeout",
            output=f"Timeout waiting for: {message}",
            long_term_memory=f"Timeout on user action: {message}",
        )
    except Exception as e:
        return ToolResult(
            title="Wait for user action failed",
            output="",
            error=f"Failed to wait for user action: {str(e)}",
            long_term_memory="Wait for user action failed",
        )
@tool()
async def wait(seconds: int = 3, uid: str = "") -> ToolResult:
    """Sleep for ``seconds``, clamped to the range 1-30.

    Args:
        seconds: Requested pause in seconds; values above 30 become 30 and
            values below 1 become 1.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult reporting the clamped duration actually waited, or one
        with ``error`` set if any exception was raised.
    """
    try:
        import asyncio

        # Cap at 30 first, then raise to the floor of 1.
        capped = min(seconds, 30)
        wait_time = max(1, capped)
        await asyncio.sleep(wait_time)
    except Exception as e:
        return ToolResult(
            title="Wait failed",
            output="",
            error=f"Failed to wait: {e}",
            long_term_memory="Wait failed",
        )
    else:
        return ToolResult(
            title=f"Waited {wait_time} seconds",
            output=f"Waited for {wait_time} seconds",
            long_term_memory=f"Waited {wait_time}s",
        )
- # ============================================================
- # Content Extraction Tools
- # ============================================================
@tool()
async def get_page_html(uid: str = "") -> ToolResult:
    """Return the HTML source of the current page.

    Args:
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult whose ``output`` contains the page title, URL and at most
        the first 10000 characters of HTML (suffixed "... (truncated)" when
        cut); ``metadata`` carries ``url``, ``title`` and the full ``html``.
        On any exception, a ToolResult with ``error`` set.

    Note:
        Every call launches its own browser and context, so the page read
        here is not one opened by another tool call.
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            html = await page.content()
            url = page.url
            title = await page.title()

            # Keep the LLM-facing output bounded; the full HTML stays in metadata.
            output_html = html
            if len(html) > 10000:
                output_html = html[:10000] + "... (truncated)"

            return ToolResult(
                title=f"Got HTML from {url}",
                output=f"Page: {title}\nURL: {url}\n\nHTML:\n{output_html}",
                long_term_memory=f"Got HTML from {url}",
                metadata={"url": url, "title": title, "html": html},
            )
    except Exception as e:
        return ToolResult(
            title="Get HTML failed",
            output="",
            error=f"Failed to get page HTML: {str(e)}",
            long_term_memory="Get HTML failed",
        )
@tool()
async def extract_content(query: str, extract_links: bool = False,
                          start_from_char: int = 0, uid: str = "") -> ToolResult:
    """Return the visible body text of the current page, optionally with links.

    Args:
        query: Label describing what is being extracted. It is echoed in the
            result only; the text is not filtered by it.
        extract_links: When True, collect text/href pairs from up to the
            first 50 ``a[href]`` elements.
        start_from_char: When greater than 0, drop this many leading
            characters of the body text (for paging through long content).
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult whose ``output`` holds the query and at most 2000
        characters of text, plus a link count when links were found;
        ``metadata`` holds ``query`` and ``links``. On any exception, a
        ToolResult with ``error`` set.
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            text_content = await page.inner_text("body")
            if start_from_char > 0:
                text_content = text_content[start_from_char:]

            links = []
            if extract_links:
                link_elements = await page.query_selector_all("a[href]")
                for elem in link_elements[:50]:  # cap to keep the result small
                    href = await elem.get_attribute("href")
                    text = await elem.inner_text()
                    if href:
                        links.append({"text": text, "href": href})

            output = f"Query: {query}\n\nContent:\n{text_content[:2000]}"
            if extract_links and links:
                # Only the count goes into the output; the links are in metadata.
                output += f"\n\nLinks found: {len(links)}"

            return ToolResult(
                title=f"Extracted: {query}",
                output=output,
                long_term_memory=f"Extracted content for query: {query}",
                include_output_only_once=True,
                metadata={"query": query, "links": links if extract_links else []},
            )
    except Exception as e:
        return ToolResult(
            title="Extraction failed",
            output="",
            error=f"Failed to extract content: {str(e)}",
            long_term_memory="Content extraction failed",
        )
- # ============================================================
- # Search Tools
- # ============================================================
@tool()
async def search_web(query: str, engine: str = "duckduckgo", uid: str = "") -> ToolResult:
    """Run a web search in a fresh browser and return the result page text.

    Args:
        query: Search terms; URL-encoded before being placed in the query
            string.
        engine: "google" or "bing"; any other value uses DuckDuckGo.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult whose ``output`` holds at most the first 2000 characters
        of the result page's body text and whose ``metadata`` holds
        ``query`` and ``engine``. On any exception, a ToolResult with
        ``error`` set.
    """
    try:
        from urllib.parse import quote_plus

        from playwright.async_api import async_playwright

        # Base URLs keyed by engine name; unknown names fall back to DuckDuckGo.
        search_urls = {
            "google": "https://www.google.com/search?q=",
            "bing": "https://www.bing.com/search?q=",
        }
        base_url = search_urls.get(engine, "https://duckduckgo.com/?q=")
        # Encode the query so characters such as '&', '#', '+' or spaces
        # cannot truncate or alter the request.
        target = base_url + quote_plus(query)

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            page = await context.new_page()

            await page.goto(target)
            await page.wait_for_load_state("networkidle")

            results_text = await page.inner_text("body")
            await browser.close()

            return ToolResult(
                title=f"Search: {query}",
                output=f"Search results from {engine}:\n{results_text[:2000]}",
                long_term_memory=f"Searched {engine} for: {query}",
                include_output_only_once=True,
                metadata={"query": query, "engine": engine},
            )
    except Exception as e:
        return ToolResult(
            title="Search failed",
            output="",
            error=f"Search failed: {str(e)}",
            long_term_memory=f"Search for '{query}' failed",
        )
- # ============================================================
- # Text Finding Tool
- # ============================================================
@tool()
async def find_text(text: str, uid: str = "") -> ToolResult:
    """Find the first text node containing ``text`` and scroll it into view.

    Args:
        text: Substring to look for in the page's text nodes.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult titled "Found text: ..." when a match was scrolled to,
        "Text not found" otherwise, or one with ``error`` set if any
        exception was raised.

    Note:
        Only the first matching text node (in document order under
        ``document.body``) is used.
    """
    try:
        from playwright.async_api import async_playwright

        # The search string is passed as an argument rather than spliced into
        # the script, so quotes, backslashes or newlines in `text` can neither
        # break the JavaScript nor inject code.
        js_code = """
        (needle) => {
            const walker = document.createTreeWalker(
                document.body,
                NodeFilter.SHOW_TEXT
            );
            let node;
            while ((node = walker.nextNode())) {
                if (node.textContent.includes(needle)) {
                    const element = node.parentElement;
                    element.scrollIntoView({ behavior: 'smooth', block: 'center' });
                    return true;
                }
            }
            return false;
        }
        """

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            found = await page.evaluate(js_code, text)

            if found:
                return ToolResult(
                    title=f"Found text: {text}",
                    output=f"Found and scrolled to text: {text}",
                    long_term_memory=f"Found text: {text}",
                )
            return ToolResult(
                title="Text not found",
                output=f"Text '{text}' not found on page",
                long_term_memory=f"Text '{text}' not found",
            )
    except Exception as e:
        return ToolResult(
            title="Find text failed",
            output="",
            error=f"Failed to find text: {str(e)}",
            long_term_memory="Find text failed",
        )
- # ============================================================
- # Screenshot Tool
- # ============================================================
@tool()
async def screenshot(uid: str = "") -> ToolResult:
    """Capture a viewport screenshot of the current page.

    Args:
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult whose ``output`` reports the image size in bytes and
        whose ``metadata["screenshot"]`` holds the image bytes as a base64
        string. On any exception, a ToolResult with ``error`` set.
    """
    try:
        import base64

        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            # full_page=False: capture the visible viewport only.
            screenshot_bytes = await page.screenshot(full_page=False)
            screenshot_b64 = base64.b64encode(screenshot_bytes).decode()

            return ToolResult(
                title="Screenshot captured",
                output=f"Screenshot captured (size: {len(screenshot_bytes)} bytes)",
                long_term_memory="Screenshot captured",
                metadata={"screenshot": screenshot_b64},
            )
    except Exception as e:
        return ToolResult(
            title="Screenshot failed",
            output="",
            error=f"Failed to capture screenshot: {str(e)}",
            long_term_memory="Screenshot failed",
        )
- # ============================================================
- # Scroll Tools
- # ============================================================
@tool()
async def scroll_page(down: bool = True, pages: float = 1.0,
                      index: Optional[int] = None, uid: str = "") -> ToolResult:
    """Scroll the page with the mouse wheel by a multiple of the viewport height.

    Args:
        down: True scrolls down, False scrolls up.
        pages: Viewport heights to scroll (0.5 = half a page, 1 = one page).
        index: Element index to scroll within. Accepted but currently
            ignored; the wheel event is always sent to the page.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult naming the direction and page count, or one with
        ``error`` set if any exception was raised.
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            # Fall back to 800px when no viewport size is reported.
            viewport_height = page.viewport_size["height"] if page.viewport_size else 800
            scroll_amount = int(viewport_height * pages)

            direction = "down" if down else "up"
            # A positive wheel delta scrolls down, a negative one scrolls up.
            await page.mouse.wheel(0, scroll_amount if down else -scroll_amount)

            return ToolResult(
                title=f"Scrolled {direction}",
                output=f"Scrolled {direction} {pages} pages",
                long_term_memory=f"Scrolled {direction} {pages} pages",
            )
    except Exception as e:
        return ToolResult(
            title="Scroll failed",
            output="",
            error=f"Failed to scroll: {str(e)}",
            long_term_memory="Scroll failed",
        )
- # ============================================================
- # JavaScript Evaluation Tool
- # ============================================================
@tool()
async def evaluate(code: str, uid: str = "") -> ToolResult:
    """Execute JavaScript in the current page and return its result as text.

    Args:
        code: JavaScript source handed to Playwright's ``page.evaluate``.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult whose ``output`` is ``str()`` of the evaluation result,
        cut to 20000 characters with a "... (truncated)" suffix when longer;
        ``metadata`` holds ``code`` and the (possibly truncated) ``result``.
        On any exception, a ToolResult with ``error`` set.

    Note:
        NOTE(review): this runs caller-supplied script verbatim in the page;
        callers must treat ``code`` as trusted input.
    """
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            # BrowserContext.pages is a list property, not a callable; the
            # previous `await context.pages()[0]` always raised TypeError.
            page = context.pages[0] if context.pages else await context.new_page()

            result = await page.evaluate(code)

            # Stringify and bound the size before handing it back.
            result_str = str(result)
            if len(result_str) > 20000:
                result_str = result_str[:20000] + "... (truncated)"

            return ToolResult(
                title="JavaScript executed",
                output=f"Result: {result_str}",
                long_term_memory="Executed JavaScript code",
                metadata={"code": code, "result": result_str},
            )
    except Exception as e:
        return ToolResult(
            title="JavaScript execution failed",
            output="",
            error=f"Failed to execute JavaScript: {str(e)}",
            long_term_memory="JavaScript execution failed",
        )
- # ============================================================
- # File System Tools
- # ============================================================
@tool()
async def write_file(file_name: str, content: str, append: bool = False, uid: str = "") -> ToolResult:
    """Write ``content`` to a local file as UTF-8 text.

    Args:
        file_name: Path of the file to write.
        content: Text to write.
        append: When True, append to the file; otherwise overwrite it.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult reporting the number of characters written and the
        resulting file size in bytes (``metadata``: ``file_name``, ``size``,
        ``append``), or one with ``error`` set if any exception was raised.
    """
    try:
        import os

        # Pick the open() mode and the verb used in the report together.
        if append:
            open_mode, action = 'a', "Appended to"
        else:
            open_mode, action = 'w', "Wrote"

        with open(file_name, open_mode, encoding='utf-8') as handle:
            handle.write(content)

        size_on_disk = os.path.getsize(file_name)
        details = {"file_name": file_name, "size": size_on_disk, "append": append}
        return ToolResult(
            title=f"{action} file: {file_name}",
            output=f"{action} {len(content)} characters to {file_name} (size: {size_on_disk} bytes)",
            long_term_memory=f"{action} file {file_name}",
            metadata=details,
        )
    except Exception as e:
        return ToolResult(
            title="Write file failed",
            output="",
            error=f"Failed to write file: {e}",
            long_term_memory=f"Write file {file_name} failed",
        )
@tool()
async def read_file(file_name: str, uid: str = "") -> ToolResult:
    """Read a local file as UTF-8 text.

    Args:
        file_name: Path of the file to read.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult whose ``output`` shows the file size and at most the
        first 5000 characters (suffixed "... (truncated)" when cut);
        ``metadata`` holds ``file_name``, ``size`` and the full ``content``.
        A "File not found" result with ``error`` set when the path does not
        exist, and a "Read file failed" result for any exception (including
        files that cannot be decoded as UTF-8).
    """
    try:
        import os

        if os.path.exists(file_name):
            with open(file_name, 'r', encoding='utf-8') as handle:
                text = handle.read()
            size_on_disk = os.path.getsize(file_name)

            # Bound what is shown; the untruncated text stays in metadata.
            if len(text) > 5000:
                shown = text[:5000] + "... (truncated)"
            else:
                shown = text

            return ToolResult(
                title=f"Read file: {file_name}",
                output=f"File content ({size_on_disk} bytes):\n{shown}",
                long_term_memory=f"Read file {file_name}",
                metadata={"file_name": file_name, "size": size_on_disk, "content": text},
            )

        return ToolResult(
            title="File not found",
            output="",
            error=f"File not found: {file_name}",
            long_term_memory=f"File {file_name} not found",
        )
    except Exception as e:
        return ToolResult(
            title="Read file failed",
            output="",
            error=f"Failed to read file: {e}",
            long_term_memory=f"Read file {file_name} failed",
        )
@tool()
async def replace_file(file_name: str, old_str: str, new_str: str, uid: str = "") -> ToolResult:
    """Replace every occurrence of ``old_str`` with ``new_str`` in a UTF-8 file.

    Args:
        file_name: Path of the file to edit in place.
        old_str: Text to search for.
        new_str: Replacement text.
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult reporting how many occurrences were replaced
        (``metadata``: ``file_name``, ``count``, ``old_str``, ``new_str``).
        A "File not found" result with ``error`` set when the path does not
        exist, a "Text not found" result (no ``error``) when ``old_str`` is
        absent, and a "Replace file failed" result for any exception.
    """
    try:
        import os

        file_is_present = os.path.exists(file_name)
        if not file_is_present:
            return ToolResult(
                title="File not found",
                output="",
                error=f"File not found: {file_name}",
                long_term_memory=f"File {file_name} not found",
            )

        with open(file_name, 'r', encoding='utf-8') as reader:
            original = reader.read()

        # A zero count means the search text does not occur; leave the file alone.
        count = original.count(old_str)
        if count == 0:
            return ToolResult(
                title="Text not found",
                output=f"Text '{old_str}' not found in {file_name}",
                long_term_memory=f"Text not found in {file_name}",
                metadata={"file_name": file_name, "old_str": old_str},
            )

        updated = original.replace(old_str, new_str)
        with open(file_name, 'w', encoding='utf-8') as writer:
            writer.write(updated)

        summary = {"file_name": file_name, "count": count, "old_str": old_str, "new_str": new_str}
        return ToolResult(
            title=f"Replaced text in {file_name}",
            output=f"Replaced {count} occurrence(s) of '{old_str}' with '{new_str}' in {file_name}",
            long_term_memory=f"Replaced text in {file_name}",
            metadata=summary,
        )
    except Exception as e:
        return ToolResult(
            title="Replace file failed",
            output="",
            error=f"Failed to replace text in file: {e}",
            long_term_memory=f"Replace in {file_name} failed",
        )
- # ============================================================
- # Tab Management Tools
- # ============================================================
@tool()
async def switch_tab(tab_id: str, uid: str = "") -> ToolResult:
    """Report a switch to the browser tab identified by ``tab_id``.

    Args:
        tab_id: Tab identifier (described by the tool as 4 characters).
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult stating the tab was switched, or one with ``error`` set
        if building that result raises.

    Note:
        Placeholder: no browser is touched; only the success message is
        produced.
    """
    try:
        message = f"Switched to tab {tab_id}"
        outcome = ToolResult(
            title=message,
            output=message,
            long_term_memory=message,
        )
    except Exception as e:
        outcome = ToolResult(
            title="Switch tab failed",
            output="",
            error=f"Failed to switch tab: {e}",
            long_term_memory="Switch tab failed",
        )
    return outcome
@tool()
async def close_tab(tab_id: str, uid: str = "") -> ToolResult:
    """Report the closing of the browser tab identified by ``tab_id``.

    Args:
        tab_id: Tab identifier (described by the tool as 4 characters).
        uid: User ID; accepted for the framework but not used here.

    Returns:
        ToolResult stating the tab was closed, or one with ``error`` set if
        building that result raises.

    Note:
        Placeholder: no browser is touched; only the success message is
        produced.
    """
    try:
        message = f"Closed tab {tab_id}"
        outcome = ToolResult(
            title=message,
            output=message,
            long_term_memory=message,
        )
    except Exception as e:
        outcome = ToolResult(
            title="Close tab failed",
            output="",
            error=f"Failed to close tab: {e}",
            long_term_memory="Close tab failed",
        )
    return outcome
- # ============================================================
- # Dropdown Tools
- # ============================================================
@tool()
async def get_dropdown_options(index: int, uid: str = "") -> ToolResult:
    """
    Get options from a dropdown element

    Placeholder implementation: it launches a fresh Chromium instance and
    obtains a page, but cannot yet resolve ``index`` to a DOM element, so
    no options are actually read from the page.

    Args:
        index: Element index from browser state
        uid: User ID (auto-injected); not read in this body

    Returns:
        ToolResult describing the lookup, or a failure ToolResult with
        ``error`` set if Playwright cannot be imported, launched or driven
    """
    try:
        # Imported lazily so an import failure is reported as a tool error.
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            try:
                context = await browser.new_context()
                # BrowserContext.pages is a property holding a list, not a
                # method, and Page objects are not awaitable.
                open_pages = context.pages
                page = open_pages[0] if open_pages else await context.new_page()
                # TODO: map `index` to a selector via DOM state and read the
                # option elements from `page`. For now, return a placeholder.
                return ToolResult(
                    title=f"Dropdown options for element {index}",
                    output=f"Retrieved options for dropdown at index {index}",
                    long_term_memory=f"Got dropdown options for element {index}"
                )
            finally:
                # Release the browser on both the success and error paths.
                await browser.close()
    except Exception as e:
        return ToolResult(
            title="Get dropdown options failed",
            output="",
            error=f"Failed to get dropdown options: {str(e)}",
            long_term_memory="Get dropdown options failed"
        )
@tool()
async def select_dropdown_option(index: int, text: str, uid: str = "") -> ToolResult:
    """
    Select an option from a dropdown

    Placeholder implementation: it launches a fresh Chromium instance and
    obtains a page, but cannot yet resolve ``index`` to a DOM element, so
    no option is actually selected on the page.

    Args:
        index: Element index from browser state
        text: Exact text/value to select
        uid: User ID (auto-injected); not read in this body

    Returns:
        ToolResult describing the selection, or a failure ToolResult with
        ``error`` set if Playwright cannot be imported, launched or driven
    """
    try:
        # Imported lazily so an import failure is reported as a tool error.
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            try:
                context = await browser.new_context()
                # BrowserContext.pages is a property holding a list, not a
                # method, and Page objects are not awaitable.
                open_pages = context.pages
                page = open_pages[0] if open_pages else await context.new_page()
                # TODO: map `index` to a selector via DOM state and select
                # `text` on `page`.
                return ToolResult(
                    title="Selected dropdown option",
                    output=f"Selected '{text}' from dropdown at index {index}",
                    long_term_memory=f"Selected '{text}' from dropdown {index}"
                )
            finally:
                # Release the browser on both the success and error paths.
                await browser.close()
    except Exception as e:
        return ToolResult(
            title="Select dropdown option failed",
            output="",
            error=f"Failed to select dropdown option: {str(e)}",
            long_term_memory="Select dropdown option failed"
        )
- # ============================================================
- # File Upload Tool
- # ============================================================
@tool()
async def upload_file(index: int, path: str, uid: str = "") -> ToolResult:
    """
    Upload a file to a file input element

    Placeholder implementation: it launches a fresh Chromium instance and
    obtains a page, but cannot yet resolve ``index`` to a DOM element, so
    no file is actually attached to any input.

    Args:
        index: Element index from browser state
        path: Path to the file to upload
        uid: User ID (auto-injected); not read in this body

    Returns:
        ToolResult describing the upload, or a failure ToolResult with
        ``error`` set if Playwright cannot be imported, launched or driven
    """
    try:
        # Imported lazily so an import failure is reported as a tool error.
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)
            try:
                context = await browser.new_context()
                # BrowserContext.pages is a property holding a list, not a
                # method, and Page objects are not awaitable.
                open_pages = context.pages
                page = open_pages[0] if open_pages else await context.new_page()
                # TODO: map `index` to a selector via DOM state and attach
                # `path` to that input on `page`.
                return ToolResult(
                    title="File uploaded",
                    output=f"Uploaded file {path} to element {index}",
                    long_term_memory=f"Uploaded file {path}"
                )
            finally:
                # Release the browser on both the success and error paths.
                await browser.close()
    except Exception as e:
        return ToolResult(
            title="Upload failed",
            output="",
            error=f"Failed to upload file: {str(e)}",
            long_term_memory="File upload failed"
        )
- # ============================================================
- # Task Completion Tool
- # ============================================================
@tool()
async def done(text: str, success: bool = True,
               files_to_display: Optional[List[str]] = None, uid: str = "") -> ToolResult:
    """
    Mark the task as finished and hand the final message back to the user.

    Args:
        text: Final message to user in the requested format
        success: Whether the task completed successfully
        files_to_display: Optional list of file paths to display; a missing
            or empty value becomes an empty attachments list
        uid: User ID (auto-injected); not read in this body

    Returns:
        ToolResult carrying ``text`` as output, the attachments, and
        ``{"success": success}`` as metadata; or a failure ToolResult with
        ``error`` set if building that result raises
    """
    try:
        # Title and long-term memory share the same outcome sentence.
        outcome = "Task completed" if success else "Task failed"
        attachments = files_to_display if files_to_display else []
        result = ToolResult(
            title=outcome,
            output=text,
            long_term_memory=outcome,
            attachments=attachments,
            metadata={"success": success},
        )
    except Exception as exc:
        result = ToolResult(
            title="Done failed",
            output="",
            error=f"Failed to complete task: {exc!s}",
            long_term_memory="Task completion failed",
        )
    return result
|