| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200 |
- """
- Browser-Use 原生工具适配器
- Native Browser-Use Tools Adapter
- 直接使用 browser-use 的原生类(BrowserSession, Tools)实现所有浏览器操作工具。
- 不依赖 Playwright,完全基于 CDP 协议。
- 核心特性:
- 1. 浏览器会话持久化 - 只启动一次浏览器
- 2. 状态自动保持 - 登录状态、Cookie、LocalStorage 等
- 3. 完整的底层访问 - 可以直接使用 CDP 协议
- 4. 性能优异 - 避免频繁创建/销毁浏览器实例
- 5. 多种浏览器类型 - 支持 local、cloud、container 三种模式
- 支持的浏览器类型:
- 1. Local (本地浏览器):
- - 在本地运行 Chrome
- - 支持可视化调试
- - 速度最快
- - 示例: init_browser_session(browser_type="local")
- 2. Cloud (云浏览器):
- - 在云端运行
- - 不占用本地资源
- - 适合生产环境
- - 示例: init_browser_session(browser_type="cloud")
- 3. Container (容器浏览器):
- - 在独立容器中运行
- - 隔离性好
- - 支持预配置账户
- - 示例: init_browser_session(browser_type="container", container_url="https://example.com")
- 使用方法:
- 1. 在 Agent 初始化时调用 init_browser_session() 并指定 browser_type
- 2. 使用各个工具函数执行浏览器操作
- 3. 任务结束时调用 cleanup_browser_session()
- 文件操作说明:
- - 浏览器专用文件目录:.cache/.browser_use_files/ (在当前工作目录下)
- 用于存储浏览器会话产生的临时文件(下载、上传、截图等)
- - 一般文件操作:请使用 agent.tools.builtin 中的文件工具 (read_file, write_file, edit_file)
- 这些工具功能更完善,支持diff预览、智能匹配、分页读取等
- """
- import logging
- import sys
- import os
- import json
- import httpx
- import asyncio
- import aiohttp
- import re
- import base64
- from urllib.parse import urlparse, parse_qs, unquote
- from typing import Optional, List, Dict, Any, Tuple, Union
- from pathlib import Path
- from langchain_core.runnables import RunnableLambda
- from argparse import Namespace # 使用 Namespace 快速构造带属性的对象
- from langchain_core.messages import AIMessage
- from ....llm.openrouter import openrouter_llm_call
- # 将项目根目录添加到 Python 路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- # 配置日志
- logger = logging.getLogger(__name__)
- # 导入框架的工具装饰器和结果类
- from agent.tools import tool, ToolResult
- from agent.tools.builtin.browser.sync_mysql_help import mysql
- # 导入 browser-use 的核心类
- from browser_use import BrowserSession, BrowserProfile
- from browser_use.tools.service import Tools
- try:
- from browser_use.tools.views import ReadContentAction # type: ignore
- except Exception:
- from pydantic import BaseModel
- class ReadContentAction(BaseModel):
- goal: str
- source: str = "page"
- context: str = ""
- from browser_use.agent.views import ActionResult
- from browser_use.filesystem.file_system import FileSystem
- # ============================================================
- # 无需注册的内部辅助函数
- # ============================================================
- # ============================================================
- # 全局浏览器会话管理
- # ============================================================
- # Module-level singletons shared by the tools in this file. All three start as
- # None; init_browser_session() assigns them, and cleanup_browser_session() /
- # kill_browser_session() set them back to None.
- _browser_session: Optional[BrowserSession] = None
- _browser_tools: Optional[Tools] = None
- _file_system: Optional[FileSystem] = None
async def create_container(url: str, account_name: str = "liuwenwu") -> Dict[str, Any]:
    """Create a remote browser container and open ``url`` inside it.

    Two HTTP calls are made against the container service:

    1. ``POST /api/v1/container/create`` allocates the container.
    2. ``POST /api/v1/browser/page/create`` opens a page on ``url``; this call
       is attempted up to three times, with a 3 second pause between attempts.

    Failures never propagate to the caller: every exception is caught, printed
    and reported through the ``error`` field of the returned dictionary.

    Args:
        url: Address the new page should navigate to.
        account_name: Account name forwarded to the page-creation endpoint.

    Returns:
        A dictionary with these keys:

        - ``success``: True only when both the container and the page exist.
        - ``container_id``: Identifier returned by the create call.
        - ``vnc``: VNC URL returned by the create call.
        - ``cdp``: CDP URL returned by the create call.
        - ``connection_id``: Identifier returned by the page-creation call.
        - ``error``: Error text when ``success`` is False, otherwise None.
    """
    api_base = "http://47.84.182.56:8200"
    result: Dict[str, Any] = {
        "success": False,
        "container_id": None,
        "vnc": None,
        "cdp": None,
        "connection_id": None,
        "error": None,
    }
    try:
        async with aiohttp.ClientSession() as session:
            # Step 1.1: create the container.
            print("📦 步骤1.1: 创建容器...")
            create_payload = {
                "auto_remove": True,
                "need_port_binding": True,
                "max_lifetime_seconds": 900,
            }
            async with session.post(
                f"{api_base}/api/v1/container/create", json=create_payload
            ) as resp:
                if resp.status != 200:
                    raise RuntimeError(f"创建容器失败: HTTP {resp.status}")
                create_result = await resp.json()
            if create_result.get("code") != 0:
                raise RuntimeError(f"创建容器失败: {create_result.get('msg')}")

            # "data" can be present but null; treat that like a missing payload
            # instead of failing later with an opaque AttributeError.
            data = create_result.get("data") or {}
            result["container_id"] = data.get("container_id")
            result["vnc"] = data.get("vnc")
            result["cdp"] = data.get("cdp")
            # The container id is needed for the page request below and the CDP
            # URL is what the caller connects to, so a reply lacking either one
            # must not be reported as a success.
            if not result["container_id"] or not result["cdp"]:
                raise RuntimeError("创建容器失败: 响应缺少 container_id 或 cdp")
            print("✅ 容器创建成功")
            print(f" Container ID: {result['container_id']}")
            print(f" VNC: {result['vnc']}")
            print(f" CDP: {result['cdp']}")

            # Give the browser inside the container time to come up.
            print("\n⏳ 等待容器内浏览器启动...")
            await asyncio.sleep(5)

            # Step 1.2: create a page and navigate to the URL.
            print(f"\n📱 步骤1.2: 创建页面并导航到 {url}...")
            page_create_url = f"{api_base}/api/v1/browser/page/create"
            page_payload = {
                "container_id": result["container_id"],
                "url": url,
                "account_name": account_name,
                "need_wait": True,
                "timeout": 30,
            }
            request_timeout = aiohttp.ClientTimeout(total=60)

            max_retries = 3
            page_created = False
            last_error = None
            for attempt in range(max_retries):
                try:
                    if attempt > 0:
                        print(f" 重试 {attempt + 1}/{max_retries}...")
                        await asyncio.sleep(3)  # pause before retrying
                    async with session.post(
                        page_create_url, json=page_payload, timeout=request_timeout
                    ) as resp:
                        if resp.status != 200:
                            response_text = await resp.text()
                            last_error = f"HTTP {resp.status}: {response_text[:200]}"
                            continue
                        page_result = await resp.json()
                    if page_result.get("code") != 0:
                        last_error = f"{page_result.get('msg')}"
                        continue
                    page_data = page_result.get("data") or {}
                    result["connection_id"] = page_data.get("connection_id")
                    result["success"] = True
                    page_created = True
                    print("✅ 页面创建成功")
                    print(f" Connection ID: {result['connection_id']}")
                    break
                except asyncio.TimeoutError:
                    last_error = "请求超时"
                except aiohttp.ClientError as e:
                    last_error = f"网络错误: {str(e)}"
                except Exception as e:
                    last_error = f"未知错误: {str(e)}"
            if not page_created:
                raise RuntimeError(f"创建页面失败(尝试{max_retries}次后): {last_error}")
    except Exception as e:
        # Report the failure through the result dict rather than raising.
        result["error"] = str(e)
        print(f"❌ 错误: {str(e)}")
    return result
