| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841 |
- """
- Browser-Use 原生工具适配器
- Native Browser-Use Tools Adapter
- 直接使用 browser-use 的原生类(BrowserSession, Tools)实现所有浏览器操作工具。
- 不依赖 Playwright,完全基于 CDP 协议。
- 核心特性:
- 1. 浏览器会话持久化 - 只启动一次浏览器
- 2. 状态自动保持 - 登录状态、Cookie、LocalStorage 等
- 3. 完整的底层访问 - 可以直接使用 CDP 协议
- 4. 性能优异 - 避免频繁创建/销毁浏览器实例
- 5. 多种浏览器类型 - 支持 local、cloud、container 三种模式
- 支持的浏览器类型:
- 1. Local (本地浏览器):
- - 在本地运行 Chrome
- - 支持可视化调试
- - 速度最快
- - 示例: init_browser_session(browser_type="local")
- 2. Cloud (云浏览器):
- - 在云端运行
- - 不占用本地资源
- - 适合生产环境
- - 示例: init_browser_session(browser_type="cloud")
- 3. Container (容器浏览器):
- - 在独立容器中运行
- - 隔离性好
- - 支持预配置账户
- - 示例: init_browser_session(browser_type="container", container_url="https://example.com")
- 使用方法:
- 1. 在 Agent 初始化时调用 init_browser_session() 并指定 browser_type
- 2. 使用各个工具函数执行浏览器操作
- 3. 任务结束时调用 cleanup_browser_session()
- 文件操作说明:
- - 浏览器专用文件目录:.browser_use_files/ (在当前工作目录下)
- 用于存储浏览器会话产生的临时文件(下载、上传、截图等)
- - 一般文件操作:请使用 agent.tools.builtin 中的文件工具 (read_file, write_file, edit_file)
- 这些工具功能更完善,支持diff预览、智能匹配、分页读取等
- """
- import sys
- import os
- import json
- import asyncio
- import aiohttp
- import re
- import base64
- from urllib.parse import urlparse, parse_qs, unquote
- from typing import Optional, List, Dict, Any, Tuple, Union
- from pathlib import Path
- from langchain_core.runnables import RunnableLambda
- from argparse import Namespace # 使用 Namespace 快速构造带属性的对象
- from langchain_core.messages import AIMessage
- from ....llm.openrouter import openrouter_llm_call
- # 将项目根目录添加到 Python 路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- # 导入框架的工具装饰器和结果类
- from agent.tools import tool, ToolResult
- from agent.tools.builtin.browser.sync_mysql_help import mysql
- # 导入 browser-use 的核心类
- from browser_use import BrowserSession, BrowserProfile
- from browser_use.tools.service import Tools
- try:
- from browser_use.tools.views import ReadContentAction # type: ignore
- except Exception:
- from pydantic import BaseModel
- class ReadContentAction(BaseModel):
- goal: str
- source: str = "page"
- context: str = ""
- from browser_use.agent.views import ActionResult
- from browser_use.filesystem.file_system import FileSystem
- # ============================================================
- # 无需注册的内部辅助函数
- # ============================================================
- # ============================================================
- # 全局浏览器会话管理
- # ============================================================
- # 全局变量:浏览器会话和工具实例
- _browser_session: Optional[BrowserSession] = None
- _browser_tools: Optional[Tools] = None
- _file_system: Optional[FileSystem] = None
- async def create_container(url: str, account_name: str = "liuwenwu") -> Dict[str, Any]:
- """
- 创建浏览器容器并导航到指定URL
- 按照 test.md 的要求:
- 1.1 调用接口创建容器
- 1.2 调用接口创建窗口并导航到URL
- Args:
- url: 要导航的URL地址
- account_name: 账户名称
- Returns:
- 包含容器信息的字典:
- - success: 是否成功
- - container_id: 容器ID
- - vnc: VNC访问URL
- - cdp: CDP协议URL(用于浏览器连接)
- - connection_id: 窗口连接ID
- - error: 错误信息(如果失败)
- """
- result = {
- "success": False,
- "container_id": None,
- "vnc": None,
- "cdp": None,
- "connection_id": None,
- "error": None
- }
- try:
- async with aiohttp.ClientSession() as session:
- # 步骤1.1: 创建容器
- print("📦 步骤1.1: 创建容器...")
- create_url = "http://47.84.182.56:8200/api/v1/container/create"
- create_payload = {
- "auto_remove": True,
- "need_port_binding": True,
- "max_lifetime_seconds": 900
- }
- async with session.post(create_url, json=create_payload) as resp:
- if resp.status != 200:
- raise RuntimeError(f"创建容器失败: HTTP {resp.status}")
- create_result = await resp.json()
- if create_result.get("code") != 0:
- raise RuntimeError(f"创建容器失败: {create_result.get('msg')}")
- data = create_result.get("data", {})
- result["container_id"] = data.get("container_id")
- result["vnc"] = data.get("vnc")
- result["cdp"] = data.get("cdp")
- print(f"✅ 容器创建成功")
- print(f" Container ID: {result['container_id']}")
- print(f" VNC: {result['vnc']}")
- print(f" CDP: {result['cdp']}")
- # 等待容器内的浏览器启动
- print(f"\n⏳ 等待容器内浏览器启动...")
- await asyncio.sleep(5)
- # 步骤1.2: 创建页面并导航
- print(f"\n📱 步骤1.2: 创建页面并导航到 {url}...")
- page_create_url = "http://47.84.182.56:8200/api/v1/browser/page/create"
- page_payload = {
- "container_id": result["container_id"],
- "url": url,
- "account_name": account_name,
- "need_wait": True,
- "timeout": 30
- }
- # 重试机制:最多尝试3次
- max_retries = 3
- page_created = False
- last_error = None
- for attempt in range(max_retries):
- try:
- if attempt > 0:
- print(f" 重试 {attempt + 1}/{max_retries}...")
- await asyncio.sleep(3) # 重试前等待
- async with session.post(page_create_url, json=page_payload, timeout=aiohttp.ClientTimeout(total=60)) as resp:
- if resp.status != 200:
- response_text = await resp.text()
- last_error = f"HTTP {resp.status}: {response_text[:200]}"
- continue
- page_result = await resp.json()
- if page_result.get("code") != 0:
- last_error = f"{page_result.get('msg')}"
- continue
- page_data = page_result.get("data", {})
- result["connection_id"] = page_data.get("connection_id")
- result["success"] = True
- page_created = True
- print(f"✅ 页面创建成功")
- print(f" Connection ID: {result['connection_id']}")
- break
- except asyncio.TimeoutError:
- last_error = "请求超时"
- continue
- except aiohttp.ClientError as e:
- last_error = f"网络错误: {str(e)}"
- continue
- except Exception as e:
- last_error = f"未知错误: {str(e)}"
- continue
- if not page_created:
- raise RuntimeError(f"创建页面失败(尝试{max_retries}次后): {last_error}")
- except Exception as e:
- result["error"] = str(e)
- print(f"❌ 错误: {str(e)}")
- return result
- async def init_browser_session(
- browser_type: str = "local",
- headless: bool = False,
- url: Optional[str] = None,
- profile_name: str = "default",
- user_data_dir: Optional[str] = None,
- browser_profile: Optional[BrowserProfile] = None,
- **kwargs
- ) -> tuple[BrowserSession, Tools]:
- """
- 初始化全局浏览器会话 - 支持三种浏览器类型
- Args:
- browser_type: 浏览器类型 ("local", "cloud", "container")
- headless: 是否无头模式
- url: 初始访问URL(可选)
- - local/cloud: 初始化后会自动导航到此URL
- - container: 必需,容器启动时访问的URL
- profile_name: 配置文件/账户名称(默认 "default")
- - local: 用于创建用户数据目录路径
- - cloud: 云浏览器配置ID
- - container: 容器账户名称
- user_data_dir: 用户数据目录(仅 local 模式,高级用法)
- 如果提供则覆盖 profile_name 生成的路径
- browser_profile: BrowserProfile 对象(通用,高级用法)
- 用于预设 cookies 等
- **kwargs: 其他 BrowserSession 参数
- Returns:
- (BrowserSession, Tools) 元组
- Examples:
- # 本地浏览器
- browser, tools = await init_browser_session(
- browser_type="local",
- url="https://www.baidu.com" # 可选
- )
- # 云浏览器
- browser, tools = await init_browser_session(
- browser_type="cloud",
- profile_name="my_cloud_profile" # 可选
- )
- # 容器浏览器
- browser, tools = await init_browser_session(
- browser_type="container",
- url="https://www.xiaohongshu.com", # 必需
- profile_name="my_account" # 可选
- )
- """
- global _browser_session, _browser_tools, _file_system
- if _browser_session is not None:
- return _browser_session, _browser_tools
- # 验证 browser_type
- valid_types = ["local", "cloud", "container"]
- if browser_type not in valid_types:
- raise ValueError(f"无效的 browser_type: {browser_type},必须是 {valid_types} 之一")
- # 创建浏览器会话参数
- session_params = {
- "headless": headless,
- }
- # === Container 模式 ===
- if browser_type == "container":
- print("🐳 使用容器浏览器模式")
- # container 模式必须提供 URL
- if not url:
- url = "about:blank" # 使用默认空白页
- print("⚠️ 未提供 url 参数,使用默认空白页")
- # 创建容器并获取 CDP URL
- print(f"📦 正在创建容器...")
- container_info = await create_container(
- url=url,
- account_name=profile_name
- )
- if not container_info["success"]:
- raise RuntimeError(f"容器创建失败: {container_info['error']}")
- cdp_url = container_info["cdp"]
- print(f"✅ 容器创建成功")
- print(f" CDP URL: {cdp_url}")
- print(f" Container ID: {container_info['container_id']}")
- print(f" Connection ID: {container_info.get('connection_id')}")
- # 使用容器的 CDP URL 连接
- session_params["cdp_url"] = cdp_url
- # 等待容器完全启动
- print("⏳ 等待容器浏览器启动...")
- await asyncio.sleep(3)
- # === Cloud 模式 ===
- elif browser_type == "cloud":
- print("🌐 使用云浏览器模式")
- session_params["use_cloud"] = True
- # profile_name 作为云配置ID
- if profile_name and profile_name != "default":
- session_params["cloud_profile_id"] = profile_name
- # === Local 模式 ===
- else: # local
- print("💻 使用本地浏览器模式")
- session_params["is_local"] = True
- # 设置用户数据目录(持久化登录状态)
- if user_data_dir is None and profile_name:
- user_data_dir = str(Path.home() / ".browser_use" / "profiles" / profile_name)
- Path(user_data_dir).mkdir(parents=True, exist_ok=True)
- # macOS 上显式指定 Chrome 路径
- import platform
- if platform.system() == "Darwin": # macOS
- chrome_path = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
- if Path(chrome_path).exists():
- session_params["executable_path"] = chrome_path
- # 只在有值时才添加 user_data_dir
- if user_data_dir:
- session_params["user_data_dir"] = user_data_dir
- # 只在有值时才添加 browser_profile (适用于所有模式)
- if browser_profile:
- session_params["browser_profile"] = browser_profile
- # 合并其他参数
- session_params.update(kwargs)
- # 创建浏览器会话
- _browser_session = BrowserSession(**session_params)
- # 启动浏览器
- await _browser_session.start()
- # 创建工具实例
- _browser_tools = Tools()
- # 创建文件系统实例(用于浏览器会话产生的文件)
- # 注意:这个目录仅用于浏览器操作相关的临时文件(下载、上传、截图等)
- # 对于一般文件读写操作,请使用 agent.tools.builtin 中的文件工具
- base_dir = Path.cwd() / ".browser_use_files"
- base_dir.mkdir(parents=True, exist_ok=True)
- _file_system = FileSystem(base_dir=str(base_dir))
- print("✅ 浏览器会话初始化成功")
- # 如果是 local 或 cloud 模式且提供了 URL,导航到该 URL
- if browser_type in ["local", "cloud"] and url:
- print(f"🔗 导航到: {url}")
- await _browser_tools.navigate(url=url, browser_session=_browser_session)
- return _browser_session, _browser_tools
- async def get_browser_session() -> tuple[BrowserSession, Tools]:
- """
- 获取当前浏览器会话,如果不存在则自动创建
- Returns:
- (BrowserSession, Tools) 元组
- """
- global _browser_session, _browser_tools
- if _browser_session is None:
- await init_browser_session()
- return _browser_session, _browser_tools
- async def cleanup_browser_session():
- """
- 清理浏览器会话
- 优雅地停止浏览器但保留会话状态
- """
- global _browser_session, _browser_tools, _file_system
- if _browser_session is not None:
- await _browser_session.stop()
- _browser_session = None
- _browser_tools = None
- _file_system = None
- async def kill_browser_session():
- """
- 强制终止浏览器会话
- 完全关闭浏览器进程
- """
- global _browser_session, _browser_tools, _file_system
- if _browser_session is not None:
- await _browser_session.kill()
- _browser_session = None
- _browser_tools = None
- _file_system = None
- # ============================================================
- # 辅助函数:ActionResult 转 ToolResult
- # ============================================================
- def action_result_to_tool_result(result: ActionResult, title: str = None) -> ToolResult:
- """
- 将 browser-use 的 ActionResult 转换为框架的 ToolResult
- Args:
- result: browser-use 的 ActionResult
- title: 可选的标题(如果不提供则从 result 推断)
- Returns:
- ToolResult
- """
- if result.error:
- return ToolResult(
- title=title or "操作失败",
- output="",
- error=result.error,
- long_term_memory=result.long_term_memory or result.error
- )
- return ToolResult(
- title=title or "操作成功",
- output=result.extracted_content or "",
- long_term_memory=result.long_term_memory or result.extracted_content or "",
- metadata=result.metadata or {}
- )
- def _cookie_domain_for_type(cookie_type: str, url: str) -> Tuple[str, str]:
- if cookie_type:
- key = cookie_type.lower()
- if key in {"xiaohongshu", "xhs"}:
- return ".xiaohongshu.com", "https://www.xiaohongshu.com"
- parsed = urlparse(url or "")
- domain = parsed.netloc or ""
- domain = domain.replace("www.", "")
- if domain:
- domain = f".{domain}"
- base_url = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme and parsed.netloc else url
- return domain, base_url
- def _parse_cookie_string(cookie_str: str, domain: str, url: str) -> List[Dict[str, Any]]:
- cookies: List[Dict[str, Any]] = []
- if not cookie_str:
- return cookies
- parts = cookie_str.split(";")
- for part in parts:
- if not part:
- continue
- if "=" not in part:
- continue
- name, value = part.split("=", 1)
- cookie = {
- "name": str(name).strip(),
- "value": str(value).strip(),
- "domain": domain,
- "path": "/",
- "expires": -1,
- "httpOnly": False,
- "secure": True,
- "sameSite": "None"
- }
- if url:
- cookie["url"] = url
- cookies.append(cookie)
- return cookies
- def _normalize_cookies(cookie_value: Any, domain: str, url: str) -> List[Dict[str, Any]]:
- if cookie_value is None:
- return []
- if isinstance(cookie_value, list):
- return cookie_value
- if isinstance(cookie_value, dict):
- if "cookies" in cookie_value:
- return _normalize_cookies(cookie_value.get("cookies"), domain, url)
- if "name" in cookie_value and "value" in cookie_value:
- return [cookie_value]
- return []
- if isinstance(cookie_value, (bytes, bytearray)):
- cookie_value = cookie_value.decode("utf-8", errors="ignore")
- if isinstance(cookie_value, str):
- text = cookie_value.strip()
- if not text:
- return []
- try:
- parsed = json.loads(text)
- except Exception:
- parsed = None
- if parsed is not None:
- return _normalize_cookies(parsed, domain, url)
- return _parse_cookie_string(text, domain, url)
- return []
- def _extract_cookie_value(row: Optional[Dict[str, Any]]) -> Any:
- if not row:
- return None
- # 优先使用 cookies 字段
- if "cookies" in row:
- return row["cookies"]
- # 兼容其他可能的字段名
- for key, value in row.items():
- if "cookie" in key.lower():
- return value
- return None
- def _fetch_cookie_row(cookie_type: str) -> Optional[Dict[str, Any]]:
- if not cookie_type:
- return None
- try:
- return mysql.fetchone(
- "select * from agent_channel_cookies where type=%s limit 1",
- (cookie_type,)
- )
- except Exception:
- return None
- def _fetch_profile_id(cookie_type: str) -> Optional[str]:
- """从数据库获取 cloud_profile_id"""
- if not cookie_type:
- return None
- try:
- row = mysql.fetchone(
- "select profileId from agent_channel_cookies where type=%s limit 1",
- (cookie_type,)
- )
- if row and "profileId" in row:
- return row["profileId"]
- return None
- except Exception:
- return None
- # ============================================================
- # 需要注册的工具
- # ============================================================
- # ============================================================
- # 导航类工具 (Navigation Tools)
- # ============================================================
- @tool()
- async def browser_navigate_to_url(url: str, new_tab: bool = False) -> ToolResult:
- """
- 导航到指定的 URL
- Navigate to a specific URL
- 使用 browser-use 的原生导航功能,支持在新标签页打开。
- Args:
- url: 要访问的 URL 地址
- new_tab: 是否在新标签页中打开(默认 False)
- Returns:
- ToolResult: 包含导航结果的工具返回对象
- Example:
- navigate_to_url("https://www.baidu.com")
- navigate_to_url("https://www.google.com", new_tab=True)
- """
- try:
- browser, tools = await get_browser_session()
- # 使用 browser-use 的 navigate 工具
- result = await tools.navigate(
- url=url,
- new_tab=new_tab,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"导航到 {url}")
- except Exception as e:
- return ToolResult(
- title="导航失败",
- output="",
- error=f"Failed to navigate to {url}: {str(e)}",
- long_term_memory=f"导航到 {url} 失败"
- )
- @tool()
- async def browser_search_web(query: str, engine: str = "bing") -> ToolResult:
- """
- 使用搜索引擎搜索
- Search the web using a search engine
- Args:
- query: 搜索关键词
- engine: 搜索引擎 (google, duckduckgo, bing) - 默认: google
- Returns:
- ToolResult: 搜索结果
- Example:
- search_web("Python async programming", engine="google")
- """
- try:
- browser, tools = await get_browser_session()
- # 使用 browser-use 的 search 工具
- result = await tools.search(
- query=query,
- engine=engine,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"搜索: {query}")
- except Exception as e:
- return ToolResult(
- title="搜索失败",
- output="",
- error=f"Search failed: {str(e)}",
- long_term_memory=f"搜索 '{query}' 失败"
- )
- @tool()
- async def browser_go_back() -> ToolResult:
- """
- 返回到上一个页面
- Go back to the previous page
- 模拟浏览器的"后退"按钮功能。
- Returns:
- ToolResult: 包含返回操作结果的工具返回对象
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.go_back(browser_session=browser)
- return action_result_to_tool_result(result, "返回上一页")
- except Exception as e:
- return ToolResult(
- title="返回失败",
- output="",
- error=f"Failed to go back: {str(e)}",
- long_term_memory="返回上一页失败"
- )
- @tool()
- async def browser_wait(seconds: int = 3) -> ToolResult:
- """
- 等待指定的秒数
- Wait for a specified number of seconds
- 用于等待页面加载、动画完成或其他异步操作。
- Args:
- seconds: 等待时间(秒),最大30秒
- Returns:
- ToolResult: 包含等待操作结果的工具返回对象
- Example:
- wait(5) # 等待5秒
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.wait(seconds=seconds, browser_session=browser)
- return action_result_to_tool_result(result, f"等待 {seconds} 秒")
- except Exception as e:
- return ToolResult(
- title="等待失败",
- output="",
- error=f"Failed to wait: {str(e)}",
- long_term_memory="等待失败"
- )
- # ============================================================
- # 元素交互工具 (Element Interaction Tools)
- # ============================================================
- @tool()
- async def browser_click_element(index: int) -> ToolResult:
- """
- 通过索引点击页面元素
- Click an element by index
- Args:
- index: 元素索引(从浏览器状态中获取)
- Returns:
- ToolResult: 包含点击操作结果的工具返回对象
- Example:
- click_element(index=5)
- Note:
- 需要先通过 get_selector_map 获取页面元素索引
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.click(
- index=index,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"点击元素 {index}")
- except Exception as e:
- return ToolResult(
- title="点击失败",
- output="",
- error=f"Failed to click element {index}: {str(e)}",
- long_term_memory=f"点击元素 {index} 失败"
- )
- @tool()
- async def browser_input_text(index: int, text: str, clear: bool = True) -> ToolResult:
- """
- 在指定元素中输入文本
- Input text into an element
- Args:
- index: 元素索引(从浏览器状态中获取)
- text: 要输入的文本内容
- clear: 是否先清除现有文本(默认 True)
- Returns:
- ToolResult: 包含输入操作结果的工具返回对象
- Example:
- input_text(index=0, text="Hello World", clear=True)
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.input(
- index=index,
- text=text,
- clear=clear,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"输入文本到元素 {index}")
- except Exception as e:
- return ToolResult(
- title="输入失败",
- output="",
- error=f"Failed to input text into element {index}: {str(e)}",
- long_term_memory=f"输入文本失败"
- )
- @tool()
- async def browser_send_keys(keys: str) -> ToolResult:
- """
- 发送键盘按键或快捷键
- Send keyboard keys or shortcuts
- 支持发送单个按键、组合键和快捷键。
- Args:
- keys: 要发送的按键字符串
- - 单个按键: "Enter", "Escape", "PageDown", "Tab"
- - 组合键: "Control+o", "Shift+Tab", "Alt+F4"
- - 功能键: "F1", "F2", ..., "F12"
- Returns:
- ToolResult: 包含按键操作结果的工具返回对象
- Example:
- send_keys("Enter")
- send_keys("Control+A")
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.send_keys(
- keys=keys,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"发送按键: {keys}")
- except Exception as e:
- return ToolResult(
- title="发送按键失败",
- output="",
- error=f"Failed to send keys: {str(e)}",
- long_term_memory="发送按键失败"
- )
- @tool()
- async def browser_upload_file(index: int, path: str) -> ToolResult:
- """
- 上传文件到文件输入元素
- Upload a file to a file input element
- Args:
- index: 文件输入框的元素索引
- path: 要上传的文件路径(绝对路径)
- Returns:
- ToolResult: 包含上传操作结果的工具返回对象
- Example:
- upload_file(index=7, path="/path/to/file.pdf")
- Note:
- 文件必须存在且路径必须是绝对路径
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.upload_file(
- index=index,
- path=path,
- browser_session=browser,
- available_file_paths=[path],
- file_system=_file_system
- )
- return action_result_to_tool_result(result, f"上传文件: {path}")
- except Exception as e:
- return ToolResult(
- title="上传失败",
- output="",
- error=f"Failed to upload file: {str(e)}",
- long_term_memory=f"上传文件 {path} 失败"
- )
- # ============================================================
- # 滚动和视图工具 (Scroll & View Tools)
- # ============================================================
- @tool()
- async def browser_scroll_page(down: bool = True, pages: float = 1.0, index: Optional[int] = None) -> ToolResult:
- try:
- browser, tools = await get_browser_session()
-
- # --- 核心修复 1: 必须先 await 拿到 session 实例 ---
- cdp_session = await browser.get_or_create_cdp_session()
-
- # 这里的执行方式建议参考你已有的 cdp 调用逻辑
- # 如果 cdp_session 没有直接封装 .eval(),使用 Runtime.evaluate
- before_y_result = await cdp_session.cdp_client.send.Runtime.evaluate(
- params={'expression': 'window.scrollY'},
- session_id=cdp_session.session_id
- )
- before_y = before_y_result.get('result', {}).get('value', 0)
- # 执行滚动
- result = await tools.scroll(down=down, pages=pages, index=index, browser_session=browser)
-
- # 等待渲染并检查偏移
- await asyncio.sleep(1)
-
- after_y_result = await cdp_session.cdp_client.send.Runtime.evaluate(
- params={'expression': 'window.scrollY'},
- session_id=cdp_session.session_id
- )
- after_y = after_y_result.get('result', {}).get('value', 0)
- # 3. 验证是否真的动了
- if before_y == after_y and index is None:
- return ToolResult(
- title="滚动无效",
- output="页面已到达边界或滚动被拦截",
- error="No movement detected"
- )
- return action_result_to_tool_result(result, f"已滚动")
- except Exception as e:
- # --- 核心修复 2: 必须补全 output 参数,否则框架会报错 ---
- return ToolResult(
- title="滚动失败",
- output="", # 补全这个缺失的必填参数
- error=str(e)
- )
- @tool()
- async def browser_find_text(text: str) -> ToolResult:
- """
- 查找页面中的文本并滚动到该位置
- Find text on the page and scroll to it
- 在页面中搜索指定的文本,找到后自动滚动到该位置。
- Args:
- text: 要查找的文本内容
- Returns:
- ToolResult: 包含查找结果的工具返回对象
- Example:
- find_text("Privacy Policy")
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.find_text(
- text=text,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"查找文本: {text}")
- except Exception as e:
- return ToolResult(
- title="查找失败",
- output="",
- error=f"Failed to find text: {str(e)}",
- long_term_memory=f"查找文本 '{text}' 失败"
- )
- @tool()
- async def browser_screenshot() -> ToolResult:
- """
- 请求在下次观察中包含页面截图
- Request a screenshot to be included in the next observation
- 用于视觉检查页面状态,帮助理解页面布局和内容。
- Returns:
- ToolResult: 包含截图请求结果的工具返回对象
- Example:
- screenshot()
- Note:
- 截图会在下次页面观察时自动包含在结果中。
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.screenshot(browser_session=browser)
- return action_result_to_tool_result(result, "截图请求")
- except Exception as e:
- return ToolResult(
- title="截图失败",
- output="",
- error=f"Failed to capture screenshot: {str(e)}",
- long_term_memory="截图失败"
- )
- # ============================================================
- # 标签页管理工具 (Tab Management Tools)
- # ============================================================
- @tool()
- async def browser_switch_tab(tab_id: str) -> ToolResult:
- """
- 切换到指定标签页
- Switch to a different browser tab
- Args:
- tab_id: 4字符标签ID(target_id 的最后4位)
- Returns:
- ToolResult: 切换结果
- Example:
- switch_tab(tab_id="a3f2")
- """
- try:
- browser, tools = await get_browser_session()
- normalized_tab_id = tab_id[-4:] if tab_id else tab_id
- result = await tools.switch(
- tab_id=normalized_tab_id,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"切换到标签页 {normalized_tab_id}")
- except Exception as e:
- return ToolResult(
- title="切换标签页失败",
- output="",
- error=f"Failed to switch tab: {str(e)}",
- long_term_memory=f"切换到标签页 {tab_id} 失败"
- )
- @tool()
- async def browser_close_tab(tab_id: str) -> ToolResult:
- """
- 关闭指定标签页
- Close a browser tab
- Args:
- tab_id: 4字符标签ID
- Returns:
- ToolResult: 关闭结果
- Example:
- close_tab(tab_id="a3f2")
- """
- try:
- browser, tools = await get_browser_session()
- normalized_tab_id = tab_id[-4:] if tab_id else tab_id
- result = await tools.close(
- tab_id=normalized_tab_id,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"关闭标签页 {normalized_tab_id}")
- except Exception as e:
- return ToolResult(
- title="关闭标签页失败",
- output="",
- error=f"Failed to close tab: {str(e)}",
- long_term_memory=f"关闭标签页 {tab_id} 失败"
- )
- # ============================================================
- # 下拉框工具 (Dropdown Tools)
- # ============================================================
- @tool()
- async def browser_get_dropdown_options(index: int) -> ToolResult:
- """
- 获取下拉框的所有选项
- Get options from a dropdown element
- Args:
- index: 下拉框的元素索引
- Returns:
- ToolResult: 包含所有选项的结果
- Example:
- get_dropdown_options(index=8)
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.dropdown_options(
- index=index,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"获取下拉框选项: {index}")
- except Exception as e:
- return ToolResult(
- title="获取下拉框选项失败",
- output="",
- error=f"Failed to get dropdown options: {str(e)}",
- long_term_memory=f"获取下拉框 {index} 选项失败"
- )
- @tool()
- async def browser_select_dropdown_option(index: int, text: str) -> ToolResult:
- """
- 选择下拉框选项
- Select an option from a dropdown
- Args:
- index: 下拉框的元素索引
- text: 要选择的选项文本(精确匹配)
- Returns:
- ToolResult: 选择结果
- Example:
- select_dropdown_option(index=8, text="Option 2")
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.select_dropdown(
- index=index,
- text=text,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"选择下拉框选项: {text}")
- except Exception as e:
- return ToolResult(
- title="选择下拉框选项失败",
- output="",
- error=f"Failed to select dropdown option: {str(e)}",
- long_term_memory=f"选择选项 '{text}' 失败"
- )
- # ============================================================
- # 内容提取工具 (Content Extraction Tools)
- # ============================================================
- def scrub_search_redirect_url(url: str) -> str:
- """
- 自动检测并解析 Bing/Google 等搜索引擎的重定向链接,提取真实目标 URL。
- """
- if not url or not isinstance(url, str):
- return url
-
- try:
- parsed = urlparse(url)
-
- # 1. 处理 Bing 重定向 (特征:u 参数带 Base64)
- # 示例:...&u=a1aHR0cHM6Ly96aHVhbmxhbi56aGlodS5jb20vcC8zODYxMjgwOQ&...
- if "bing.com" in parsed.netloc:
- u_param = parse_qs(parsed.query).get('u', [None])[0]
- if u_param:
- # 移除开头的 'a1', 'a0' 等标识符
- b64_str = u_param[2:]
- # 补齐 Base64 填充符
- padding = '=' * (4 - len(b64_str) % 4)
- decoded = base64.b64decode(b64_str + padding).decode('utf-8', errors='ignore')
- if decoded.startswith('http'):
- return decoded
- # 2. 处理 Google 重定向 (特征:url 参数)
- if "google.com" in parsed.netloc:
- url_param = parse_qs(parsed.query).get('url', [None])[0]
- if url_param:
- return unquote(url_param)
- # 3. 兜底:处理常见的跳转参数
- for param in ['target', 'dest', 'destination', 'link']:
- found = parse_qs(parsed.query).get(param, [None])[0]
- if found and found.startswith('http'):
- return unquote(found)
-
- except Exception:
- pass # 解析失败则返回原链接
-
- return url
- async def extraction_adapter(input_data):
- # 提取字符串
- if isinstance(input_data, list):
- prompt = input_data[-1].content if hasattr(input_data[-1], 'content') else str(input_data[-1])
- else:
- prompt = str(input_data)
-
- response = await openrouter_llm_call(
- messages=[{"role": "user", "content": prompt}]
- )
-
- content = response["content"]
-
- # --- 核心改进:URL 自动修复 ---
- # 使用正则表达式匹配内容中的所有 URL,并尝试进行洗涤
- urls = re.findall(r'https?://[^\s<>"\']+', content)
- for original_url in urls:
- clean_url = scrub_search_redirect_url(original_url)
- if clean_url != original_url:
- content = content.replace(original_url, clean_url)
-
- from argparse import Namespace
- return Namespace(completion=content)
- @tool()
- async def browser_extract_content(query: str, extract_links: bool = False,
- start_from_char: int = 0) -> ToolResult:
- """
- 使用 LLM 从页面提取结构化数据
- Extract content from the current page using LLM
- Args:
- query: 提取查询(告诉 LLM 要提取什么内容)
- extract_links: 是否提取链接(默认 False,节省 token)
- start_from_char: 从哪个字符开始提取(用于分页提取大内容)
- Returns:
- ToolResult: 提取的内容
- Example:
- extract_content(query="提取页面上所有产品的名称和价格", extract_links=True)
- Note:
- 需要配置 page_extraction_llm,否则会失败
- 支持分页提取,最大100k字符
- """
- try:
- browser, tools = await get_browser_session()
- # 注意:extract 需要 page_extraction_llm 参数
- # 这里我们假设用户会在初始化时配置 LLM
- # 如果没有配置,会抛出异常
- result = await tools.extract(
- query=query,
- extract_links=extract_links,
- start_from_char=start_from_char,
- browser_session=browser,
- page_extraction_llm=RunnableLambda(extraction_adapter), # 需要用户配置
- file_system=_file_system
- )
- return action_result_to_tool_result(result, f"提取内容: {query}")
- except Exception as e:
- return ToolResult(
- title="内容提取失败",
- output="",
- error=f"Failed to extract content: {str(e)}",
- long_term_memory=f"提取内容失败: {query}"
- )
- async def _detect_and_download_pdf_via_cdp(browser) -> Optional[str]:
- """
- 检测当前页面是否为 PDF,如果是则通过 CDP(浏览器内 fetch)下载到本地。
- 优势:自动携带浏览器的 cookies/session,可访问需要登录的 PDF。
- 返回本地文件路径,非 PDF 页面返回 None。
- """
- try:
- current_url = await browser.get_current_page_url()
- if not current_url:
- return None
- parsed = urlparse(current_url)
- is_pdf = parsed.path.lower().endswith('.pdf')
- # URL 不明显是 PDF 时,通过 CDP 检查 content-type
- if not is_pdf:
- try:
- cdp = await browser.get_or_create_cdp_session()
- ct_result = await cdp.cdp_client.send.Runtime.evaluate(
- params={'expression': 'document.contentType'},
- session_id=cdp.session_id
- )
- content_type = ct_result.get('result', {}).get('value', '')
- is_pdf = 'pdf' in content_type.lower()
- except Exception:
- pass
- if not is_pdf:
- return None
- # 通过浏览器内 fetch API 下载 PDF(自动携带 cookies)
- cdp = await browser.get_or_create_cdp_session()
- js_code = """
- (async () => {
- try {
- const resp = await fetch(window.location.href);
- if (!resp.ok) return JSON.stringify({error: 'HTTP ' + resp.status});
- const blob = await resp.blob();
- return new Promise((resolve, reject) => {
- const reader = new FileReader();
- reader.onloadend = () => resolve(JSON.stringify({data: reader.result}));
- reader.onerror = () => resolve(JSON.stringify({error: 'FileReader failed'}));
- reader.readAsDataURL(blob);
- });
- } catch(e) {
- return JSON.stringify({error: e.message});
- }
- })()
- """
- result = await cdp.cdp_client.send.Runtime.evaluate(
- params={
- 'expression': js_code,
- 'awaitPromise': True,
- 'returnByValue': True,
- 'timeout': 60000
- },
- session_id=cdp.session_id
- )
- value = result.get('result', {}).get('value', '')
- if not value:
- print("⚠️ CDP fetch PDF: 无返回值")
- return None
- data = json.loads(value)
- if 'error' in data:
- print(f"⚠️ CDP fetch PDF 失败: {data['error']}")
- return None
- # 从 data URL 中提取 base64 并解码
- data_url = data['data'] # data:application/pdf;base64,JVBERi0...
- base64_data = data_url.split(',', 1)[1]
- pdf_bytes = base64.b64decode(base64_data)
- # 保存到本地
- save_dir = Path.cwd() / ".browser_use_files"
- save_dir.mkdir(parents=True, exist_ok=True)
- filename = Path(parsed.path).name if parsed.path else ""
- if not filename or not filename.lower().endswith('.pdf'):
- import time
- filename = f"downloaded_{int(time.time())}.pdf"
- save_path = str(save_dir / filename)
- with open(save_path, 'wb') as f:
- f.write(pdf_bytes)
- print(f"📄 PDF 已通过 CDP 下载到: {save_path} ({len(pdf_bytes)} bytes)")
- return save_path
- except Exception as e:
- print(f"⚠️ PDF 检测/下载异常: {e}")
- return None
- @tool()
- async def browser_read_long_content(
- goal: Union[str, dict],
- source: str = "page",
- context: str = "",
- **kwargs
- ) -> ToolResult:
- """
- 智能读取长内容。支持自动检测并读取网页上的 PDF 文件。
- 当 source="page" 且当前页面是 PDF 时,会通过 CDP 下载 PDF 并用 pypdf 解析,
- 而非使用 DOM 提取(DOM 无法读取浏览器内置 PDF Viewer 的内容)。
- 通过 CDP 下载可自动携带浏览器的 cookies/session,支持需要登录的 PDF。
- """
- try:
- browser, tools = await get_browser_session()
- # 1. 提取目标文本 (针对 GoalTree 字典结构)
- final_goal_text = ""
- if isinstance(goal, dict):
- final_goal_text = goal.get("mission") or goal.get("goal") or str(goal)
- else:
- final_goal_text = str(goal)
- # 2. 清洗业务背景 (过滤框架注入的 dict 类型 context)
- business_context = context if isinstance(context, str) else ""
- # 3. PDF 自动检测:当 source="page" 时检查是否为 PDF 页面
- available_files = []
- if source.lower() == "page":
- pdf_path = await _detect_and_download_pdf_via_cdp(browser)
- if pdf_path:
- source = pdf_path
- available_files.append(pdf_path)
- # 4. 验证并实例化
- action_params = ReadContentAction(
- goal=final_goal_text,
- source=source,
- context=business_context
- )
- # 5. 解包参数调用底层方法
- result = await tools.read_long_content(
- **action_params.model_dump(),
- browser_session=browser,
- page_extraction_llm=RunnableLambda(extraction_adapter),
- available_file_paths=available_files
- )
- return action_result_to_tool_result(result, f"深度读取: {source}")
- except Exception as e:
- return ToolResult(
- title="深度读取失败",
- output="",
- error=f"Read long content failed: {str(e)}",
- long_term_memory="参数解析或校验失败,请检查输入"
- )
- @tool()
- async def browser_get_page_html() -> ToolResult:
- """
- 获取当前页面的完整 HTML
- Get the full HTML of the current page
- 返回当前页面的完整 HTML 源代码。
- Returns:
- ToolResult: 包含页面 HTML 的工具返回对象
- Example:
- get_page_html()
- Note:
- - 返回的是完整的 HTML 源代码
- - 输出会被限制在 10000 字符以内(完整内容保存在 metadata 中)
- """
- try:
- browser, tools = await get_browser_session()
- # 使用 CDP 获取页面 HTML
- cdp = await browser.get_or_create_cdp_session()
- # 获取页面内容
- result = await cdp.cdp_client.send.Runtime.evaluate(
- params={'expression': 'document.documentElement.outerHTML'},
- session_id=cdp.session_id
- )
- html = result.get('result', {}).get('value', '')
- # 获取 URL 和标题
- url = await browser.get_current_page_url()
- title_result = await cdp.cdp_client.send.Runtime.evaluate(
- params={'expression': 'document.title'},
- session_id=cdp.session_id
- )
- title = title_result.get('result', {}).get('value', '')
- # 限制输出大小
- output_html = html
- if len(html) > 10000:
- output_html = html[:10000] + "... (truncated)"
- return ToolResult(
- title=f"获取 HTML: {url}",
- output=f"页面: {title}\nURL: {url}\n\nHTML:\n{output_html}",
- long_term_memory=f"获取 HTML: {url}",
- metadata={"url": url, "title": title, "html": html}
- )
- except Exception as e:
- return ToolResult(
- title="获取 HTML 失败",
- output="",
- error=f"Failed to get page HTML: {str(e)}",
- long_term_memory="获取 HTML 失败"
- )
- @tool()
- async def browser_get_selector_map() -> ToolResult:
- """
- 获取当前页面的元素索引映射
- Get the selector map of interactive elements on the current page
- 返回页面所有可交互元素的索引字典,用于后续的元素操作。
- Returns:
- ToolResult: 包含元素映射的工具返回对象
- Example:
- get_selector_map()
- Note:
- 返回的索引可以用于 click_element, input_text 等操作
- """
- try:
- browser, tools = await get_browser_session()
- # 关键修复:先触发 BrowserStateRequestEvent 来更新 DOM 状态
- # 这会触发 DOM watchdog 重新构建 DOM 树并更新 selector_map
- from browser_use.browser.events import BrowserStateRequestEvent
- # 触发事件并等待结果
- event = browser.event_bus.dispatch(
- BrowserStateRequestEvent(
- include_dom=True,
- include_screenshot=False, # 不需要截图,节省时间
- include_recent_events=False
- )
- )
- # 等待 DOM 更新完成
- browser_state = await event.event_result(raise_if_none=True, raise_if_any=True)
- # 从更新后的状态中获取 selector_map
- selector_map = browser_state.dom_state.selector_map if browser_state.dom_state else {}
- # 构建输出信息
- elements_info = []
- for index, node in list(selector_map.items())[:20]: # 只显示前20个
- tag = node.tag_name
- attrs = node.attributes or {}
- text = attrs.get('aria-label') or attrs.get('placeholder') or attrs.get('value', '')
- elements_info.append(f"索引 {index}: <{tag}> {text[:50]}")
- output = f"找到 {len(selector_map)} 个交互元素\n\n"
- output += "\n".join(elements_info)
- if len(selector_map) > 20:
- output += f"\n... 还有 {len(selector_map) - 20} 个元素"
- return ToolResult(
- title="获取元素映射",
- output=output,
- long_term_memory=f"获取到 {len(selector_map)} 个交互元素",
- metadata={"selector_map": {k: str(v) for k, v in list(selector_map.items())[:100]}}
- )
- except Exception as e:
- return ToolResult(
- title="获取元素映射失败",
- output="",
- error=f"Failed to get selector map: {str(e)}",
- long_term_memory="获取元素映射失败"
- )
- # ============================================================
- # JavaScript 执行工具 (JavaScript Tools)
- # ============================================================
- @tool()
- async def browser_evaluate(code: str) -> ToolResult:
- """
- 在页面中执行 JavaScript 代码
- Execute JavaScript code in the page context
- 允许在当前页面中执行任意 JavaScript 代码,用于复杂的页面操作或数据提取。
- Args:
- code: 要执行的 JavaScript 代码字符串
- Returns:
- ToolResult: 包含执行结果的工具返回对象
- Example:
- evaluate("document.title")
- evaluate("document.querySelectorAll('a').length")
- Note:
- - 代码在页面上下文中执行,可以访问 DOM 和全局变量
- - 返回值会被自动序列化为字符串
- - 执行结果限制在 20k 字符以内
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.evaluate(
- code=code,
- browser_session=browser
- )
- return action_result_to_tool_result(result, "执行 JavaScript")
- except Exception as e:
- return ToolResult(
- title="JavaScript 执行失败",
- output="",
- error=f"Failed to execute JavaScript: {str(e)}",
- long_term_memory="JavaScript 执行失败"
- )
- @tool()
- async def browser_ensure_login_with_cookies(cookie_type: str, url: str = "https://www.xiaohongshu.com") -> ToolResult:
- """
- 检查登录状态并在需要时注入 cookies
- """
- try:
- browser, tools = await get_browser_session()
- if url:
- await tools.navigate(url=url, browser_session=browser)
- await tools.wait(seconds=2, browser_session=browser)
- check_login_js = """
- (function() {
- const loginBtn = document.querySelector('[class*="login"]') ||
- document.querySelector('[href*="login"]') ||
- Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录'));
- const userInfo = document.querySelector('[class*="user"]') ||
- document.querySelector('[class*="avatar"]');
- return {
- needLogin: !!loginBtn && !userInfo,
- hasLoginBtn: !!loginBtn,
- hasUserInfo: !!userInfo
- };
- })()
- """
- result = await tools.evaluate(code=check_login_js, browser_session=browser)
- status_output = result.extracted_content
- if isinstance(status_output, str) and status_output.startswith("Result: "):
- status_output = status_output[8:]
- login_info: Dict[str, Any] = {}
- if isinstance(status_output, str):
- try:
- login_info = json.loads(status_output)
- except Exception:
- login_info = {}
- elif isinstance(status_output, dict):
- login_info = status_output
- if not login_info.get("needLogin"):
- output = json.dumps({"need_login": False}, ensure_ascii=False)
- return ToolResult(
- title="已登录",
- output=output,
- long_term_memory=output
- )
- row = _fetch_cookie_row(cookie_type)
- cookie_value = _extract_cookie_value(row)
- if not cookie_value:
- output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
- return ToolResult(
- title="未找到 cookies",
- output=output,
- error="未找到 cookies",
- long_term_memory=output
- )
- domain, base_url = _cookie_domain_for_type(cookie_type, url)
- cookies = _normalize_cookies(cookie_value, domain, base_url)
- if not cookies:
- output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
- return ToolResult(
- title="cookies 解析失败",
- output=output,
- error="cookies 解析失败",
- long_term_memory=output
- )
- await browser._cdp_set_cookies(cookies)
- if url:
- await tools.navigate(url=url, browser_session=browser)
- await tools.wait(seconds=2, browser_session=browser)
- output = json.dumps({"need_login": True, "cookies_count": len(cookies)}, ensure_ascii=False)
- return ToolResult(
- title="已注入 cookies",
- output=output,
- long_term_memory=output
- )
- except Exception as e:
- return ToolResult(
- title="登录检查失败",
- output="",
- error=str(e),
- long_term_memory="登录检查失败"
- )
- # ============================================================
- # 等待用户操作工具 (Wait for User Action)
- # ============================================================
- @tool()
- async def browser_wait_for_user_action(message: str = "Please complete the action in browser",
- timeout: int = 300) -> ToolResult:
- """
- 等待用户在浏览器中完成操作(如登录)
- Wait for user to complete an action in the browser (e.g., login)
- 暂停自动化流程,等待用户手动完成某些操作(如登录、验证码等)。
- Args:
- message: 提示用户需要完成的操作
- timeout: 最大等待时间(秒),默认 300 秒(5 分钟)
- Returns:
- ToolResult: 包含等待结果的工具返回对象
- Example:
- wait_for_user_action("Please login to Xiaohongshu", timeout=180)
- wait_for_user_action("Please complete the CAPTCHA", timeout=60)
- Note:
- - 用户需要在浏览器窗口中手动完成操作
- - 完成后按回车键继续
- - 超时后会自动继续执行
- """
- try:
- import asyncio
- print(f"\n{'='*60}")
- print(f"⏸️ WAITING FOR USER ACTION")
- print(f"{'='*60}")
- print(f"📝 {message}")
- print(f"⏱️ Timeout: {timeout} seconds")
- print(f"\n👉 Please complete the action in the browser window")
- print(f"👉 Press ENTER when done, or wait for timeout")
- print(f"{'='*60}\n")
- # Wait for user input or timeout
- try:
- loop = asyncio.get_event_loop()
- # Wait for either user input or timeout
- await asyncio.wait_for(
- loop.run_in_executor(None, input),
- timeout=timeout
- )
- return ToolResult(
- title="用户操作完成",
- output=f"User completed: {message}",
- long_term_memory=f"用户完成操作: {message}"
- )
- except asyncio.TimeoutError:
- return ToolResult(
- title="用户操作超时",
- output=f"Timeout waiting for: {message}",
- long_term_memory=f"等待用户操作超时: {message}"
- )
- except Exception as e:
- return ToolResult(
- title="等待用户操作失败",
- output="",
- error=f"Failed to wait for user action: {str(e)}",
- long_term_memory="等待用户操作失败"
- )
- # ============================================================
- # 任务完成工具 (Task Completion)
- # ============================================================
- @tool()
- async def browser_done(text: str, success: bool = True,
- files_to_display: Optional[List[str]] = None) -> ToolResult:
- """
- 标记任务完成并返回最终消息
- Mark the task as complete and return final message to user
- Args:
- text: 给用户的最终消息
- success: 任务是否成功完成
- files_to_display: 可选的要显示的文件路径列表
- Returns:
- ToolResult: 完成结果
- Example:
- done("任务已完成,提取了10个产品信息", success=True)
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.done(
- text=text,
- success=success,
- files_to_display=files_to_display,
- file_system=_file_system
- )
- return action_result_to_tool_result(result, "任务完成")
- except Exception as e:
- return ToolResult(
- title="标记任务完成失败",
- output="",
- error=f"Failed to complete task: {str(e)}",
- long_term_memory="标记任务完成失败"
- )
- # ============================================================
- # 导出所有工具函数(供外部使用)
- # ============================================================
- __all__ = [
- # 会话管理
- 'init_browser_session',
- 'get_browser_session',
- 'cleanup_browser_session',
- 'kill_browser_session',
- # 导航类工具
- 'browser_navigate_to_url',
- 'browser_search_web',
- 'browser_go_back',
- 'browser_wait',
- # 元素交互工具
- 'browser_click_element',
- 'browser_input_text',
- 'browser_send_keys',
- 'browser_upload_file',
- # 滚动和视图工具
- 'browser_scroll_page',
- 'browser_find_text',
- 'browser_screenshot',
- # 标签页管理工具
- 'browser_switch_tab',
- 'browser_close_tab',
- # 下拉框工具
- 'browser_get_dropdown_options',
- 'browser_select_dropdown_option',
- # 内容提取工具
- 'browser_extract_content',
- 'browser_get_page_html',
- 'browser_read_long_content',
- 'browser_get_selector_map',
- # JavaScript 执行工具
- 'browser_evaluate',
- 'browser_ensure_login_with_cookies',
- # 等待用户操作
- 'browser_wait_for_user_action',
- # 任务完成
- 'browser_done',
- ]
|