async def init_browser_session(
    browser_type: str = "local",
    # TEMPORARY FIX (2026-03-02): default switched to True to work around a CDP
    # connection timing problem. In non-headless mode browser-use sometimes
    # tries to connect to CDP before Chrome has fully started, which fails with
    # "JSONDecodeError: Expecting value".
    # TODO: restore ``headless: bool = False`` (or drop this note) once
    # browser-use fixes the issue.
    headless: bool = True,  # previous default: False
    url: Optional[str] = None,
    profile_name: str = "default",
    user_data_dir: Optional[str] = None,
    browser_profile: Optional[BrowserProfile] = None,
    **kwargs
) -> tuple[BrowserSession, Tools]:
    """Create the shared browser session, or return the existing one.

    If a session already exists it is returned unchanged and every argument is
    ignored. Otherwise a ``BrowserSession`` is built for the requested
    ``browser_type``, started, and stored in the module globals together with
    a ``Tools`` instance and a ``FileSystem`` rooted at
    ``.cache/.browser_use_files`` under the current working directory.

    Args:
        browser_type: One of ``"local"``, ``"cloud"`` or ``"container"``.
        headless: Forwarded to ``BrowserSession`` as ``headless``.
        url: Page to open. In container mode it is handed to
            ``create_container`` (defaulting to ``about:blank``); in local and
            cloud mode it is navigated to after the session has started.
        profile_name: Local mode: directory name under
            ``~/.browser_use/profiles`` when ``user_data_dir`` is not given.
            Cloud mode: used as ``cloud_profile_id`` unless it is
            ``"default"``. Container mode: passed as the account name.
        user_data_dir: Explicit profile directory for local mode.
        browser_profile: Optional profile forwarded to ``BrowserSession``.
        **kwargs: Extra ``BrowserSession`` arguments; they override the values
            computed here.

    Returns:
        The ``(BrowserSession, Tools)`` pair now stored in the module globals.

    Raises:
        ValueError: If ``browser_type`` is not a supported value.
        RuntimeError: If container creation fails.
    """
    global _browser_session, _browser_tools, _file_system
    if _browser_session is not None:
        return _browser_session, _browser_tools

    valid_types = ("local", "cloud", "container")
    if browser_type not in valid_types:
        raise ValueError(f"无效的 browser_type: {browser_type}")

    # Single local directory used both for downloads and for the FileSystem.
    save_dir = Path.cwd() / ".cache/.browser_use_files"
    save_dir.mkdir(parents=True, exist_ok=True)

    session_params: Dict[str, Any] = {
        "headless": headless,
        # Passed to BrowserSession so downloads are written to save_dir.
        "downloads_path": str(save_dir),
    }

    if browser_type == "container":
        print("🐳 使用容器浏览器模式")
        if not url:
            url = "about:blank"
        container_info = await create_container(url=url, account_name=profile_name)
        if not container_info["success"]:
            raise RuntimeError(f"容器创建失败: {container_info['error']}")
        session_params["cdp_url"] = container_info["cdp"]
        await asyncio.sleep(3)
    elif browser_type == "cloud":
        print("🌐 使用云浏览器模式")
        session_params["use_cloud"] = True
        if profile_name and profile_name != "default":
            session_params["cloud_profile_id"] = profile_name
    else:  # local
        print("💻 使用本地浏览器模式")
        session_params["is_local"] = True
        if user_data_dir is None and profile_name:
            user_data_dir = str(Path.home() / ".browser_use" / "profiles" / profile_name)
        # With neither an explicit directory nor a profile name there is
        # nothing to create; Path(None) would raise TypeError.
        if user_data_dir is not None:
            Path(user_data_dir).mkdir(parents=True, exist_ok=True)
            session_params["user_data_dir"] = user_data_dir

        # On macOS use the Chrome install at its standard location, if present.
        import platform
        if platform.system() == "Darwin":
            chrome_path = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
            if Path(chrome_path).exists():
                session_params["executable_path"] = chrome_path

    if browser_profile:
        session_params["browser_profile"] = browser_profile
    session_params.update(kwargs)

    _browser_session = BrowserSession(**session_params)
    try:
        # Short delay intended to let the Chrome CDP endpoint become ready.
        await asyncio.sleep(1)
        await _browser_session.start()
        _browser_tools = Tools()
        _file_system = FileSystem(base_dir=str(save_dir))
    except BaseException:
        # Roll the globals back so a failed (or cancelled) start does not leave
        # a half-initialised session that later calls would return as valid.
        _browser_session = None
        _browser_tools = None
        _file_system = None
        raise

    print(f"✅ 浏览器会话初始化成功 | 默认下载路径: {save_dir}")
    if browser_type in ("local", "cloud") and url:
        await _browser_tools.navigate(url=url, browser_session=_browser_session)
    return _browser_session, _browser_tools
async def get_browser_session() -> tuple[BrowserSession, Tools]:
    """Return the shared browser session, re-creating it when needed.

    An existing session is probed with a trivial CDP ``Runtime.evaluate`` call
    (3 second timeout). If the probe fails the session is cleaned up, and a
    fresh one is created by calling ``init_browser_session()`` with its
    default arguments.

    Returns:
        The ``(BrowserSession, Tools)`` pair held in the module globals.
    """
    global _browser_session, _browser_tools, _file_system

    async def _responds(session) -> bool:
        # The session object can outlive its WebSocket: after runner.stop()
        # pauses and the user stays in the menu for a while, the connection may
        # time out, and later operations would raise ConnectionClosedError.
        # Any failure while probing is therefore treated as "disconnected".
        try:
            root_client = getattr(session, '_cdp_client_root', None)
            manager = getattr(session, 'session_manager', None)
            if root_client is None or manager is None:
                return False
            cdp = await session.get_or_create_cdp_session()
            await asyncio.wait_for(
                cdp.cdp_client.send.Runtime.evaluate(
                    params={'expression': '1+1'},
                    session_id=cdp.session_id,
                ),
                timeout=3.0,
            )
        except Exception:
            return False
        return True

    if _browser_session is not None and not await _responds(_browser_session):
        print("⚠️ 浏览器会话连接已断开,正在重新初始化...")
        try:
            await cleanup_browser_session()
        except Exception:
            # Cleanup itself failed; drop the references so a new session is
            # created below.
            _browser_session = None
            _browser_tools = None
            _file_system = None

    if _browser_session is None:
        await init_browser_session()
    return _browser_session, _browser_tools
async def cleanup_browser_session():
    """Stop the shared browser session and clear the module globals.

    Calls ``stop()`` on the current session, if there is one. The globals are
    reset even when ``stop()`` raises, so a failed shutdown never leaves a
    stale session behind; the exception itself still propagates.
    """
    global _browser_session, _browser_tools, _file_system
    if _browser_session is not None:
        try:
            await _browser_session.stop()
        finally:
            _browser_session = None
            _browser_tools = None
            _file_system = None
async def kill_browser_session():
    """Kill the shared browser session and clear the module globals.

    Calls ``kill()`` on the current session, if there is one. The globals are
    reset even when ``kill()`` raises, so a failed shutdown never leaves a
    stale session behind; the exception itself still propagates.
    """
    global _browser_session, _browser_tools, _file_system
    if _browser_session is not None:
        try:
            await _browser_session.kill()
        finally:
            _browser_session = None
            _browser_tools = None
            _file_system = None
- # ============================================================
- # 辅助函数:ActionResult 转 ToolResult
- # ============================================================
def action_result_to_tool_result(result: ActionResult, title: str = None) -> ToolResult:
    """Convert a browser-use ``ActionResult`` into the framework's ``ToolResult``.

    Args:
        result: The ``ActionResult`` to convert.
        title: Optional title; when empty a generic success/failure title is
            used instead.

    Returns:
        A ``ToolResult``. When ``result.error`` is set, the error is carried
        over with an empty output. Otherwise the extracted content becomes the
        output and the metadata is copied.
    """
    failure = result.error
    if not failure:
        content = result.extracted_content
        return ToolResult(
            title=title or "操作成功",
            output=content or "",
            long_term_memory=result.long_term_memory or content or "",
            metadata=result.metadata or {},
        )

    return ToolResult(
        title=title or "操作失败",
        output="",
        error=failure,
        long_term_memory=result.long_term_memory or failure,
    )
- def _cookie_domain_for_type(cookie_type: str, url: str) -> Tuple[str, str]:
- if cookie_type:
- key = cookie_type.lower()
- if key in {"xiaohongshu", "xhs"}:
- return ".xiaohongshu.com", "https://www.xiaohongshu.com"
- parsed = urlparse(url or "")
- domain = parsed.netloc or ""
- domain = domain.replace("www.", "")
- if domain:
- domain = f".{domain}"
- base_url = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme and parsed.netloc else url
- return domain, base_url
- def _parse_cookie_string(cookie_str: str, domain: str, url: str) -> List[Dict[str, Any]]:
- cookies: List[Dict[str, Any]] = []
- if not cookie_str:
- return cookies
- parts = cookie_str.split(";")
- for part in parts:
- if not part:
- continue
- if "=" not in part:
- continue
- name, value = part.split("=", 1)
- cookie = {
- "name": str(name).strip(),
- "value": str(value).strip(),
- "domain": domain,
- "path": "/",
- "expires": -1,
- "httpOnly": False,
- "secure": True,
- "sameSite": "None"
- }
- if url:
- cookie["url"] = url
- cookies.append(cookie)
- return cookies
- def _normalize_cookies(cookie_value: Any, domain: str, url: str) -> List[Dict[str, Any]]:
- if cookie_value is None:
- return []
- if isinstance(cookie_value, list):
- return cookie_value
- if isinstance(cookie_value, dict):
- if "cookies" in cookie_value:
- return _normalize_cookies(cookie_value.get("cookies"), domain, url)
- if "name" in cookie_value and "value" in cookie_value:
- return [cookie_value]
- return []
- if isinstance(cookie_value, (bytes, bytearray)):
- cookie_value = cookie_value.decode("utf-8", errors="ignore")
- if isinstance(cookie_value, str):
- text = cookie_value.strip()
- if not text:
- return []
- try:
- parsed = json.loads(text)
- except Exception:
- parsed = None
- if parsed is not None:
- return _normalize_cookies(parsed, domain, url)
- return _parse_cookie_string(text, domain, url)
- return []
- def _extract_cookie_value(row: Optional[Dict[str, Any]]) -> Any:
- if not row:
- return None
- # 优先使用 cookies 字段
- if "cookies" in row:
- return row["cookies"]
- # 兼容其他可能的字段名
- for key, value in row.items():
- if "cookie" in key.lower():
- return value
- return None
- def _fetch_cookie_row(cookie_type: str) -> Optional[Dict[str, Any]]:
- if not cookie_type:
- return None
- try:
- return mysql.fetchone(
- "select * from agent_channel_cookies where type=%s limit 1",
- (cookie_type,)
- )
- except Exception:
- return None
- def _fetch_profile_id(cookie_type: str) -> Optional[str]:
- """从数据库获取 cloud_profile_id"""
- if not cookie_type:
- return None
- try:
- row = mysql.fetchone(
- "select profileId from agent_channel_cookies where type=%s limit 1",
- (cookie_type,)
- )
- if row and "profileId" in row:
- return row["profileId"]
- return None
- except Exception:
- return None
- # ============================================================
- # 需要注册的工具
- # ============================================================
- # ============================================================
- # 导航类工具 (Navigation Tools)
- # ============================================================
@tool()
async def browser_navigate_to_url(url: str, new_tab: bool = False) -> ToolResult:
    """Navigate the browser to a URL.

    Delegates to the browser-use ``navigate`` tool on the shared session.

    Args:
        url: Address to open.
        new_tab: Open the address in a new tab instead of the current one
            (default False).

    Returns:
        ToolResult: The converted navigation result, or a failure result
        carrying the error text when anything raises.

    Example:
        navigate_to_url("https://www.baidu.com")
        navigate_to_url("https://www.google.com", new_tab=True)
    """
    try:
        session, toolkit = await get_browser_session()
        outcome = await toolkit.navigate(url=url, new_tab=new_tab, browser_session=session)
        return action_result_to_tool_result(outcome, f"导航到 {url}")
    except Exception as exc:
        failure_text = f"Failed to navigate to {url}: {str(exc)}"
        return ToolResult(
            title="导航失败",
            output="",
            error=failure_text,
            long_term_memory=f"导航到 {url} 失败",
        )
@tool()
async def browser_search_web(query: str, engine: str = "bing") -> ToolResult:
    """Search the web with a search engine.

    Delegates to the browser-use ``search`` tool on the shared session.

    Args:
        query: Search keywords.
        engine: Search engine name (google, duckduckgo, bing). Defaults to
            ``"bing"``.

    Returns:
        ToolResult: The converted search result, or a failure result carrying
        the error text when anything raises.

    Example:
        search_web("Python async programming", engine="google")
    """
    try:
        session, toolkit = await get_browser_session()
        outcome = await toolkit.search(query=query, engine=engine, browser_session=session)
        return action_result_to_tool_result(outcome, f"搜索: {query}")
    except Exception as exc:
        failure_text = f"Search failed: {str(exc)}"
        return ToolResult(
            title="搜索失败",
            output="",
            error=failure_text,
            long_term_memory=f"搜索 '{query}' 失败",
        )
@tool()
async def browser_go_back() -> ToolResult:
    """Go back to the previous page.

    Delegates to the browser-use ``go_back`` tool on the shared session, the
    equivalent of the browser's "back" button.

    Returns:
        ToolResult: The converted result, or a failure result carrying the
        error text when anything raises.
    """
    try:
        session, toolkit = await get_browser_session()
        outcome = await toolkit.go_back(browser_session=session)
        return action_result_to_tool_result(outcome, "返回上一页")
    except Exception as exc:
        failure_text = f"Failed to go back: {str(exc)}"
        return ToolResult(
            title="返回失败",
            output="",
            error=failure_text,
            long_term_memory="返回上一页失败",
        )
@tool()
async def browser_wait(seconds: int = 3) -> ToolResult:
    """Wait for a number of seconds.

    Useful while a page loads, an animation finishes or other asynchronous
    work completes. Delegates to the browser-use ``wait`` tool.

    Args:
        seconds: How long to wait, in seconds. The intended maximum is 30;
            this wrapper does not enforce it itself.

    Returns:
        ToolResult: The converted result, or a failure result carrying the
        error text when anything raises.

    Example:
        wait(5)  # wait five seconds
    """
    try:
        session, toolkit = await get_browser_session()
        outcome = await toolkit.wait(seconds=seconds, browser_session=session)
        return action_result_to_tool_result(outcome, f"等待 {seconds} 秒")
    except Exception as exc:
        failure_text = f"Failed to wait: {str(exc)}"
        return ToolResult(
            title="等待失败",
            output="",
            error=failure_text,
            long_term_memory="等待失败",
        )
- # ============================================================
- # 元素交互工具 (Element Interaction Tools)
- # ============================================================
- # 定义一个专门捕获下载链接的 Handler
class DownloadLinkCaptureHandler(logging.Handler):
    """Logging handler that remembers the first complete download URL it sees.

    Only log messages containing one of the trigger substrings are inspected.
    Within such a message, the first ``http(s)://`` URL that does not contain
    ``...`` (the marker of a truncated link) is stored in ``captured_url``.
    Once a URL has been captured, later records are ignored.
    """

    # A URL is taken to be any run of non-whitespace after the scheme.
    _URL_PATTERN = re.compile(r"https?://\S+")
    # Substrings that mark a log message as download-related.
    _TRIGGERS = ("redirection?filename=", "Failed to download")

    def __init__(self):
        super().__init__()
        # The captured URL, or None until one has been seen.
        self.captured_url = None

    def emit(self, record):
        """Inspect ``record`` and capture a complete download URL, if any."""
        if self.captured_url:
            return
        try:
            message = record.getMessage()
        except Exception:
            # A record with broken format arguments must not raise into the
            # code that issued the log call.
            self.handleError(record)
            return
        if not any(marker in message for marker in self._TRIGGERS):
            return
        # Check every URL in the message: a truncated link may be followed by
        # the complete one.
        for match in self._URL_PATTERN.finditer(message):
            candidate = match.group(0)
            if "..." not in candidate:
                self.captured_url = candidate
                return
@tool()
async def browser_download_direct_url(url: str, save_name: str = "book.epub") -> ToolResult:
    """Download a file from a direct URL into the browser file directory.

    The file is fetched with httpx (following redirects) and written to
    ``.cache/.browser_use_files/<save_name>`` under the current working
    directory. The URL's own origin is sent as ``Referer``.

    Args:
        url: Direct download link.
        save_name: Target file name. Only its final path component is used, so
            the file always lands inside the download directory. When empty,
            the name is taken from the URL path (percent-decoded), falling
            back to ``download_<unix time>``.

    Returns:
        ToolResult: On success, the saved path is in ``output`` and in
        ``metadata["path"]``. On an unexpected HTTP status or any exception, a
        result with ``error`` set is returned.
    """
    import time
    from urllib.parse import unquote, urlparse

    save_dir = Path.cwd() / ".cache/.browser_use_files"
    save_dir.mkdir(parents=True, exist_ok=True)

    # Send the URL's own origin as Referer to satisfy hotlink checks.
    parsed_url = urlparse(url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"

    if not save_name:
        # Derive the name from the URL path, decoding percent-escapes so
        # non-ASCII names are preserved.
        save_name = unquote(Path(parsed_url.path).name)
    # Keep only the last path component so a name such as "../../x" cannot
    # escape save_dir.
    safe_name = Path(save_name).name if save_name else ""
    if safe_name in {"", ".", ".."}:
        safe_name = f"download_{int(time.time())}"
    target_path = save_dir / safe_name

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "*/*",
        "Referer": base_url,
        "Range": "bytes=0-",
    }
    mebibyte = 1024 * 1024
    try:
        print(f"🚀 开始下载: {url[:60]}...")

        async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=60.0) as client:
            async with client.stream("GET", url) as response:
                # Because a Range header is sent, servers that honour it answer
                # 206 Partial Content; with "bytes=0-" that is the whole file.
                if response.status_code not in (200, 206):
                    error_msg = f"❌ 下载失败,HTTP 状态码: {response.status_code}"
                    print(error_msg)
                    return ToolResult(
                        title="下载失败",
                        output="",
                        error=error_msg,
                        long_term_memory=error_msg
                    )

                downloaded_bytes = 0
                next_report = mebibyte
                with open(target_path, "wb") as f:
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)
                        downloaded_bytes += len(chunk)
                        # Chunk sizes are arbitrary, so report whenever a MiB
                        # boundary is crossed rather than on exact multiples.
                        if downloaded_bytes >= next_report:
                            print(f"📥 已下载: {downloaded_bytes // mebibyte} MB")
                            next_report = (downloaded_bytes // mebibyte + 1) * mebibyte

        success_msg = f"✅ 下载完成!文件已存至: {target_path}"
        print(success_msg)
        return ToolResult(
            title="直链下载成功",
            output=success_msg,
            long_term_memory=success_msg,
            metadata={"path": str(target_path)}
        )
    except Exception as e:
        return ToolResult(
            title="下载异常",
            output="",
            error=f"💥 发生错误: {str(e)}",
            long_term_memory=f"下载任务由于异常中断: {str(e)}"
        )
-
@tool()
async def browser_click_element(index: int) -> ToolResult:
    """Click a page element and report any download link seen in the logs.

    While the click runs, a ``DownloadLinkCaptureHandler`` is attached to the
    ``browser_use`` logger. If it captures a URL, the URL is appended to the
    result's output and long-term memory so the caller can download it
    directly.

    Args:
        index: Index of the element to click.

    Returns:
        ToolResult: The converted click result (possibly extended with the
        captured link), or a failure result when anything raises.
    """
    # Attach the capturing handler to the whole browser_use logger namespace.
    link_sniffer = DownloadLinkCaptureHandler()
    browser_use_logger = logging.getLogger("browser_use")
    browser_use_logger.addHandler(link_sniffer)

    try:
        session, toolkit = await get_browser_session()
        outcome = await toolkit.click(index=index, browser_session=session)
        tool_result = action_result_to_tool_result(outcome, f"点击元素 {index}")

        captured_url = link_sniffer.captured_url
        if captured_url:
            # Put the link into the output so it is visible to the caller.
            download_msg = f"\n\n⚠️ 系统检测到浏览器下载被拦截,已自动捕获准确直链:\n{captured_url}\n\n建议:你可以直接使用 browser_download_direct_url 工具下载此链接。"
            tool_result.output = (tool_result.output or "") + download_msg
            tool_result.long_term_memory = (tool_result.long_term_memory or "") + f" 捕获下载链接: {captured_url}"
        return tool_result
    except Exception as exc:
        return ToolResult(
            title="点击失败",
            output="",
            error=f"Failed to click element {index}: {str(exc)}",
            long_term_memory=f"点击元素 {index} 失败",
        )
    finally:
        # Always detach the handler so it does not accumulate on the logger.
        browser_use_logger.removeHandler(link_sniffer)
@tool()
async def browser_input_text(index: int, text: str, clear: bool = True) -> ToolResult:
    """Type text into an element.

    Delegates to the browser-use ``input`` tool on the shared session.

    Args:
        index: Element index (taken from the browser state).
        text: Text to enter.
        clear: Clear the existing text first (default True).

    Returns:
        ToolResult: The converted result, or a failure result carrying the
        error text when anything raises.

    Example:
        input_text(index=0, text="Hello World", clear=True)
    """
    try:
        session, toolkit = await get_browser_session()
        outcome = await toolkit.input(index=index, text=text, clear=clear, browser_session=session)
        return action_result_to_tool_result(outcome, f"输入文本到元素 {index}")
    except Exception as exc:
        failure_text = f"Failed to input text into element {index}: {str(exc)}"
        return ToolResult(
            title="输入失败",
            output="",
            error=failure_text,
            long_term_memory="输入文本失败",
        )
@tool()
async def browser_send_keys(keys: str) -> ToolResult:
    """Send keyboard keys or shortcuts.

    Delegates to the browser-use ``send_keys`` tool on the shared session.

    Args:
        keys: Key string to send, for example:
            - single keys: "Enter", "Escape", "PageDown", "Tab"
            - combinations: "Control+o", "Shift+Tab", "Alt+F4"
            - function keys: "F1", "F2", ..., "F12"

    Returns:
        ToolResult: The converted result, or a failure result carrying the
        error text when anything raises.

    Example:
        send_keys("Enter")
        send_keys("Control+A")
    """
    try:
        session, toolkit = await get_browser_session()
        outcome = await toolkit.send_keys(keys=keys, browser_session=session)
        return action_result_to_tool_result(outcome, f"发送按键: {keys}")
    except Exception as exc:
        failure_text = f"Failed to send keys: {str(exc)}"
        return ToolResult(
            title="发送按键失败",
            output="",
            error=failure_text,
            long_term_memory="发送按键失败",
        )
@tool()
async def browser_upload_file(index: int, path: str) -> ToolResult:
    """Upload a file to a file input element.

    Delegates to the browser-use ``upload_file`` tool, passing ``path`` as the
    only available file path together with the module-level file system.

    Args:
        index: Element index of the file input.
        path: Path of the file to upload (absolute path).

    Returns:
        ToolResult: The converted result, or a failure result carrying the
        error text when anything raises.

    Example:
        upload_file(index=7, path="/path/to/file.pdf")

    Note:
        The file must exist and the path must be absolute.
    """
    try:
        session, toolkit = await get_browser_session()
        upload_args = {
            "index": index,
            "path": path,
            "browser_session": session,
            "available_file_paths": [path],
            "file_system": _file_system,
        }
        outcome = await toolkit.upload_file(**upload_args)
        return action_result_to_tool_result(outcome, f"上传文件: {path}")
    except Exception as exc:
        failure_text = f"Failed to upload file: {str(exc)}"
        return ToolResult(
            title="上传失败",
            output="",
            error=failure_text,
            long_term_memory=f"上传文件 {path} 失败",
        )
- # ============================================================
- # 滚动和视图工具 (Scroll & View Tools)
- # ============================================================
@tool()
async def browser_scroll_page(down: bool = True, pages: float = 1.0, index: Optional[int] = None) -> ToolResult:
    """Scroll the page and verify that it actually moved.

    ``window.scrollY`` is read over CDP before and after the scroll. When no
    ``index`` is given and the position has not changed after two checks (two
    seconds apart), a "no movement" failure result is returned.

    Args:
        down: Scroll down when True, up when False.
        pages: Number of pages to scroll; values above 10 are reduced to 10.
        index: Optional element index forwarded to the scroll tool. When set,
            the movement check is skipped.

    Returns:
        ToolResult: The converted scroll result titled with the scrolled pixel
        distance, a "no movement" failure, or a failure result when anything
        raises.
    """
    try:
        # Cap a single scroll so one call cannot jump a huge distance.
        page_cap = 10
        pages = page_cap if pages > page_cap else pages

        session, toolkit = await get_browser_session()
        cdp = await session.get_or_create_cdp_session()

        async def current_scroll_y():
            reply = await cdp.cdp_client.send.Runtime.evaluate(
                params={'expression': 'window.scrollY'},
                session_id=cdp.session_id,
            )
            return reply.get('result', {}).get('value', 0)

        direction = "下" if down else "上"
        start_y = await current_scroll_y()

        outcome = await toolkit.scroll(down=down, pages=pages, index=index, browser_session=session)

        # Give the page time to render (lazy-loading pages need longer).
        await asyncio.sleep(2)
        end_y = await current_scroll_y()

        if index is None and start_y == end_y:
            # Nothing moved yet: wait one more round in case a lazy load
            # delays the scroll.
            await asyncio.sleep(2)
            end_y = await current_scroll_y()
            if start_y == end_y:
                return ToolResult(
                    title="滚动无效",
                    output=f"页面已到达{direction}边界,无法继续滚动",
                    error="No movement detected",
                )

        distance = abs(end_y - start_y)
        return action_result_to_tool_result(outcome, f"已向{direction}滚动 {distance}px")
    except Exception as exc:
        # ``output`` is passed explicitly even though it is empty.
        return ToolResult(
            title="滚动失败",
            output="",
            error=str(exc),
        )
@tool()
async def browser_find_text(text: str) -> ToolResult:
    """
    Find text on the current page and scroll to it.

    Args:
        text: The text to look for.

    Returns:
        ToolResult: The converted result of the find action, or a failure
        result when the action raised.

    Example:
        find_text("Privacy Policy")
    """
    try:
        session, toolkit = await get_browser_session()
        action_result = await toolkit.find_text(text=text, browser_session=session)
        return action_result_to_tool_result(action_result, f"查找文本: {text}")
    except Exception as exc:
        failure_note = f"查找文本 '{text}' 失败"
        return ToolResult(
            title="查找失败",
            output="",
            error=f"Failed to find text: {exc}",
            long_term_memory=failure_note,
        )
@tool()
async def browser_get_visual_selector_map() -> ToolResult:
    """
    Get a visual snapshot and the selector map of interactive elements.

    The tool requests a browser state that contains both the DOM and a
    screenshot, asks browser-use to draw the element indices on the
    screenshot, and builds a text listing of every element in the selector
    map (with href / type / role / name attributes when present).

    Returns:
        ToolResult: Carries the annotated screenshot in ``images`` and the
        element listing in ``output``; on failure, a result with ``error`` set.
    """
    try:
        session, _ = await get_browser_session()

        # Imported lazily, inside the try, so an import failure is reported
        # through the failure ToolResult like any other error.
        from browser_use.browser.events import BrowserStateRequestEvent
        from browser_use.browser.python_highlights import create_highlighted_screenshot_async

        # Request DOM + screenshot in one go and wait for the full state.
        state_request = BrowserStateRequestEvent(
            include_dom=True,
            include_screenshot=True,
            include_recent_events=False
        )
        pending = session.event_bus.dispatch(state_request)
        state = await pending.event_result(raise_if_none=True, raise_if_any=True)

        selector_map = state.dom_state.selector_map if state.dom_state else {}

        # Annotate the screenshot with element indices; the raw screenshot is
        # used when there is nothing to annotate or annotation fails.
        raw_shot = state.screenshot or ""
        annotated_shot = raw_shot
        if raw_shot and selector_map:
            try:
                cdp_session = await session.get_or_create_cdp_session()
                annotated_shot = await create_highlighted_screenshot_async(
                    raw_shot, selector_map,
                    cdp_session=cdp_session,
                    filter_highlight_ids=False
                )
            except Exception:
                annotated_shot = raw_shot  # fallback to raw screenshot

        # One human-readable line per element for the agent.
        listing = []
        for idx, node in selector_map.items():
            tag = node.tag_name
            attrs = node.attributes or {}
            label = (
                attrs.get('aria-label')
                or attrs.get('placeholder')
                or attrs.get('title')
                or node.get_all_children_text(max_depth=1)
                or ""
            )
            details = []
            for key in ('href', 'type', 'role', 'name'):
                value = attrs.get(key)
                if not value:
                    continue
                # Only href is truncated (to 60 characters).
                shown = value[:60] if key == 'href' else value
                details.append(f"{key}={shown}")
            suffix = f" ({', '.join(details)})" if details else ""
            listing.append(f"Index {idx}: <{tag}> \"{label[:50]}\"{suffix}")

        element_count = len(selector_map)
        output = (
            f"页面截图已捕获(含元素索引标注)\n找到 {element_count} 个交互元素\n\n"
            + "元素列表:\n"
            + "\n".join(listing)
        )

        # The annotated screenshot goes into `images`; structured data into metadata.
        images = (
            [{"type": "base64", "media_type": "image/png", "data": annotated_shot}]
            if annotated_shot
            else []
        )
        serialized_map = {k: str(v) for k, v in list(selector_map.items())[:100]}
        return ToolResult(
            title="视觉元素观察",
            output=output,
            long_term_memory=f"在页面观察到 {element_count} 个元素并保存了截图",
            images=images,
            metadata={
                "selector_map": serialized_map,
                "url": state.url,
                "title": state.title
            }
        )
    except Exception as exc:
        return ToolResult(
            title="视觉观察失败",
            output="",
            error=f"Failed to get visual selector map: {exc}",
            long_term_memory="获取视觉元素映射失败"
        )
-
@tool()
async def browser_screenshot() -> ToolResult:
    """
    Request a screenshot through the underlying browser tools.

    Useful for visually inspecting the page state and layout.

    Returns:
        ToolResult: The converted result of the screenshot action, or a
        failure result when the action raised.

    Example:
        screenshot()

    Note:
        According to the original documentation, the screenshot is included
        in the next page observation.
    """
    try:
        session, toolkit = await get_browser_session()
        action_result = await toolkit.screenshot(browser_session=session)
        return action_result_to_tool_result(action_result, "截图请求")
    except Exception as exc:
        return ToolResult(
            title="截图失败",
            output="",
            error=f"Failed to capture screenshot: {exc}",
            long_term_memory="截图失败",
        )
- # ============================================================
- # 标签页管理工具 (Tab Management Tools)
- # ============================================================
@tool()
async def browser_switch_tab(tab_id: str) -> ToolResult:
    """
    Switch to a different browser tab.

    Args:
        tab_id: Tab identifier; only its last 4 characters are passed on
            (the docs describe it as the last 4 characters of the target_id).

    Returns:
        ToolResult: The converted result of the switch action, or a failure
        result when the action raised.

    Example:
        switch_tab(tab_id="a3f2")
    """
    try:
        session, toolkit = await get_browser_session()
        # Accept longer ids by keeping only the trailing 4 characters;
        # empty / falsy ids are forwarded untouched.
        short_id = tab_id
        if tab_id:
            short_id = tab_id[-4:]
        action_result = await toolkit.switch(tab_id=short_id, browser_session=session)
        return action_result_to_tool_result(action_result, f"切换到标签页 {short_id}")
    except Exception as exc:
        return ToolResult(
            title="切换标签页失败",
            output="",
            error=f"Failed to switch tab: {exc}",
            long_term_memory=f"切换到标签页 {tab_id} 失败",
        )
@tool()
async def browser_close_tab(tab_id: str) -> ToolResult:
    """
    Close a browser tab.

    Args:
        tab_id: Tab identifier; only its last 4 characters are passed on.

    Returns:
        ToolResult: The converted result of the close action, or a failure
        result when the action raised.

    Example:
        close_tab(tab_id="a3f2")
    """
    try:
        session, toolkit = await get_browser_session()
        # Accept longer ids by keeping only the trailing 4 characters;
        # empty / falsy ids are forwarded untouched.
        short_id = tab_id
        if tab_id:
            short_id = tab_id[-4:]
        action_result = await toolkit.close(tab_id=short_id, browser_session=session)
        return action_result_to_tool_result(action_result, f"关闭标签页 {short_id}")
    except Exception as exc:
        return ToolResult(
            title="关闭标签页失败",
            output="",
            error=f"Failed to close tab: {exc}",
            long_term_memory=f"关闭标签页 {tab_id} 失败",
        )
- # ============================================================
- # 下拉框工具 (Dropdown Tools)
- # ============================================================
@tool()
async def browser_get_dropdown_options(index: int) -> ToolResult:
    """
    Get the options of a dropdown element.

    Args:
        index: Element index of the dropdown.

    Returns:
        ToolResult: The converted result of the dropdown-options action, or
        a failure result when the action raised.

    Example:
        get_dropdown_options(index=8)
    """
    try:
        session, toolkit = await get_browser_session()
        action_result = await toolkit.dropdown_options(index=index, browser_session=session)
        return action_result_to_tool_result(action_result, f"获取下拉框选项: {index}")
    except Exception as exc:
        return ToolResult(
            title="获取下拉框选项失败",
            output="",
            error=f"Failed to get dropdown options: {exc}",
            long_term_memory=f"获取下拉框 {index} 选项失败",
        )
@tool()
async def browser_select_dropdown_option(index: int, text: str) -> ToolResult:
    """
    Select an option from a dropdown.

    Args:
        index: Element index of the dropdown.
        text: Text of the option to select (documented as an exact match).

    Returns:
        ToolResult: The converted result of the select action, or a failure
        result when the action raised.

    Example:
        select_dropdown_option(index=8, text="Option 2")
    """
    try:
        session, toolkit = await get_browser_session()
        action_result = await toolkit.select_dropdown(
            index=index, text=text, browser_session=session
        )
        return action_result_to_tool_result(action_result, f"选择下拉框选项: {text}")
    except Exception as exc:
        return ToolResult(
            title="选择下拉框选项失败",
            output="",
            error=f"Failed to select dropdown option: {exc}",
            long_term_memory=f"选择选项 '{text}' 失败",
        )
- # ============================================================
- # 内容提取工具 (Content Extraction Tools)
- # ============================================================
def scrub_search_redirect_url(url: str) -> str:
    """
    Resolve search-engine redirect links (Bing, Google, generic) to their target URL.

    Three patterns are recognised, in this order:

    1. Bing: the ``u`` query parameter holds a two-character marker (e.g.
       ``a1``) followed by the target URL in unpadded URL-safe base64.
    2. Google: the ``url`` query parameter holds the target URL.
    3. Fallback: a ``target`` / ``dest`` / ``destination`` / ``link``
       parameter whose value starts with ``http``.

    Args:
        url: The URL to inspect. Falsy or non-string values are returned as-is.

    Returns:
        The extracted target URL, or the input unchanged when no redirect
        pattern matches or the URL cannot be parsed/decoded.
    """
    if not url or not isinstance(url, str):
        return url

    def _single_decoded(value: str) -> str:
        # parse_qs has already percent-decoded the value once. Decoding again
        # would corrupt targets that legitimately contain "%xx", so a second
        # pass is applied only when the value is still visibly encoded (no
        # scheme separator yet, i.e. it was double-encoded).
        return value if "://" in value else unquote(value)

    try:
        parsed = urlparse(url)
        params = parse_qs(parsed.query)

        # 1. Bing redirect, e.g. ...&u=a1aHR0cHM6Ly96aHVhbmxhbi56aGlodS5jb20vcC8zODYxMjgwOQ&...
        if "bing.com" in parsed.netloc:
            u_param = params.get('u', [None])[0]
            if u_param:
                # Drop the leading marker ('a1', 'a0', ...).
                b64_str = u_param[2:]
                # Restore only the padding that is actually missing.
                padding = '=' * (-len(b64_str) % 4)
                # The payload uses the URL-safe alphabet ('-' / '_'); the
                # standard decoder would silently drop those characters.
                decoded = base64.urlsafe_b64decode(b64_str + padding).decode('utf-8', errors='ignore')
                if decoded.startswith('http'):
                    return decoded

        # 2. Google redirect (``url`` parameter).
        if "google.com" in parsed.netloc:
            url_param = params.get('url', [None])[0]
            if url_param:
                return _single_decoded(url_param)

        # 3. Fallback: common redirect parameter names.
        for param in ('target', 'dest', 'destination', 'link'):
            found = params.get(param, [None])[0]
            if found and found.startswith('http'):
                return _single_decoded(found)

    except (ValueError, TypeError):
        # Best effort: malformed URLs or base64 payloads fall through and the
        # original link is returned (binascii.Error is a ValueError subclass).
        pass

    return url
async def extraction_adapter(input_data):
    """
    Adapt ``openrouter_llm_call`` to the extraction-LLM call shape used by the tools.

    The prompt is taken from the last item of *input_data* when it is a list
    (its ``content`` attribute if present, otherwise its string form), or from
    ``str(input_data)`` otherwise. Every URL in the model's answer is passed
    through ``scrub_search_redirect_url`` so search-engine redirect links are
    replaced by their real targets.

    Args:
        input_data: A list of message-like objects, or any other value that
            is converted to a string prompt.

    Returns:
        argparse.Namespace: An object whose ``completion`` attribute holds
        the (URL-scrubbed) response text.
    """
    from argparse import Namespace

    if isinstance(input_data, list):
        last_item = input_data[-1]
        prompt = last_item.content if hasattr(last_item, 'content') else str(last_item)
    else:
        prompt = str(input_data)

    response = await openrouter_llm_call(
        messages=[{"role": "user", "content": prompt}]
    )
    content = response["content"]

    # Rewrite each URL in a single pass, exactly where it occurs. The earlier
    # find-all + str.replace approach rewrote every occurrence of the matched
    # substring, so a URL that is a prefix of a longer URL corrupted the
    # longer one.
    content = re.sub(
        r'https?://[^\s<>"\']+',
        lambda match: scrub_search_redirect_url(match.group(0)),
        content,
    )

    return Namespace(completion=content)
@tool()
async def browser_extract_content(query: str, extract_links: bool = False,
                                  start_from_char: int = 0) -> ToolResult:
    """
    Extract content from the current page using an LLM.

    Args:
        query: What to extract (the instruction given to the LLM).
        extract_links: Whether links should be extracted as well (default False).
        start_from_char: Character offset to start extracting from, for
            paging through large content.

    Returns:
        ToolResult: The converted result of the extract action, or a failure
        result when the action raised.

    Example:
        extract_content(query="extract the name and price of every product", extract_links=True)

    Note:
        The extraction LLM passed to the underlying action is
        ``extraction_adapter`` wrapped in a ``RunnableLambda``.
    """
    try:
        session, toolkit = await get_browser_session()
        extraction_llm = RunnableLambda(extraction_adapter)
        action_result = await toolkit.extract(
            query=query,
            extract_links=extract_links,
            start_from_char=start_from_char,
            browser_session=session,
            page_extraction_llm=extraction_llm,
            file_system=_file_system,
        )
        return action_result_to_tool_result(action_result, f"提取内容: {query}")
    except Exception as exc:
        return ToolResult(
            title="内容提取失败",
            output="",
            error=f"Failed to extract content: {exc}",
            long_term_memory=f"提取内容失败: {query}",
        )
async def _detect_and_download_pdf_via_cdp(browser) -> Optional[str]:
    """
    Download the current page to disk when it is a PDF, using an in-page fetch over CDP.

    The page counts as a PDF when its URL path ends with ``.pdf`` or, failing
    that, when ``document.contentType`` contains ``pdf``. The fetch runs
    inside the page itself (presumably so it reuses the browser's
    cookies/session, as the original notes state).

    Args:
        browser: The browser session object.

    Returns:
        The local file path of the saved PDF, or None when the page is not a
        PDF or anything goes wrong (problems are printed, not raised).
    """
    try:
        page_url = await browser.get_current_page_url()
        if not page_url:
            return None

        url_parts = urlparse(page_url)
        looks_like_pdf = url_parts.path.lower().endswith('.pdf')

        # URL is not obviously a PDF: ask the page for its content type.
        if not looks_like_pdf:
            try:
                cdp = await browser.get_or_create_cdp_session()
                type_reply = await cdp.cdp_client.send.Runtime.evaluate(
                    params={'expression': 'document.contentType'},
                    session_id=cdp.session_id
                )
                reported_type = type_reply.get('result', {}).get('value', '')
                looks_like_pdf = 'pdf' in reported_type.lower()
            except Exception:
                pass

        if not looks_like_pdf:
            return None

        # Fetch the document from inside the page and hand it back as a data URL.
        fetch_script = """
        (async () => {
            try {
                const resp = await fetch(window.location.href);
                if (!resp.ok) return JSON.stringify({error: 'HTTP ' + resp.status});
                const blob = await resp.blob();
                return new Promise((resolve, reject) => {
                    const reader = new FileReader();
                    reader.onloadend = () => resolve(JSON.stringify({data: reader.result}));
                    reader.onerror = () => resolve(JSON.stringify({error: 'FileReader failed'}));
                    reader.readAsDataURL(blob);
                });
            } catch(e) {
                return JSON.stringify({error: e.message});
            }
        })()
        """
        cdp = await browser.get_or_create_cdp_session()
        fetch_reply = await cdp.cdp_client.send.Runtime.evaluate(
            params={
                'expression': fetch_script,
                'awaitPromise': True,
                'returnByValue': True,
                'timeout': 60000
            },
            session_id=cdp.session_id
        )
        payload_text = fetch_reply.get('result', {}).get('value', '')
        if not payload_text:
            print("⚠️ CDP fetch PDF: 无返回值")
            return None

        payload = json.loads(payload_text)
        if 'error' in payload:
            print(f"⚠️ CDP fetch PDF 失败: {payload['error']}")
            return None

        # The data URL looks like "data:application/pdf;base64,JVBERi0...".
        encoded_body = payload['data'].split(',', 1)[1]
        pdf_bytes = base64.b64decode(encoded_body)

        # Save under the working directory's cache folder.
        target_dir = Path.cwd() / ".cache/.browser_use_files"
        target_dir.mkdir(parents=True, exist_ok=True)

        file_name = Path(url_parts.path).name if url_parts.path else ""
        if not (file_name and file_name.lower().endswith('.pdf')):
            # No usable name in the URL: fall back to a timestamped one.
            import time
            file_name = f"downloaded_{int(time.time())}.pdf"

        target_file = target_dir / file_name
        save_path = str(target_file)
        target_file.write_bytes(pdf_bytes)
        print(f"📄 PDF 已通过 CDP 下载到: {save_path} ({len(pdf_bytes)} bytes)")
        return save_path
    except Exception as e:
        print(f"⚠️ PDF 检测/下载异常: {e}")
        return None
@tool()
async def browser_read_long_content(
    goal: Union[str, dict],
    source: str = "page",
    context: str = "",
    **kwargs
) -> ToolResult:
    """
    Read long content, with automatic handling of PDF pages.

    When ``source`` is "page" and the current page turns out to be a PDF, the
    PDF is first downloaded through CDP and the local file path is used as
    the source instead (the original notes explain that DOM extraction cannot
    read the browser's built-in PDF viewer).

    Args:
        goal: What to read for. A dict is reduced to its "mission" or "goal"
            entry, falling back to the dict's string form.
        source: "page" for the current page, or another source identifier
            forwarded to the underlying action.
        context: Optional background text; non-string values are replaced by
            an empty string.
        **kwargs: Accepted and ignored.

    Returns:
        ToolResult: The converted result of the read action, or a failure
        result when anything raised.
    """
    try:
        session, toolkit = await get_browser_session()

        # Reduce the goal to plain text (dict goals come from a goal-tree structure).
        if isinstance(goal, dict):
            goal_text = goal.get("mission") or goal.get("goal") or str(goal)
        else:
            goal_text = str(goal)

        # Drop non-string context (e.g. a dict injected by the framework).
        background = context if isinstance(context, str) else ""

        # For page sources, switch to the downloaded file when the page is a PDF.
        local_files = []
        if source.lower() == "page":
            downloaded_pdf = await _detect_and_download_pdf_via_cdp(session)
            if downloaded_pdf:
                source = downloaded_pdf
                local_files.append(downloaded_pdf)

        # Validate the parameters by building the action model, then call
        # the underlying action with its dumped fields.
        validated = ReadContentAction(
            goal=goal_text,
            source=source,
            context=background
        )
        action_result = await toolkit.read_long_content(
            **validated.model_dump(),
            browser_session=session,
            page_extraction_llm=RunnableLambda(extraction_adapter),
            available_file_paths=local_files
        )
        return action_result_to_tool_result(action_result, f"深度读取: {source}")
    except Exception as exc:
        return ToolResult(
            title="深度读取失败",
            output="",
            error=f"Read long content failed: {exc}",
            long_term_memory="参数解析或校验失败,请检查输入"
        )
@tool()
async def browser_get_page_html() -> ToolResult:
    """
    Get the full HTML of the current page.

    Returns:
        ToolResult: ``output`` holds the page title, URL and at most the
        first 10000 characters of HTML; the untruncated HTML is stored in
        ``metadata["html"]``. On failure, a result with ``error`` set.

    Example:
        get_page_html()
    """
    try:
        session, _toolkit = await get_browser_session()
        cdp = await session.get_or_create_cdp_session()

        # Page source via CDP.
        html_reply = await cdp.cdp_client.send.Runtime.evaluate(
            params={'expression': 'document.documentElement.outerHTML'},
            session_id=cdp.session_id
        )
        html = html_reply.get('result', {}).get('value', '')

        # URL and title.
        url = await session.get_current_page_url()
        title_reply = await cdp.cdp_client.send.Runtime.evaluate(
            params={'expression': 'document.title'},
            session_id=cdp.session_id
        )
        title = title_reply.get('result', {}).get('value', '')

        # Keep the textual output bounded.
        shown_html = html[:10000] + "... (truncated)" if len(html) > 10000 else html

        summary = f"获取 HTML: {url}"
        return ToolResult(
            title=summary,
            output=f"页面: {title}\nURL: {url}\n\nHTML:\n{shown_html}",
            long_term_memory=summary,
            metadata={"url": url, "title": title, "html": html}
        )
    except Exception as exc:
        return ToolResult(
            title="获取 HTML 失败",
            output="",
            error=f"Failed to get page HTML: {exc}",
            long_term_memory="获取 HTML 失败"
        )
@tool()
async def browser_get_selector_map() -> ToolResult:
    """
    Get the selector map of interactive elements on the current page.

    A fresh browser state (DOM only, no screenshot) is requested first so
    the selector map reflects the current DOM.

    Returns:
        ToolResult: ``output`` lists the first 20 elements and the total
        count; ``metadata["selector_map"]`` holds up to 100 stringified
        entries. On failure, a result with ``error`` set.

    Example:
        get_selector_map()

    Note:
        The original documentation states the returned indices can be used
        with click_element, input_text and similar actions.
    """
    try:
        session, _toolkit = await get_browser_session()

        # Dispatching a state request makes the browser rebuild the DOM state
        # (and with it the selector map) before it is read.
        from browser_use.browser.events import BrowserStateRequestEvent

        state_request = BrowserStateRequestEvent(
            include_dom=True,
            include_screenshot=False,  # no screenshot needed here
            include_recent_events=False
        )
        pending = session.event_bus.dispatch(state_request)
        state = await pending.event_result(raise_if_none=True, raise_if_any=True)

        selector_map = state.dom_state.selector_map if state.dom_state else {}
        entries = list(selector_map.items())
        total = len(selector_map)

        # Only the first 20 elements are listed in the textual output.
        preview_lines = []
        for idx, node in entries[:20]:
            tag = node.tag_name
            attrs = node.attributes or {}
            label = attrs.get('aria-label') or attrs.get('placeholder') or attrs.get('value', '')
            preview_lines.append(f"索引 {idx}: <{tag}> {label[:50]}")

        output = f"找到 {total} 个交互元素\n\n" + "\n".join(preview_lines)
        if total > 20:
            output += f"\n... 还有 {total - 20} 个元素"

        return ToolResult(
            title="获取元素映射",
            output=output,
            long_term_memory=f"获取到 {total} 个交互元素",
            metadata={"selector_map": {k: str(v) for k, v in entries[:100]}}
        )
    except Exception as exc:
        return ToolResult(
            title="获取元素映射失败",
            output="",
            error=f"Failed to get selector map: {exc}",
            long_term_memory="获取元素映射失败"
        )
- # ============================================================
- # JavaScript 执行工具 (JavaScript Tools)
- # ============================================================
@tool()
async def browser_evaluate(code: str) -> ToolResult:
    """
    Execute JavaScript code in the context of the current page.

    Args:
        code: The JavaScript source to run.

    Returns:
        ToolResult: The converted result of the evaluate action, or a
        failure result when the action raised.

    Example:
        evaluate("document.title")
        evaluate("document.querySelectorAll('a').length")

    Note:
        The original documentation states that the code can access the DOM
        and globals, that the return value is serialised to a string, and
        that results are limited to 20k characters.
    """
    try:
        session, toolkit = await get_browser_session()
        action_result = await toolkit.evaluate(code=code, browser_session=session)
        return action_result_to_tool_result(action_result, "执行 JavaScript")
    except Exception as exc:
        return ToolResult(
            title="JavaScript 执行失败",
            output="",
            error=f"Failed to execute JavaScript: {exc}",
            long_term_memory="JavaScript 执行失败",
        )
@tool()
async def browser_ensure_login_with_cookies(cookie_type: str, url: str = "https://www.xiaohongshu.com") -> ToolResult:
    """
    Check the login state of a page and inject stored cookies when a login is needed.

    The page at *url* is opened and probed with a small script that looks for
    login buttons and user/avatar elements. If the probe does not report
    ``needLogin``, the call returns immediately. Otherwise the cookies stored
    for *cookie_type* are looked up, normalised, set through CDP, and the
    page is opened again.

    Args:
        cookie_type: Key used to look up the stored cookie row.
        url: Page to check; navigation is skipped when it is empty.

    Returns:
        ToolResult: ``output`` is a JSON string with ``need_login`` and,
        when cookies were involved, ``cookies_count``. ``error`` is set when
        no cookies were found, they could not be parsed, or an exception
        was raised.
    """
    try:
        session, toolkit = await get_browser_session()

        async def visit_target():
            # Open the target page (if any) and give it two seconds to settle.
            if url:
                await toolkit.navigate(url=url, browser_session=session)
                await toolkit.wait(seconds=2, browser_session=session)

        await visit_target()

        probe_script = """
        (function() {
            const loginBtn = document.querySelector('[class*="login"]') ||
                             document.querySelector('[href*="login"]') ||
                             Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录'));
            const userInfo = document.querySelector('[class*="user"]') ||
                             document.querySelector('[class*="avatar"]');
            return {
                needLogin: !!loginBtn && !userInfo,
                hasLoginBtn: !!loginBtn,
                hasUserInfo: !!userInfo
            };
        })()
        """
        probe_result = await toolkit.evaluate(code=probe_script, browser_session=session)
        raw_status = probe_result.extracted_content

        # Strip the "Result: " prefix when the evaluate action adds one.
        if isinstance(raw_status, str) and raw_status.startswith("Result: "):
            raw_status = raw_status[8:]

        # Anything that cannot be parsed is treated as an empty status.
        login_info: Dict[str, Any] = {}
        if isinstance(raw_status, dict):
            login_info = raw_status
        elif isinstance(raw_status, str):
            try:
                login_info = json.loads(raw_status)
            except Exception:
                login_info = {}

        if not login_info.get("needLogin"):
            summary = json.dumps({"need_login": False}, ensure_ascii=False)
            return ToolResult(
                title="已登录",
                output=summary,
                long_term_memory=summary
            )

        stored_row = _fetch_cookie_row(cookie_type)
        cookie_value = _extract_cookie_value(stored_row)
        if not cookie_value:
            summary = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
            return ToolResult(
                title="未找到 cookies",
                output=summary,
                error="未找到 cookies",
                long_term_memory=summary
            )

        domain, base_url = _cookie_domain_for_type(cookie_type, url)
        cookies = _normalize_cookies(cookie_value, domain, base_url)
        if not cookies:
            summary = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
            return ToolResult(
                title="cookies 解析失败",
                output=summary,
                error="cookies 解析失败",
                long_term_memory=summary
            )

        # Inject the cookies, then reload the target so they take effect.
        await session._cdp_set_cookies(cookies)
        await visit_target()

        summary = json.dumps({"need_login": True, "cookies_count": len(cookies)}, ensure_ascii=False)
        return ToolResult(
            title="已注入 cookies",
            output=summary,
            long_term_memory=summary
        )
    except Exception as exc:
        return ToolResult(
            title="登录检查失败",
            output="",
            error=str(exc),
            long_term_memory="登录检查失败"
        )
- # ============================================================
- # 等待用户操作工具 (Wait for User Action)
- # ============================================================
@tool()
async def browser_wait_for_user_action(message: str = "Please complete the action in browser",
                                       timeout: int = 300) -> ToolResult:
    """
    Wait for the user to complete an action in the browser (e.g. a login).

    Pauses the automation, prints a prompt on the console and waits until
    the user presses ENTER or the timeout elapses.

    Args:
        message: Description of what the user is expected to do.
        timeout: Maximum time to wait in seconds (default 300, i.e. 5 minutes).

    Returns:
        ToolResult: Reports either completion or timeout; neither sets
        ``error``. A result with ``error`` is returned only when waiting
        itself failed.

    Example:
        wait_for_user_action("Please login to Xiaohongshu", timeout=180)
        wait_for_user_action("Please complete the CAPTCHA", timeout=60)

    Note:
        ``input()`` runs in the default executor. After a timeout that
        worker thread is still blocked on stdin until a line is entered;
        it cannot be cancelled from here.
    """
    try:
        import asyncio

        banner = '=' * 60
        print(f"\n{banner}")
        print("⏸️ WAITING FOR USER ACTION")
        print(banner)
        print(f"📝 {message}")
        print(f"⏱️ Timeout: {timeout} seconds")
        print("\n👉 Please complete the action in the browser window")
        print("👉 Press ENTER when done, or wait for timeout")
        print(f"{banner}\n")

        try:
            # get_running_loop() is the supported way to obtain the loop
            # from inside a coroutine (get_event_loop() is deprecated there).
            loop = asyncio.get_running_loop()
            # Whichever comes first: a line on stdin or the timeout.
            await asyncio.wait_for(
                loop.run_in_executor(None, input),
                timeout=timeout
            )
            return ToolResult(
                title="用户操作完成",
                output=f"User completed: {message}",
                long_term_memory=f"用户完成操作: {message}"
            )
        except asyncio.TimeoutError:
            return ToolResult(
                title="用户操作超时",
                output=f"Timeout waiting for: {message}",
                long_term_memory=f"等待用户操作超时: {message}"
            )
    except Exception as e:
        return ToolResult(
            title="等待用户操作失败",
            output="",
            error=f"Failed to wait for user action: {str(e)}",
            long_term_memory="等待用户操作失败"
        )
- # ============================================================
- # 任务完成工具 (Task Completion)
- # ============================================================
@tool()
async def browser_done(text: str, success: bool = True,
                       files_to_display: Optional[List[str]] = None) -> ToolResult:
    """
    Mark the task as complete and return the final message to the user.

    Args:
        text: Final message for the user.
        success: Whether the task finished successfully.
        files_to_display: Optional list of file paths to display.

    Returns:
        ToolResult: The converted result of the done action, or a failure
        result when the action raised.

    Example:
        done("Task finished, extracted 10 products", success=True)
    """
    try:
        # The session is still fetched (this initialises/validates it) even
        # though only the tools object is used below.
        _session, toolkit = await get_browser_session()
        action_result = await toolkit.done(
            text=text,
            success=success,
            files_to_display=files_to_display,
            file_system=_file_system,
        )
        return action_result_to_tool_result(action_result, "任务完成")
    except Exception as exc:
        return ToolResult(
            title="标记任务完成失败",
            output="",
            error=f"Failed to complete task: {exc}",
            long_term_memory="标记任务完成失败",
        )
- # ============================================================
- # Cookie 持久化工具
- # ============================================================
# Directory holding exported cookie files: five levels above this file, under .cache/.cookies.
_COOKIES_DIR = Path(__file__).parent.parent.parent.parent.parent / ".cache/.cookies"


def _cookie_belongs_to_host(cookie_domain: str, host: str) -> bool:
    """Return True when the two names are equal or one is a dot-separated parent domain of the other."""
    cookie_domain = (cookie_domain or "").lstrip(".").lower()
    host = (host or "").lower()
    if not cookie_domain or not host:
        return False
    return (
        cookie_domain == host
        or host.endswith("." + cookie_domain)   # cookie set on a parent domain
        or cookie_domain.endswith("." + host)   # cookie set on a subdomain of the site
    )


@tool()
async def browser_export_cookies(name: str = "", account: str = "") -> ToolResult:
    """
    Export the browser's cookies for the current site to a local JSON file.

    Cookies are read through CDP, filtered to those belonging to the current
    page's site, and written to ``_COOKIES_DIR / f"{name}.json"``. Without an
    explicit *name* the file is named ``{domain}_{account}`` (or just
    ``{domain}`` when no account is given), e.g. ``bilibili.com_zhangsan``.
    Call this after a successful login; ``browser_load_cookies`` can restore
    the session later.

    Args:
        name: Custom file name without extension (optional; overrides the
            automatic name).
        account: Account label used to tell apart several accounts on the
            same site (optional).

    Returns:
        ToolResult: Reports how many cookies were saved, that there were no
        cookies, or the error that occurred.
    """
    try:
        browser, _ = await get_browser_session()

        # All cookies, in CDP format.
        all_cookies = await browser._cdp_get_cookies()
        if not all_cookies:
            return ToolResult(title="Cookie 导出", output="当前浏览器没有 Cookie", long_term_memory="无 Cookie 可导出")

        from urllib.parse import urlparse
        current_url = await browser.get_current_page_url() or ''
        parsed_url = urlparse(current_url)

        # File naming is unchanged (netloc based) so existing files and the
        # lookup in browser_load_cookies keep matching.
        domain = parsed_url.netloc.replace("www.", "") or "default"
        if not name:
            name = f"{domain}_{account}" if account else domain

        # Filtering uses the bare host name (no port, no leading "www.") and
        # matches on label boundaries. A substring test would drop cookies set
        # on a parent domain when the page is on a subdomain, and would keep
        # cookies of look-alike domains such as "notexample.com".
        host = parsed_url.hostname or ""
        if host.startswith("www."):
            host = host[4:]
        cookies = [c for c in all_cookies if _cookie_belongs_to_host(c.get("domain", ""), host)]

        _COOKIES_DIR.mkdir(parents=True, exist_ok=True)
        cookie_file = _COOKIES_DIR / f"{name}.json"
        cookie_file.write_text(json.dumps(cookies, ensure_ascii=False, indent=2), encoding="utf-8")
        return ToolResult(
            title="Cookie 已导出",
            output=f"已保存 {len(cookies)} 条 Cookie 到 .cookies/{name}.json(从 {len(all_cookies)} 条中过滤当前域名)",
            long_term_memory=f"导出 {len(cookies)} 条 Cookie 到 .cookies/{name}.json"
        )
    except Exception as e:
        return ToolResult(title="Cookie 导出失败", output="", error=str(e), long_term_memory="导出 Cookie 失败")
@tool()
async def browser_load_cookies(url: str, name: str = "", auto_navigate: bool = True) -> ToolResult:
    """
    Inject locally stored cookies for a URL and navigate to it to restore a login.

    The cookie file is either ``{name}.json`` or, when no name is given,
    found from the URL's domain by trying in order: an exact
    ``{domain}.json``, files starting with the domain, and files starting
    with the first label of the domain. When several files match, the
    alphabetically first one is used. This tool navigates by itself; no
    prior navigation call is needed.

    Args:
        url: Target URL (required; also used to find the cookie file).
            ``https://`` is prepended when it does not start with "http".
        name: Cookie file name without extension (optional).
        auto_navigate: When no cookie file is found, whether to navigate to
            the target page anyway (default True).

    Returns:
        ToolResult: Describes which cookies were injected, or that no cookie
        file was found (not reported as an error), or the error that occurred.
    """
    try:
        browser, tools = await get_browser_session()
        if not url.startswith("http"):
            url = f"https://{url}"

        async def open_target(wait_seconds):
            # Navigate to the (already normalised) target URL and let it settle.
            await tools.navigate(url=url, browser_session=browser)
            await tools.wait(seconds=wait_seconds, browser_session=browser)

        if not name:
            # Find the cookie file from the URL's domain.
            from urllib.parse import urlparse
            domain = urlparse(url).netloc.replace("www.", "")

            if not _COOKIES_DIR.exists():
                # The cookie directory does not exist yet.
                if auto_navigate:
                    await open_target(2)
                    return ToolResult(
                        title="首次使用 Cookie 功能,已导航到目标页面",
                        output=f"这是首次使用 Cookie 功能,已自动导航到 {url}。\n\n建议:手动完成登录后,使用 browser_export_cookies 保存 Cookie 供下次使用。",
                        error=None,
                        long_term_memory="首次使用 Cookie 功能,已导航到目标页面"
                    )
                return ToolResult(
                    title="Cookie 目录不存在",
                    output=f"这是首次使用 Cookie 功能。建议:\n1. 使用 browser_navigate_to_url 访问 {url}\n2. 手动完成登录\n3. 使用 browser_export_cookies 保存 Cookie 供下次使用",
                    error=None,
                    long_term_memory="Cookie 目录不存在,这是首次使用"
                )

            matches = []
            # 1. Exact match on the full domain (e.g. xiaohongshu.com.json).
            exact_match = _COOKIES_DIR / f"{domain}.json"
            if exact_match.exists():
                matches.append(exact_match)
                logger.info(f"Cookie 精确匹配成功: {exact_match.name}")
            # 2. Domain prefix (e.g. xiaohongshu.com*.json). glob() yields
            #    files in arbitrary order, so results are sorted to make the
            #    chosen file deterministic.
            if not matches:
                prefix_matches = sorted(_COOKIES_DIR.glob(f"{domain}*.json"))
                if prefix_matches:
                    matches = prefix_matches
                    logger.info(f"Cookie 前缀匹配成功: {[m.name for m in matches]}")
            # 3. Fuzzy match on the first label of the domain (e.g. xiaohongshu).
            if not matches:
                main_domain = domain.split('.')[0]
                fuzzy_matches = sorted(_COOKIES_DIR.glob(f"{main_domain}*.json"))
                if fuzzy_matches:
                    matches = fuzzy_matches
                    logger.info(f"Cookie 模糊匹配成功: {[m.name for m in matches]} (主域名: {main_domain})")

            if not matches:
                available = [f.stem for f in _COOKIES_DIR.glob("*.json")]
                logger.warning(f"未找到匹配的 Cookie 文件。域名: {domain}, 可用: {available}")
                hint = f"可用的 Cookie 文件: {available}" if available else "提示:首次使用需要先手动登录,然后使用 browser_export_cookies 保存 Cookie"
                if auto_navigate:
                    await open_target(2)
                    return ToolResult(
                        title="未找到 Cookie,已导航到目标页面",
                        output=f"没有找到 {domain} 的 Cookie 文件,已自动导航到 {url}。\n\n{hint}\n\n建议:如需保持登录态,请手动登录后使用 browser_export_cookies 保存 Cookie。",
                        error=None,
                        long_term_memory=f"未找到 {domain} 的 Cookie,已导航到 {url}"
                    )
                return ToolResult(
                    title="未找到 Cookie",
                    output=f"没有匹配 {domain} 的 Cookie 文件。{hint}\n\n建议:使用 browser_navigate_to_url 访问 {url} 并手动登录,或使用 browser_export_cookies 保存当前 Cookie。",
                    error=None,
                    long_term_memory=f"未找到 {domain} 的 Cookie 文件"
                )

            cookie_file = matches[0]
            logger.info(f"使用 Cookie 文件: {cookie_file.name}")
        else:
            cookie_file = _COOKIES_DIR / f"{name}.json"
            if not cookie_file.exists():
                available = [f.stem for f in _COOKIES_DIR.glob("*.json")] if _COOKIES_DIR.exists() else []
                hint = f"可用的 Cookie 文件: {available}" if available else "提示:使用 browser_export_cookies 保存 Cookie"
                if auto_navigate:
                    await open_target(2)
                    return ToolResult(
                        title="Cookie 文件不存在,已导航到目标页面",
                        output=f"未找到 .cookies/{name}.json,已自动导航到 {url}。\n\n{hint}",
                        error=None,
                        long_term_memory=f"未找到 {name}.json,已导航到目标页面"
                    )
                return ToolResult(
                    title="Cookie 文件不存在",
                    output=f"未找到 .cookies/{name}.json。{hint}",
                    error=None,
                    long_term_memory=f"未找到 {name}.json Cookie 文件"
                )

        cookies = json.loads(cookie_file.read_text(encoding="utf-8"))
        # Inject directly: the stored data is passed to CDP as-is (the
        # original notes say export and load share the CDP cookie format).
        await browser._cdp_set_cookies(cookies)

        # Navigate with the freshly injected cookies. The URL was normalised
        # at the top and is never empty at this point.
        await open_target(3)
        return ToolResult(
            title="Cookie 注入并导航完成",
            output=f"从 {cookie_file.name} 注入 {len(cookies)} 条 Cookie,已导航到 {url}",
            long_term_memory=f"已从 {cookie_file.name} 注入 Cookie 并导航到 {url},登录态已恢复"
        )
    except Exception as e:
        return ToolResult(title="Cookie 加载失败", output="", error=str(e), long_term_memory="加载 Cookie 失败")
- # ============================================================
- # 导出所有工具函数(供外部使用)
- # ============================================================
__all__ = [
    # Session management
    "init_browser_session",
    "get_browser_session",
    "cleanup_browser_session",
    "kill_browser_session",
    # Navigation tools
    "browser_navigate_to_url",
    "browser_search_web",
    "browser_go_back",
    "browser_wait",
    # Element interaction tools
    "browser_click_element",
    "browser_input_text",
    "browser_send_keys",
    "browser_upload_file",
    # Scroll and view tools
    "browser_scroll_page",
    "browser_find_text",
    "browser_screenshot",
    # Tab management tools
    "browser_switch_tab",
    "browser_close_tab",
    # Dropdown tools
    "browser_get_dropdown_options",
    "browser_select_dropdown_option",
    # Content extraction tools
    "browser_extract_content",
    "browser_get_page_html",
    "browser_read_long_content",
    "browser_download_direct_url",
    "browser_get_selector_map",
    "browser_get_visual_selector_map",
    # JavaScript execution tools
    "browser_evaluate",
    "browser_ensure_login_with_cookies",
    # Waiting for user action
    "browser_wait_for_user_action",
    # Task completion
    "browser_done",
    # Cookie persistence
    "browser_export_cookies",
    "browser_load_cookies",
]
|