| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112 |
- """
- Browser-Use 原生工具适配器
- Native Browser-Use Tools Adapter
- 直接使用 browser-use 的原生类(BrowserSession, Tools)实现所有浏览器操作工具。
- 不依赖 Playwright,完全基于 CDP 协议。
- 核心特性:
- 1. 浏览器会话持久化 - 只启动一次浏览器
- 2. 状态自动保持 - 登录状态、Cookie、LocalStorage 等
- 3. 完整的底层访问 - 可以直接使用 CDP 协议
- 4. 性能优异 - 避免频繁创建/销毁浏览器实例
- 5. 多种浏览器类型 - 支持 local、cloud、container 三种模式
- 支持的浏览器类型:
- 1. Local (本地浏览器):
- - 在本地运行 Chrome
- - 支持可视化调试
- - 速度最快
- - 示例: init_browser_session(browser_type="local")
- 2. Cloud (云浏览器):
- - 在云端运行
- - 不占用本地资源
- - 适合生产环境
- - 示例: init_browser_session(browser_type="cloud")
- 3. Container (容器浏览器):
- - 在独立容器中运行
- - 隔离性好
- - 支持预配置账户
- - 示例: init_browser_session(browser_type="container", container_url="https://example.com")
- 使用方法:
- 1. 在 Agent 初始化时调用 init_browser_session() 并指定 browser_type
- 2. 使用各个工具函数执行浏览器操作
- 3. 任务结束时调用 cleanup_browser_session()
- 文件操作说明:
- - 浏览器专用文件目录:.cache/.browser_use_files/ (在当前工作目录下)
- 用于存储浏览器会话产生的临时文件(下载、上传、截图等)
- - 一般文件操作:请使用 agent.tools.builtin 中的文件工具 (read_file, write_file, edit_file)
- 这些工具功能更完善,支持diff预览、智能匹配、分页读取等
- """
- import logging
- import sys
- import os
- import json
- import httpx
- import asyncio
- import aiohttp
- import re
- import base64
- from urllib.parse import urlparse, parse_qs, unquote
- from typing import Optional, List, Dict, Any, Tuple, Union
- from pathlib import Path
- from langchain_core.runnables import RunnableLambda
- from argparse import Namespace # 使用 Namespace 快速构造带属性的对象
- from langchain_core.messages import AIMessage
- from ....llm.openrouter import openrouter_llm_call
- # 将项目根目录添加到 Python 路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- # 导入框架的工具装饰器和结果类
- from agent.tools import tool, ToolResult
- from agent.tools.builtin.browser.sync_mysql_help import mysql
- # 导入 browser-use 的核心类
- from browser_use import BrowserSession, BrowserProfile
- from browser_use.tools.service import Tools
- try:
- from browser_use.tools.views import ReadContentAction # type: ignore
- except Exception:
- from pydantic import BaseModel
- class ReadContentAction(BaseModel):
- goal: str
- source: str = "page"
- context: str = ""
- from browser_use.agent.views import ActionResult
- from browser_use.filesystem.file_system import FileSystem
- # ============================================================
- # 无需注册的内部辅助函数
- # ============================================================
- # ============================================================
- # 全局浏览器会话管理
- # ============================================================
- # 全局变量:浏览器会话和工具实例
- _browser_session: Optional[BrowserSession] = None
- _browser_tools: Optional[Tools] = None
- _file_system: Optional[FileSystem] = None
- async def create_container(url: str, account_name: str = "liuwenwu") -> Dict[str, Any]:
- """
- 创建浏览器容器并导航到指定URL
- 按照 test.md 的要求:
- 1.1 调用接口创建容器
- 1.2 调用接口创建窗口并导航到URL
- Args:
- url: 要导航的URL地址
- account_name: 账户名称
- Returns:
- 包含容器信息的字典:
- - success: 是否成功
- - container_id: 容器ID
- - vnc: VNC访问URL
- - cdp: CDP协议URL(用于浏览器连接)
- - connection_id: 窗口连接ID
- - error: 错误信息(如果失败)
- """
- result = {
- "success": False,
- "container_id": None,
- "vnc": None,
- "cdp": None,
- "connection_id": None,
- "error": None
- }
- try:
- async with aiohttp.ClientSession() as session:
- # 步骤1.1: 创建容器
- print("📦 步骤1.1: 创建容器...")
- create_url = "http://47.84.182.56:8200/api/v1/container/create"
- create_payload = {
- "auto_remove": True,
- "need_port_binding": True,
- "max_lifetime_seconds": 900
- }
- async with session.post(create_url, json=create_payload) as resp:
- if resp.status != 200:
- raise RuntimeError(f"创建容器失败: HTTP {resp.status}")
- create_result = await resp.json()
- if create_result.get("code") != 0:
- raise RuntimeError(f"创建容器失败: {create_result.get('msg')}")
- data = create_result.get("data", {})
- result["container_id"] = data.get("container_id")
- result["vnc"] = data.get("vnc")
- result["cdp"] = data.get("cdp")
- print(f"✅ 容器创建成功")
- print(f" Container ID: {result['container_id']}")
- print(f" VNC: {result['vnc']}")
- print(f" CDP: {result['cdp']}")
- # 等待容器内的浏览器启动
- print(f"\n⏳ 等待容器内浏览器启动...")
- await asyncio.sleep(5)
- # 步骤1.2: 创建页面并导航
- print(f"\n📱 步骤1.2: 创建页面并导航到 {url}...")
- page_create_url = "http://47.84.182.56:8200/api/v1/browser/page/create"
- page_payload = {
- "container_id": result["container_id"],
- "url": url,
- "account_name": account_name,
- "need_wait": True,
- "timeout": 30
- }
- # 重试机制:最多尝试3次
- max_retries = 3
- page_created = False
- last_error = None
- for attempt in range(max_retries):
- try:
- if attempt > 0:
- print(f" 重试 {attempt + 1}/{max_retries}...")
- await asyncio.sleep(3) # 重试前等待
- async with session.post(page_create_url, json=page_payload, timeout=aiohttp.ClientTimeout(total=60)) as resp:
- if resp.status != 200:
- response_text = await resp.text()
- last_error = f"HTTP {resp.status}: {response_text[:200]}"
- continue
- page_result = await resp.json()
- if page_result.get("code") != 0:
- last_error = f"{page_result.get('msg')}"
- continue
- page_data = page_result.get("data", {})
- result["connection_id"] = page_data.get("connection_id")
- result["success"] = True
- page_created = True
- print(f"✅ 页面创建成功")
- print(f" Connection ID: {result['connection_id']}")
- break
- except asyncio.TimeoutError:
- last_error = "请求超时"
- continue
- except aiohttp.ClientError as e:
- last_error = f"网络错误: {str(e)}"
- continue
- except Exception as e:
- last_error = f"未知错误: {str(e)}"
- continue
- if not page_created:
- raise RuntimeError(f"创建页面失败(尝试{max_retries}次后): {last_error}")
- except Exception as e:
- result["error"] = str(e)
- print(f"❌ 错误: {str(e)}")
- return result
- async def init_browser_session(
- browser_type: str = "local",
- headless: bool = False,
- url: Optional[str] = None,
- profile_name: str = "default",
- user_data_dir: Optional[str] = None,
- browser_profile: Optional[BrowserProfile] = None,
- **kwargs
- ) -> tuple[BrowserSession, Tools]:
- global _browser_session, _browser_tools, _file_system
- if _browser_session is not None:
- return _browser_session, _browser_tools
- valid_types = ["local", "cloud", "container"]
- if browser_type not in valid_types:
- raise ValueError(f"无效的 browser_type: {browser_type}")
- # --- 核心:定义本地统一存储路径 ---
- save_dir = Path.cwd() / ".cache/.browser_use_files"
- save_dir.mkdir(parents=True, exist_ok=True)
- # 基础参数配置
- session_params = {
- "headless": headless,
- # 告诉 Playwright 所有的下载临时流先存入此本地目录
- "downloads_path": str(save_dir),
- }
- if browser_type == "container":
- print("🐳 使用容器浏览器模式")
- if not url: url = "about:blank"
- container_info = await create_container(url=url, account_name=profile_name)
- if not container_info["success"]:
- raise RuntimeError(f"容器创建失败: {container_info['error']}")
- session_params["cdp_url"] = container_info["cdp"]
- await asyncio.sleep(3)
- elif browser_type == "cloud":
- print("🌐 使用云浏览器模式")
- session_params["use_cloud"] = True
- if profile_name and profile_name != "default":
- session_params["cloud_profile_id"] = profile_name
- else: # local
- print("💻 使用本地浏览器模式")
- session_params["is_local"] = True
- if user_data_dir is None and profile_name:
- user_data_dir = str(Path.home() / ".browser_use" / "profiles" / profile_name)
- Path(user_data_dir).mkdir(parents=True, exist_ok=True)
- session_params["user_data_dir"] = user_data_dir
-
- # macOS 路径兼容
- import platform
- if platform.system() == "Darwin":
- chrome_path = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
- if Path(chrome_path).exists():
- session_params["executable_path"] = chrome_path
- if browser_profile:
- session_params["browser_profile"] = browser_profile
- session_params.update(kwargs)
- # 创建会话
- _browser_session = BrowserSession(**session_params)
- await _browser_session.start()
- _browser_tools = Tools()
- _file_system = FileSystem(base_dir=str(save_dir))
- print(f"✅ 浏览器会话初始化成功 | 默认下载路径: {save_dir}")
- if browser_type in ["local", "cloud"] and url:
- await _browser_tools.navigate(url=url, browser_session=_browser_session)
- return _browser_session, _browser_tools
- async def get_browser_session() -> tuple[BrowserSession, Tools]:
- """
- 获取当前浏览器会话,如果不存在或连接已断开则自动重新创建
- Returns:
- (BrowserSession, Tools) 元组
- """
- global _browser_session, _browser_tools, _file_system
- if _browser_session is not None:
- # 检查底层 CDP 连接是否仍然存活
- # 当 runner.stop() 暂停后用户在菜单停留较久,WebSocket 可能超时断开,
- # 但 _browser_session 对象仍然存在,导致后续操作抛出 ConnectionClosedError
- alive = False
- try:
- cdp_root = getattr(_browser_session, '_cdp_client_root', None)
- sess_mgr = getattr(_browser_session, 'session_manager', None)
- if cdp_root is not None and sess_mgr is not None:
- cdp_session = await _browser_session.get_or_create_cdp_session()
- await asyncio.wait_for(
- cdp_session.cdp_client.send.Runtime.evaluate(
- params={'expression': '1+1'},
- session_id=cdp_session.session_id
- ),
- timeout=3.0,
- )
- alive = True
- except Exception:
- pass
- if not alive:
- print("⚠️ 浏览器会话连接已断开,正在重新初始化...")
- try:
- await cleanup_browser_session()
- except Exception:
- _browser_session = None
- _browser_tools = None
- _file_system = None
- if _browser_session is None:
- await init_browser_session()
- return _browser_session, _browser_tools
- async def cleanup_browser_session():
- """
- 清理浏览器会话
- 优雅地停止浏览器但保留会话状态
- """
- global _browser_session, _browser_tools, _file_system
- if _browser_session is not None:
- await _browser_session.stop()
- _browser_session = None
- _browser_tools = None
- _file_system = None
- async def kill_browser_session():
- """
- 强制终止浏览器会话
- 完全关闭浏览器进程
- """
- global _browser_session, _browser_tools, _file_system
- if _browser_session is not None:
- await _browser_session.kill()
- _browser_session = None
- _browser_tools = None
- _file_system = None
- # ============================================================
- # 辅助函数:ActionResult 转 ToolResult
- # ============================================================
- def action_result_to_tool_result(result: ActionResult, title: str = None) -> ToolResult:
- """
- 将 browser-use 的 ActionResult 转换为框架的 ToolResult
- Args:
- result: browser-use 的 ActionResult
- title: 可选的标题(如果不提供则从 result 推断)
- Returns:
- ToolResult
- """
- if result.error:
- return ToolResult(
- title=title or "操作失败",
- output="",
- error=result.error,
- long_term_memory=result.long_term_memory or result.error
- )
- return ToolResult(
- title=title or "操作成功",
- output=result.extracted_content or "",
- long_term_memory=result.long_term_memory or result.extracted_content or "",
- metadata=result.metadata or {}
- )
- def _cookie_domain_for_type(cookie_type: str, url: str) -> Tuple[str, str]:
- if cookie_type:
- key = cookie_type.lower()
- if key in {"xiaohongshu", "xhs"}:
- return ".xiaohongshu.com", "https://www.xiaohongshu.com"
- parsed = urlparse(url or "")
- domain = parsed.netloc or ""
- domain = domain.replace("www.", "")
- if domain:
- domain = f".{domain}"
- base_url = f"{parsed.scheme}://{parsed.netloc}" if parsed.scheme and parsed.netloc else url
- return domain, base_url
- def _parse_cookie_string(cookie_str: str, domain: str, url: str) -> List[Dict[str, Any]]:
- cookies: List[Dict[str, Any]] = []
- if not cookie_str:
- return cookies
- parts = cookie_str.split(";")
- for part in parts:
- if not part:
- continue
- if "=" not in part:
- continue
- name, value = part.split("=", 1)
- cookie = {
- "name": str(name).strip(),
- "value": str(value).strip(),
- "domain": domain,
- "path": "/",
- "expires": -1,
- "httpOnly": False,
- "secure": True,
- "sameSite": "None"
- }
- if url:
- cookie["url"] = url
- cookies.append(cookie)
- return cookies
- def _normalize_cookies(cookie_value: Any, domain: str, url: str) -> List[Dict[str, Any]]:
- if cookie_value is None:
- return []
- if isinstance(cookie_value, list):
- return cookie_value
- if isinstance(cookie_value, dict):
- if "cookies" in cookie_value:
- return _normalize_cookies(cookie_value.get("cookies"), domain, url)
- if "name" in cookie_value and "value" in cookie_value:
- return [cookie_value]
- return []
- if isinstance(cookie_value, (bytes, bytearray)):
- cookie_value = cookie_value.decode("utf-8", errors="ignore")
- if isinstance(cookie_value, str):
- text = cookie_value.strip()
- if not text:
- return []
- try:
- parsed = json.loads(text)
- except Exception:
- parsed = None
- if parsed is not None:
- return _normalize_cookies(parsed, domain, url)
- return _parse_cookie_string(text, domain, url)
- return []
- def _extract_cookie_value(row: Optional[Dict[str, Any]]) -> Any:
- if not row:
- return None
- # 优先使用 cookies 字段
- if "cookies" in row:
- return row["cookies"]
- # 兼容其他可能的字段名
- for key, value in row.items():
- if "cookie" in key.lower():
- return value
- return None
- def _fetch_cookie_row(cookie_type: str) -> Optional[Dict[str, Any]]:
- if not cookie_type:
- return None
- try:
- return mysql.fetchone(
- "select * from agent_channel_cookies where type=%s limit 1",
- (cookie_type,)
- )
- except Exception:
- return None
- def _fetch_profile_id(cookie_type: str) -> Optional[str]:
- """从数据库获取 cloud_profile_id"""
- if not cookie_type:
- return None
- try:
- row = mysql.fetchone(
- "select profileId from agent_channel_cookies where type=%s limit 1",
- (cookie_type,)
- )
- if row and "profileId" in row:
- return row["profileId"]
- return None
- except Exception:
- return None
- # ============================================================
- # 需要注册的工具
- # ============================================================
- # ============================================================
- # 导航类工具 (Navigation Tools)
- # ============================================================
- @tool()
- async def browser_navigate_to_url(url: str, new_tab: bool = False) -> ToolResult:
- """
- 导航到指定的 URL
- Navigate to a specific URL
- 使用 browser-use 的原生导航功能,支持在新标签页打开。
- Args:
- url: 要访问的 URL 地址
- new_tab: 是否在新标签页中打开(默认 False)
- Returns:
- ToolResult: 包含导航结果的工具返回对象
- Example:
- navigate_to_url("https://www.baidu.com")
- navigate_to_url("https://www.google.com", new_tab=True)
- """
- try:
- browser, tools = await get_browser_session()
- # 使用 browser-use 的 navigate 工具
- result = await tools.navigate(
- url=url,
- new_tab=new_tab,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"导航到 {url}")
- except Exception as e:
- return ToolResult(
- title="导航失败",
- output="",
- error=f"Failed to navigate to {url}: {str(e)}",
- long_term_memory=f"导航到 {url} 失败"
- )
- @tool()
- async def browser_search_web(query: str, engine: str = "bing") -> ToolResult:
- """
- 使用搜索引擎搜索
- Search the web using a search engine
- Args:
- query: 搜索关键词
- engine: 搜索引擎 (google, duckduckgo, bing) - 默认: google
- Returns:
- ToolResult: 搜索结果
- Example:
- search_web("Python async programming", engine="google")
- """
- try:
- browser, tools = await get_browser_session()
- # 使用 browser-use 的 search 工具
- result = await tools.search(
- query=query,
- engine=engine,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"搜索: {query}")
- except Exception as e:
- return ToolResult(
- title="搜索失败",
- output="",
- error=f"Search failed: {str(e)}",
- long_term_memory=f"搜索 '{query}' 失败"
- )
- @tool()
- async def browser_go_back() -> ToolResult:
- """
- 返回到上一个页面
- Go back to the previous page
- 模拟浏览器的"后退"按钮功能。
- Returns:
- ToolResult: 包含返回操作结果的工具返回对象
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.go_back(browser_session=browser)
- return action_result_to_tool_result(result, "返回上一页")
- except Exception as e:
- return ToolResult(
- title="返回失败",
- output="",
- error=f"Failed to go back: {str(e)}",
- long_term_memory="返回上一页失败"
- )
- @tool()
- async def browser_wait(seconds: int = 3) -> ToolResult:
- """
- 等待指定的秒数
- Wait for a specified number of seconds
- 用于等待页面加载、动画完成或其他异步操作。
- Args:
- seconds: 等待时间(秒),最大30秒
- Returns:
- ToolResult: 包含等待操作结果的工具返回对象
- Example:
- wait(5) # 等待5秒
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.wait(seconds=seconds, browser_session=browser)
- return action_result_to_tool_result(result, f"等待 {seconds} 秒")
- except Exception as e:
- return ToolResult(
- title="等待失败",
- output="",
- error=f"Failed to wait: {str(e)}",
- long_term_memory="等待失败"
- )
- # ============================================================
- # 元素交互工具 (Element Interaction Tools)
- # ============================================================
- # 定义一个专门捕获下载链接的 Handler
- class DownloadLinkCaptureHandler(logging.Handler):
- def __init__(self):
- super().__init__()
- self.captured_url = None
- def emit(self, record):
- # 如果已经捕获到了(通常第一条是最完整的),就不再处理后续日志
- if self.captured_url:
- return
- message = record.getMessage()
- # 寻找包含下载信息的日志
- if "redirection?filename=" in message or "Failed to download" in message:
- # 使用更严格的正则,确保不抓取带省略号(...)的截断链接
- # 排除掉末尾带有三个点的干扰
- match = re.search(r"https?://[^\s]+(?!\.\.\.)", message)
- if match:
- url = match.group(0)
- # 再次过滤:如果发现提取出的 URL 确实包含三个点,说明依然抓到了截断版,跳过
- if "..." not in url:
- self.captured_url = url
- # print(f"🎯 成功锁定完整直链: {url[:50]}...") # 调试用
- @tool()
- async def browser_download_direct_url(url: str, save_name: str = "book.epub") -> ToolResult:
- save_dir = Path.cwd() / ".cache/.browser_use_files"
- save_dir.mkdir(parents=True, exist_ok=True)
-
- # 提取域名作为 Referer,这能骗过 90% 的防盗链校验
- from urllib.parse import urlparse
- parsed_url = urlparse(url)
- base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/"
-
- # 如果没传 save_name,自动从 URL 获取
- if not save_name:
- import unquote
- # 尝试从 URL 路径获取文件名并解码(处理中文)
- save_name = Path(urlparse(url).path).name or f"download_{int(time.time())}"
- save_name = unquote(save_name)
- target_path = save_dir / save_name
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
- "Accept": "*/*",
- "Referer": base_url, # 动态设置 Referer
- "Range": "bytes=0-", # 有时对大文件下载有奇效
- }
- try:
- print(f"🚀 开始下载: {url[:60]}...")
-
- # 使用 follow_redirects=True 处理链接中的 redirection
- async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=60.0) as client:
- async with client.stream("GET", url) as response:
- if response.status_code != 200:
- print(f"❌ 下载失败,HTTP 状态码: {response.status_code}")
- return
-
- # 获取实际文件名(如果服务器提供了)
- # 这里会优先使用你指定的 save_name
-
- with open(target_path, "wb") as f:
- downloaded_bytes = 0
- async for chunk in response.aiter_bytes():
- f.write(chunk)
- downloaded_bytes += len(chunk)
- if downloaded_bytes % (1024 * 1024) == 0: # 每下载 1MB 打印一次
- print(f"📥 已下载: {downloaded_bytes // (1024 * 1024)} MB")
- print(f"✅ 下载完成!文件已存至: {target_path}")
- success_msg = f"✅ 下载完成!文件已存至: {target_path}"
- return ToolResult(
- title="直链下载成功",
- output=success_msg,
- long_term_memory=success_msg,
- metadata={"path": str(target_path)}
- )
- except Exception as e:
- # 异常捕获返回
- return ToolResult(
- title="下载异常",
- output="",
- error=f"💥 发生错误: {str(e)}",
- long_term_memory=f"下载任务由于异常中断: {str(e)}"
- )
-
- @tool()
- async def browser_click_element(index: int) -> ToolResult:
- """
- 点击页面元素,并自动通过拦截内部日志获取下载直链。
- """
- # 1. 挂载日志窃听器
- capture_handler = DownloadLinkCaptureHandler()
- logger = logging.getLogger("browser_use") # 拦截整个 browser_use 命名空间
- logger.addHandler(capture_handler)
-
- try:
- browser, tools = await get_browser_session()
- # 2. 执行原生的点击动作
- result = await tools.click(
- index=index,
- browser_session=browser
- )
- # 3. 检查是否有“意外收获”
- download_msg = ""
- if capture_handler.captured_url:
- captured_url = capture_handler.captured_url
- download_msg = f"\n\n⚠️ 系统检测到浏览器下载被拦截,已自动捕获准确直链:\n{captured_url}\n\n建议:你可以直接使用 browser_download_direct_url 工具下载此链接。"
-
- # 如果你想更激进一点,甚至可以在这里直接自动触发本地下载逻辑
- # await auto_download_file(captured_url)
- # 4. 转换结果并附加捕获的信息
- tool_result = action_result_to_tool_result(result, f"点击元素 {index}")
-
- if download_msg:
- # 关键:把日志里的信息塞进 output,这样 LLM 就能看到了!
- tool_result.output = (tool_result.output or "") + download_msg
- tool_result.long_term_memory = (tool_result.long_term_memory or "") + f" 捕获下载链接: {captured_url}"
- return tool_result
- except Exception as e:
- return ToolResult(
- title="点击失败",
- output="",
- error=f"Failed to click element {index}: {str(e)}",
- long_term_memory=f"点击元素 {index} 失败"
- )
- finally:
- # 5. 务必移除监听器,防止内存泄漏和日志污染
- logger.removeHandler(capture_handler)
- @tool()
- async def browser_input_text(index: int, text: str, clear: bool = True) -> ToolResult:
- """
- 在指定元素中输入文本
- Input text into an element
- Args:
- index: 元素索引(从浏览器状态中获取)
- text: 要输入的文本内容
- clear: 是否先清除现有文本(默认 True)
- Returns:
- ToolResult: 包含输入操作结果的工具返回对象
- Example:
- input_text(index=0, text="Hello World", clear=True)
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.input(
- index=index,
- text=text,
- clear=clear,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"输入文本到元素 {index}")
- except Exception as e:
- return ToolResult(
- title="输入失败",
- output="",
- error=f"Failed to input text into element {index}: {str(e)}",
- long_term_memory=f"输入文本失败"
- )
- @tool()
- async def browser_send_keys(keys: str) -> ToolResult:
- """
- 发送键盘按键或快捷键
- Send keyboard keys or shortcuts
- 支持发送单个按键、组合键和快捷键。
- Args:
- keys: 要发送的按键字符串
- - 单个按键: "Enter", "Escape", "PageDown", "Tab"
- - 组合键: "Control+o", "Shift+Tab", "Alt+F4"
- - 功能键: "F1", "F2", ..., "F12"
- Returns:
- ToolResult: 包含按键操作结果的工具返回对象
- Example:
- send_keys("Enter")
- send_keys("Control+A")
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.send_keys(
- keys=keys,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"发送按键: {keys}")
- except Exception as e:
- return ToolResult(
- title="发送按键失败",
- output="",
- error=f"Failed to send keys: {str(e)}",
- long_term_memory="发送按键失败"
- )
- @tool()
- async def browser_upload_file(index: int, path: str) -> ToolResult:
- """
- 上传文件到文件输入元素
- Upload a file to a file input element
- Args:
- index: 文件输入框的元素索引
- path: 要上传的文件路径(绝对路径)
- Returns:
- ToolResult: 包含上传操作结果的工具返回对象
- Example:
- upload_file(index=7, path="/path/to/file.pdf")
- Note:
- 文件必须存在且路径必须是绝对路径
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.upload_file(
- index=index,
- path=path,
- browser_session=browser,
- available_file_paths=[path],
- file_system=_file_system
- )
- return action_result_to_tool_result(result, f"上传文件: {path}")
- except Exception as e:
- return ToolResult(
- title="上传失败",
- output="",
- error=f"Failed to upload file: {str(e)}",
- long_term_memory=f"上传文件 {path} 失败"
- )
- # ============================================================
- # 滚动和视图工具 (Scroll & View Tools)
- # ============================================================
- @tool()
- async def browser_scroll_page(down: bool = True, pages: float = 1.0, index: Optional[int] = None) -> ToolResult:
- try:
- # 限制单次滚动幅度,避免 agent 一次滚 100 页
- MAX_PAGES = 10
- if pages > MAX_PAGES:
- pages = MAX_PAGES
- browser, tools = await get_browser_session()
- cdp_session = await browser.get_or_create_cdp_session()
- before_y_result = await cdp_session.cdp_client.send.Runtime.evaluate(
- params={'expression': 'window.scrollY'},
- session_id=cdp_session.session_id
- )
- before_y = before_y_result.get('result', {}).get('value', 0)
- # 执行滚动
- result = await tools.scroll(down=down, pages=pages, index=index, browser_session=browser)
- # 等待渲染(懒加载页面需要更长时间)
- await asyncio.sleep(2)
- after_y_result = await cdp_session.cdp_client.send.Runtime.evaluate(
- params={'expression': 'window.scrollY'},
- session_id=cdp_session.session_id
- )
- after_y = after_y_result.get('result', {}).get('value', 0)
- # 如果第一次检测没动,再等一轮(应对懒加载触发后的延迟滚动)
- if before_y == after_y and index is None:
- await asyncio.sleep(2)
- retry_result = await cdp_session.cdp_client.send.Runtime.evaluate(
- params={'expression': 'window.scrollY'},
- session_id=cdp_session.session_id
- )
- after_y = retry_result.get('result', {}).get('value', 0)
- if before_y == after_y and index is None:
- direction = "下" if down else "上"
- return ToolResult(
- title="滚动无效",
- output=f"页面已到达{direction}边界,无法继续滚动",
- error="No movement detected"
- )
- delta = abs(after_y - before_y)
- direction = "下" if down else "上"
- return action_result_to_tool_result(result, f"已向{direction}滚动 {delta}px")
- except Exception as e:
- # --- 核心修复 2: 必须补全 output 参数,否则框架会报错 ---
- return ToolResult(
- title="滚动失败",
- output="", # 补全这个缺失的必填参数
- error=str(e)
- )
- @tool()
- async def browser_find_text(text: str) -> ToolResult:
- """
- 查找页面中的文本并滚动到该位置
- Find text on the page and scroll to it
- 在页面中搜索指定的文本,找到后自动滚动到该位置。
- Args:
- text: 要查找的文本内容
- Returns:
- ToolResult: 包含查找结果的工具返回对象
- Example:
- find_text("Privacy Policy")
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.find_text(
- text=text,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"查找文本: {text}")
- except Exception as e:
- return ToolResult(
- title="查找失败",
- output="",
- error=f"Failed to find text: {str(e)}",
- long_term_memory=f"查找文本 '{text}' 失败"
- )
- @tool()
- async def browser_get_visual_selector_map() -> ToolResult:
- """
- 获取当前页面的视觉快照和交互元素索引映射。
- Get visual snapshot and selector map of interactive elements.
- 该工具会同时执行两个操作:
- 1. 捕捉当前页面的截图,并用 browser-use 内置方法在截图上标注元素索引号。
- 2. 生成页面所有可交互元素的索引字典(含 href、type 等属性信息)。
- Returns:
- ToolResult: 包含高亮截图(在 images 中)和元素列表的工具返回对象。
- """
- try:
- browser, _ = await get_browser_session()
- # 1. 构造同时包含 DOM 和 截图 的请求
- from browser_use.browser.events import BrowserStateRequestEvent
- from browser_use.browser.python_highlights import create_highlighted_screenshot_async
- event = browser.event_bus.dispatch(
- BrowserStateRequestEvent(
- include_dom=True,
- include_screenshot=True,
- include_recent_events=False
- )
- )
- # 2. 等待浏览器返回完整状态
- browser_state = await event.event_result(raise_if_none=True, raise_if_any=True)
- # 3. 提取 Selector Map
- selector_map = browser_state.dom_state.selector_map if browser_state.dom_state else {}
- # 4. 提取截图并生成带索引标注的高亮截图(通过 CDP 获取精确 DPI 和滚动偏移)
- screenshot_b64 = browser_state.screenshot or ""
- highlighted_b64 = ""
- if screenshot_b64 and selector_map:
- try:
- cdp_session = await browser.get_or_create_cdp_session()
- highlighted_b64 = await create_highlighted_screenshot_async(
- screenshot_b64, selector_map,
- cdp_session=cdp_session,
- filter_highlight_ids=False
- )
- except Exception:
- highlighted_b64 = screenshot_b64 # fallback to raw screenshot
- else:
- highlighted_b64 = screenshot_b64
- # 5. 构建供 Agent 阅读的完整元素列表,包含丰富的属性信息
- elements_info = []
- for index, node in selector_map.items():
- tag = node.tag_name
- attrs = node.attributes or {}
- desc = attrs.get('aria-label') or attrs.get('placeholder') or attrs.get('title') or node.get_all_children_text(max_depth=1) or ""
- # 收集有用的属性片段
- extra_parts = []
- if attrs.get('href'):
- extra_parts.append(f"href={attrs['href'][:60]}")
- if attrs.get('type'):
- extra_parts.append(f"type={attrs['type']}")
- if attrs.get('role'):
- extra_parts.append(f"role={attrs['role']}")
- if attrs.get('name'):
- extra_parts.append(f"name={attrs['name']}")
- extra = f" ({', '.join(extra_parts)})" if extra_parts else ""
- elements_info.append(f"Index {index}: <{tag}> \"{desc[:50]}\"{extra}")
- output = f"页面截图已捕获(含元素索引标注)\n找到 {len(selector_map)} 个交互元素\n\n"
- output += "元素列表:\n" + "\n".join(elements_info)
- # 6. 将高亮截图存入 images 字段,metadata 保留结构化数据
- images = []
- if highlighted_b64:
- images.append({"type": "base64", "media_type": "image/png", "data": highlighted_b64})
- return ToolResult(
- title="视觉元素观察",
- output=output,
- long_term_memory=f"在页面观察到 {len(selector_map)} 个元素并保存了截图",
- images=images,
- metadata={
- "selector_map": {k: str(v) for k, v in list(selector_map.items())[:100]},
- "url": browser_state.url,
- "title": browser_state.title
- }
- )
- except Exception as e:
- return ToolResult(
- title="视觉观察失败",
- output="",
- error=f"Failed to get visual selector map: {str(e)}",
- long_term_memory="获取视觉元素映射失败"
- )
-
- @tool()
- async def browser_screenshot() -> ToolResult:
- """
- 请求在下次观察中包含页面截图
- Request a screenshot to be included in the next observation
- 用于视觉检查页面状态,帮助理解页面布局和内容。
- Returns:
- ToolResult: 包含截图请求结果的工具返回对象
- Example:
- screenshot()
- Note:
- 截图会在下次页面观察时自动包含在结果中。
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.screenshot(browser_session=browser)
- return action_result_to_tool_result(result, "截图请求")
- except Exception as e:
- return ToolResult(
- title="截图失败",
- output="",
- error=f"Failed to capture screenshot: {str(e)}",
- long_term_memory="截图失败"
- )
- # ============================================================
- # 标签页管理工具 (Tab Management Tools)
- # ============================================================
- @tool()
- async def browser_switch_tab(tab_id: str) -> ToolResult:
- """
- 切换到指定标签页
- Switch to a different browser tab
- Args:
- tab_id: 4字符标签ID(target_id 的最后4位)
- Returns:
- ToolResult: 切换结果
- Example:
- switch_tab(tab_id="a3f2")
- """
- try:
- browser, tools = await get_browser_session()
- normalized_tab_id = tab_id[-4:] if tab_id else tab_id
- result = await tools.switch(
- tab_id=normalized_tab_id,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"切换到标签页 {normalized_tab_id}")
- except Exception as e:
- return ToolResult(
- title="切换标签页失败",
- output="",
- error=f"Failed to switch tab: {str(e)}",
- long_term_memory=f"切换到标签页 {tab_id} 失败"
- )
- @tool()
- async def browser_close_tab(tab_id: str) -> ToolResult:
- """
- 关闭指定标签页
- Close a browser tab
- Args:
- tab_id: 4字符标签ID
- Returns:
- ToolResult: 关闭结果
- Example:
- close_tab(tab_id="a3f2")
- """
- try:
- browser, tools = await get_browser_session()
- normalized_tab_id = tab_id[-4:] if tab_id else tab_id
- result = await tools.close(
- tab_id=normalized_tab_id,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"关闭标签页 {normalized_tab_id}")
- except Exception as e:
- return ToolResult(
- title="关闭标签页失败",
- output="",
- error=f"Failed to close tab: {str(e)}",
- long_term_memory=f"关闭标签页 {tab_id} 失败"
- )
- # ============================================================
- # 下拉框工具 (Dropdown Tools)
- # ============================================================
- @tool()
- async def browser_get_dropdown_options(index: int) -> ToolResult:
- """
- 获取下拉框的所有选项
- Get options from a dropdown element
- Args:
- index: 下拉框的元素索引
- Returns:
- ToolResult: 包含所有选项的结果
- Example:
- get_dropdown_options(index=8)
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.dropdown_options(
- index=index,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"获取下拉框选项: {index}")
- except Exception as e:
- return ToolResult(
- title="获取下拉框选项失败",
- output="",
- error=f"Failed to get dropdown options: {str(e)}",
- long_term_memory=f"获取下拉框 {index} 选项失败"
- )
- @tool()
- async def browser_select_dropdown_option(index: int, text: str) -> ToolResult:
- """
- 选择下拉框选项
- Select an option from a dropdown
- Args:
- index: 下拉框的元素索引
- text: 要选择的选项文本(精确匹配)
- Returns:
- ToolResult: 选择结果
- Example:
- select_dropdown_option(index=8, text="Option 2")
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.select_dropdown(
- index=index,
- text=text,
- browser_session=browser
- )
- return action_result_to_tool_result(result, f"选择下拉框选项: {text}")
- except Exception as e:
- return ToolResult(
- title="选择下拉框选项失败",
- output="",
- error=f"Failed to select dropdown option: {str(e)}",
- long_term_memory=f"选择选项 '{text}' 失败"
- )
- # ============================================================
- # 内容提取工具 (Content Extraction Tools)
- # ============================================================
- def scrub_search_redirect_url(url: str) -> str:
- """
- 自动检测并解析 Bing/Google 等搜索引擎的重定向链接,提取真实目标 URL。
- """
- if not url or not isinstance(url, str):
- return url
-
- try:
- parsed = urlparse(url)
-
- # 1. 处理 Bing 重定向 (特征:u 参数带 Base64)
- # 示例:...&u=a1aHR0cHM6Ly96aHVhbmxhbi56aGlodS5jb20vcC8zODYxMjgwOQ&...
- if "bing.com" in parsed.netloc:
- u_param = parse_qs(parsed.query).get('u', [None])[0]
- if u_param:
- # 移除开头的 'a1', 'a0' 等标识符
- b64_str = u_param[2:]
- # 补齐 Base64 填充符
- padding = '=' * (4 - len(b64_str) % 4)
- decoded = base64.b64decode(b64_str + padding).decode('utf-8', errors='ignore')
- if decoded.startswith('http'):
- return decoded
- # 2. 处理 Google 重定向 (特征:url 参数)
- if "google.com" in parsed.netloc:
- url_param = parse_qs(parsed.query).get('url', [None])[0]
- if url_param:
- return unquote(url_param)
- # 3. 兜底:处理常见的跳转参数
- for param in ['target', 'dest', 'destination', 'link']:
- found = parse_qs(parsed.query).get(param, [None])[0]
- if found and found.startswith('http'):
- return unquote(found)
-
- except Exception:
- pass # 解析失败则返回原链接
-
- return url
- async def extraction_adapter(input_data):
- # 提取字符串
- if isinstance(input_data, list):
- prompt = input_data[-1].content if hasattr(input_data[-1], 'content') else str(input_data[-1])
- else:
- prompt = str(input_data)
-
- response = await openrouter_llm_call(
- messages=[{"role": "user", "content": prompt}]
- )
-
- content = response["content"]
-
- # --- 核心改进:URL 自动修复 ---
- # 使用正则表达式匹配内容中的所有 URL,并尝试进行洗涤
- urls = re.findall(r'https?://[^\s<>"\']+', content)
- for original_url in urls:
- clean_url = scrub_search_redirect_url(original_url)
- if clean_url != original_url:
- content = content.replace(original_url, clean_url)
-
- from argparse import Namespace
- return Namespace(completion=content)
- @tool()
- async def browser_extract_content(query: str, extract_links: bool = False,
- start_from_char: int = 0) -> ToolResult:
- """
- 使用 LLM 从页面提取结构化数据
- Extract content from the current page using LLM
- Args:
- query: 提取查询(告诉 LLM 要提取什么内容)
- extract_links: 是否提取链接(默认 False,节省 token)
- start_from_char: 从哪个字符开始提取(用于分页提取大内容)
- Returns:
- ToolResult: 提取的内容
- Example:
- extract_content(query="提取页面上所有产品的名称和价格", extract_links=True)
- Note:
- 需要配置 page_extraction_llm,否则会失败
- 支持分页提取,最大100k字符
- """
- try:
- browser, tools = await get_browser_session()
- # 注意:extract 需要 page_extraction_llm 参数
- # 这里我们假设用户会在初始化时配置 LLM
- # 如果没有配置,会抛出异常
- result = await tools.extract(
- query=query,
- extract_links=extract_links,
- start_from_char=start_from_char,
- browser_session=browser,
- page_extraction_llm=RunnableLambda(extraction_adapter), # 需要用户配置
- file_system=_file_system
- )
- return action_result_to_tool_result(result, f"提取内容: {query}")
- except Exception as e:
- return ToolResult(
- title="内容提取失败",
- output="",
- error=f"Failed to extract content: {str(e)}",
- long_term_memory=f"提取内容失败: {query}"
- )
- async def _detect_and_download_pdf_via_cdp(browser) -> Optional[str]:
- """
- 检测当前页面是否为 PDF,如果是则通过 CDP(浏览器内 fetch)下载到本地。
- 优势:自动携带浏览器的 cookies/session,可访问需要登录的 PDF。
- 返回本地文件路径,非 PDF 页面返回 None。
- """
- try:
- current_url = await browser.get_current_page_url()
- if not current_url:
- return None
- parsed = urlparse(current_url)
- is_pdf = parsed.path.lower().endswith('.pdf')
- # URL 不明显是 PDF 时,通过 CDP 检查 content-type
- if not is_pdf:
- try:
- cdp = await browser.get_or_create_cdp_session()
- ct_result = await cdp.cdp_client.send.Runtime.evaluate(
- params={'expression': 'document.contentType'},
- session_id=cdp.session_id
- )
- content_type = ct_result.get('result', {}).get('value', '')
- is_pdf = 'pdf' in content_type.lower()
- except Exception:
- pass
- if not is_pdf:
- return None
- # 通过浏览器内 fetch API 下载 PDF(自动携带 cookies)
- cdp = await browser.get_or_create_cdp_session()
- js_code = """
- (async () => {
- try {
- const resp = await fetch(window.location.href);
- if (!resp.ok) return JSON.stringify({error: 'HTTP ' + resp.status});
- const blob = await resp.blob();
- return new Promise((resolve, reject) => {
- const reader = new FileReader();
- reader.onloadend = () => resolve(JSON.stringify({data: reader.result}));
- reader.onerror = () => resolve(JSON.stringify({error: 'FileReader failed'}));
- reader.readAsDataURL(blob);
- });
- } catch(e) {
- return JSON.stringify({error: e.message});
- }
- })()
- """
- result = await cdp.cdp_client.send.Runtime.evaluate(
- params={
- 'expression': js_code,
- 'awaitPromise': True,
- 'returnByValue': True,
- 'timeout': 60000
- },
- session_id=cdp.session_id
- )
- value = result.get('result', {}).get('value', '')
- if not value:
- print("⚠️ CDP fetch PDF: 无返回值")
- return None
- data = json.loads(value)
- if 'error' in data:
- print(f"⚠️ CDP fetch PDF 失败: {data['error']}")
- return None
- # 从 data URL 中提取 base64 并解码
- data_url = data['data'] # data:application/pdf;base64,JVBERi0...
- base64_data = data_url.split(',', 1)[1]
- pdf_bytes = base64.b64decode(base64_data)
- # 保存到本地
- save_dir = Path.cwd() / ".cache/.browser_use_files"
- save_dir.mkdir(parents=True, exist_ok=True)
- filename = Path(parsed.path).name if parsed.path else ""
- if not filename or not filename.lower().endswith('.pdf'):
- import time
- filename = f"downloaded_{int(time.time())}.pdf"
- save_path = str(save_dir / filename)
- with open(save_path, 'wb') as f:
- f.write(pdf_bytes)
- print(f"📄 PDF 已通过 CDP 下载到: {save_path} ({len(pdf_bytes)} bytes)")
- return save_path
- except Exception as e:
- print(f"⚠️ PDF 检测/下载异常: {e}")
- return None
- @tool()
- async def browser_read_long_content(
- goal: Union[str, dict],
- source: str = "page",
- context: str = "",
- **kwargs
- ) -> ToolResult:
- """
- 智能读取长内容。支持自动检测并读取网页上的 PDF 文件。
- 当 source="page" 且当前页面是 PDF 时,会通过 CDP 下载 PDF 并用 pypdf 解析,
- 而非使用 DOM 提取(DOM 无法读取浏览器内置 PDF Viewer 的内容)。
- 通过 CDP 下载可自动携带浏览器的 cookies/session,支持需要登录的 PDF。
- """
- try:
- browser, tools = await get_browser_session()
- # 1. 提取目标文本 (针对 GoalTree 字典结构)
- final_goal_text = ""
- if isinstance(goal, dict):
- final_goal_text = goal.get("mission") or goal.get("goal") or str(goal)
- else:
- final_goal_text = str(goal)
- # 2. 清洗业务背景 (过滤框架注入的 dict 类型 context)
- business_context = context if isinstance(context, str) else ""
- # 3. PDF 自动检测:当 source="page" 时检查是否为 PDF 页面
- available_files = []
- if source.lower() == "page":
- pdf_path = await _detect_and_download_pdf_via_cdp(browser)
- if pdf_path:
- source = pdf_path
- available_files.append(pdf_path)
- # 4. 验证并实例化
- action_params = ReadContentAction(
- goal=final_goal_text,
- source=source,
- context=business_context
- )
- # 5. 解包参数调用底层方法
- result = await tools.read_long_content(
- **action_params.model_dump(),
- browser_session=browser,
- page_extraction_llm=RunnableLambda(extraction_adapter),
- available_file_paths=available_files
- )
- return action_result_to_tool_result(result, f"深度读取: {source}")
- except Exception as e:
- return ToolResult(
- title="深度读取失败",
- output="",
- error=f"Read long content failed: {str(e)}",
- long_term_memory="参数解析或校验失败,请检查输入"
- )
- @tool()
- async def browser_get_page_html() -> ToolResult:
- """
- 获取当前页面的完整 HTML
- Get the full HTML of the current page
- 返回当前页面的完整 HTML 源代码。
- Returns:
- ToolResult: 包含页面 HTML 的工具返回对象
- Example:
- get_page_html()
- Note:
- - 返回的是完整的 HTML 源代码
- - 输出会被限制在 10000 字符以内(完整内容保存在 metadata 中)
- """
- try:
- browser, tools = await get_browser_session()
- # 使用 CDP 获取页面 HTML
- cdp = await browser.get_or_create_cdp_session()
- # 获取页面内容
- result = await cdp.cdp_client.send.Runtime.evaluate(
- params={'expression': 'document.documentElement.outerHTML'},
- session_id=cdp.session_id
- )
- html = result.get('result', {}).get('value', '')
- # 获取 URL 和标题
- url = await browser.get_current_page_url()
- title_result = await cdp.cdp_client.send.Runtime.evaluate(
- params={'expression': 'document.title'},
- session_id=cdp.session_id
- )
- title = title_result.get('result', {}).get('value', '')
- # 限制输出大小
- output_html = html
- if len(html) > 10000:
- output_html = html[:10000] + "... (truncated)"
- return ToolResult(
- title=f"获取 HTML: {url}",
- output=f"页面: {title}\nURL: {url}\n\nHTML:\n{output_html}",
- long_term_memory=f"获取 HTML: {url}",
- metadata={"url": url, "title": title, "html": html}
- )
- except Exception as e:
- return ToolResult(
- title="获取 HTML 失败",
- output="",
- error=f"Failed to get page HTML: {str(e)}",
- long_term_memory="获取 HTML 失败"
- )
- @tool()
- async def browser_get_selector_map() -> ToolResult:
- """
- 获取当前页面的元素索引映射
- Get the selector map of interactive elements on the current page
- 返回页面所有可交互元素的索引字典,用于后续的元素操作。
- Returns:
- ToolResult: 包含元素映射的工具返回对象
- Example:
- get_selector_map()
- Note:
- 返回的索引可以用于 click_element, input_text 等操作
- """
- try:
- browser, tools = await get_browser_session()
- # 关键修复:先触发 BrowserStateRequestEvent 来更新 DOM 状态
- # 这会触发 DOM watchdog 重新构建 DOM 树并更新 selector_map
- from browser_use.browser.events import BrowserStateRequestEvent
- # 触发事件并等待结果
- event = browser.event_bus.dispatch(
- BrowserStateRequestEvent(
- include_dom=True,
- include_screenshot=False, # 不需要截图,节省时间
- include_recent_events=False
- )
- )
- # 等待 DOM 更新完成
- browser_state = await event.event_result(raise_if_none=True, raise_if_any=True)
- # 从更新后的状态中获取 selector_map
- selector_map = browser_state.dom_state.selector_map if browser_state.dom_state else {}
- # 构建输出信息
- elements_info = []
- for index, node in list(selector_map.items())[:20]: # 只显示前20个
- tag = node.tag_name
- attrs = node.attributes or {}
- text = attrs.get('aria-label') or attrs.get('placeholder') or attrs.get('value', '')
- elements_info.append(f"索引 {index}: <{tag}> {text[:50]}")
- output = f"找到 {len(selector_map)} 个交互元素\n\n"
- output += "\n".join(elements_info)
- if len(selector_map) > 20:
- output += f"\n... 还有 {len(selector_map) - 20} 个元素"
- return ToolResult(
- title="获取元素映射",
- output=output,
- long_term_memory=f"获取到 {len(selector_map)} 个交互元素",
- metadata={"selector_map": {k: str(v) for k, v in list(selector_map.items())[:100]}}
- )
- except Exception as e:
- return ToolResult(
- title="获取元素映射失败",
- output="",
- error=f"Failed to get selector map: {str(e)}",
- long_term_memory="获取元素映射失败"
- )
- # ============================================================
- # JavaScript 执行工具 (JavaScript Tools)
- # ============================================================
- @tool()
- async def browser_evaluate(code: str) -> ToolResult:
- """
- 在页面中执行 JavaScript 代码
- Execute JavaScript code in the page context
- 允许在当前页面中执行任意 JavaScript 代码,用于复杂的页面操作或数据提取。
- Args:
- code: 要执行的 JavaScript 代码字符串
- Returns:
- ToolResult: 包含执行结果的工具返回对象
- Example:
- evaluate("document.title")
- evaluate("document.querySelectorAll('a').length")
- Note:
- - 代码在页面上下文中执行,可以访问 DOM 和全局变量
- - 返回值会被自动序列化为字符串
- - 执行结果限制在 20k 字符以内
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.evaluate(
- code=code,
- browser_session=browser
- )
- return action_result_to_tool_result(result, "执行 JavaScript")
- except Exception as e:
- return ToolResult(
- title="JavaScript 执行失败",
- output="",
- error=f"Failed to execute JavaScript: {str(e)}",
- long_term_memory="JavaScript 执行失败"
- )
- @tool()
- async def browser_ensure_login_with_cookies(cookie_type: str, url: str = "https://www.xiaohongshu.com") -> ToolResult:
- """
- 检查登录状态并在需要时注入 cookies
- """
- try:
- browser, tools = await get_browser_session()
- if url:
- await tools.navigate(url=url, browser_session=browser)
- await tools.wait(seconds=2, browser_session=browser)
- check_login_js = """
- (function() {
- const loginBtn = document.querySelector('[class*="login"]') ||
- document.querySelector('[href*="login"]') ||
- Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录'));
- const userInfo = document.querySelector('[class*="user"]') ||
- document.querySelector('[class*="avatar"]');
- return {
- needLogin: !!loginBtn && !userInfo,
- hasLoginBtn: !!loginBtn,
- hasUserInfo: !!userInfo
- };
- })()
- """
- result = await tools.evaluate(code=check_login_js, browser_session=browser)
- status_output = result.extracted_content
- if isinstance(status_output, str) and status_output.startswith("Result: "):
- status_output = status_output[8:]
- login_info: Dict[str, Any] = {}
- if isinstance(status_output, str):
- try:
- login_info = json.loads(status_output)
- except Exception:
- login_info = {}
- elif isinstance(status_output, dict):
- login_info = status_output
- if not login_info.get("needLogin"):
- output = json.dumps({"need_login": False}, ensure_ascii=False)
- return ToolResult(
- title="已登录",
- output=output,
- long_term_memory=output
- )
- row = _fetch_cookie_row(cookie_type)
- cookie_value = _extract_cookie_value(row)
- if not cookie_value:
- output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
- return ToolResult(
- title="未找到 cookies",
- output=output,
- error="未找到 cookies",
- long_term_memory=output
- )
- domain, base_url = _cookie_domain_for_type(cookie_type, url)
- cookies = _normalize_cookies(cookie_value, domain, base_url)
- if not cookies:
- output = json.dumps({"need_login": True, "cookies_count": 0}, ensure_ascii=False)
- return ToolResult(
- title="cookies 解析失败",
- output=output,
- error="cookies 解析失败",
- long_term_memory=output
- )
- await browser._cdp_set_cookies(cookies)
- if url:
- await tools.navigate(url=url, browser_session=browser)
- await tools.wait(seconds=2, browser_session=browser)
- output = json.dumps({"need_login": True, "cookies_count": len(cookies)}, ensure_ascii=False)
- return ToolResult(
- title="已注入 cookies",
- output=output,
- long_term_memory=output
- )
- except Exception as e:
- return ToolResult(
- title="登录检查失败",
- output="",
- error=str(e),
- long_term_memory="登录检查失败"
- )
- # ============================================================
- # 等待用户操作工具 (Wait for User Action)
- # ============================================================
- @tool()
- async def browser_wait_for_user_action(message: str = "Please complete the action in browser",
- timeout: int = 300) -> ToolResult:
- """
- 等待用户在浏览器中完成操作(如登录)
- Wait for user to complete an action in the browser (e.g., login)
- 暂停自动化流程,等待用户手动完成某些操作(如登录、验证码等)。
- Args:
- message: 提示用户需要完成的操作
- timeout: 最大等待时间(秒),默认 300 秒(5 分钟)
- Returns:
- ToolResult: 包含等待结果的工具返回对象
- Example:
- wait_for_user_action("Please login to Xiaohongshu", timeout=180)
- wait_for_user_action("Please complete the CAPTCHA", timeout=60)
- Note:
- - 用户需要在浏览器窗口中手动完成操作
- - 完成后按回车键继续
- - 超时后会自动继续执行
- """
- try:
- import asyncio
- print(f"\n{'='*60}")
- print(f"⏸️ WAITING FOR USER ACTION")
- print(f"{'='*60}")
- print(f"📝 {message}")
- print(f"⏱️ Timeout: {timeout} seconds")
- print(f"\n👉 Please complete the action in the browser window")
- print(f"👉 Press ENTER when done, or wait for timeout")
- print(f"{'='*60}\n")
- # Wait for user input or timeout
- try:
- loop = asyncio.get_event_loop()
- # Wait for either user input or timeout
- await asyncio.wait_for(
- loop.run_in_executor(None, input),
- timeout=timeout
- )
- return ToolResult(
- title="用户操作完成",
- output=f"User completed: {message}",
- long_term_memory=f"用户完成操作: {message}"
- )
- except asyncio.TimeoutError:
- return ToolResult(
- title="用户操作超时",
- output=f"Timeout waiting for: {message}",
- long_term_memory=f"等待用户操作超时: {message}"
- )
- except Exception as e:
- return ToolResult(
- title="等待用户操作失败",
- output="",
- error=f"Failed to wait for user action: {str(e)}",
- long_term_memory="等待用户操作失败"
- )
- # ============================================================
- # 任务完成工具 (Task Completion)
- # ============================================================
- @tool()
- async def browser_done(text: str, success: bool = True,
- files_to_display: Optional[List[str]] = None) -> ToolResult:
- """
- 标记任务完成并返回最终消息
- Mark the task as complete and return final message to user
- Args:
- text: 给用户的最终消息
- success: 任务是否成功完成
- files_to_display: 可选的要显示的文件路径列表
- Returns:
- ToolResult: 完成结果
- Example:
- done("任务已完成,提取了10个产品信息", success=True)
- """
- try:
- browser, tools = await get_browser_session()
- result = await tools.done(
- text=text,
- success=success,
- files_to_display=files_to_display,
- file_system=_file_system
- )
- return action_result_to_tool_result(result, "任务完成")
- except Exception as e:
- return ToolResult(
- title="标记任务完成失败",
- output="",
- error=f"Failed to complete task: {str(e)}",
- long_term_memory="标记任务完成失败"
- )
- # ============================================================
- # Cookie 持久化工具
- # ============================================================
- _COOKIES_DIR = Path(__file__).parent.parent.parent.parent.parent / ".cache/.cookies"
- @tool()
- async def browser_export_cookies(name: str = "", account: str = "") -> ToolResult:
- """
- 导出当前浏览器的所有 Cookie 到本地 .cookies/ 目录。
- 文件命名格式:{域名}_{账号名}.json,如 bilibili.com_zhangsan.json
- 登录成功后调用此工具,下次可通过 browser_load_cookies 恢复登录态。
- Args:
- name: 自定义文件名(可选,提供则忽略自动命名)
- account: 账号名称(可选,用于区分同一网站的不同账号)
- """
- try:
- browser, _ = await get_browser_session()
- # 获取所有 Cookie(CDP 格式)
- all_cookies = await browser._cdp_get_cookies()
- if not all_cookies:
- return ToolResult(title="Cookie 导出", output="当前浏览器没有 Cookie", long_term_memory="无 Cookie 可导出")
- # 获取当前域名,用于过滤和命名
- from urllib.parse import urlparse
- current_url = await browser.get_current_page_url() or ''
- domain = urlparse(current_url).netloc.replace("www.", "") or "default"
- if not name:
- name = f"{domain}_{account}" if account else domain
- # 只保留当前域名的 cookie(过滤第三方)
- cookies = [c for c in all_cookies if domain in c.get("domain", "").lstrip(".")]
- # 保存
- _COOKIES_DIR.mkdir(parents=True, exist_ok=True)
- cookie_file = _COOKIES_DIR / f"{name}.json"
- cookie_file.write_text(json.dumps(cookies, ensure_ascii=False, indent=2), encoding="utf-8")
- return ToolResult(
- title="Cookie 已导出",
- output=f"已保存 {len(cookies)} 条 Cookie 到 .cookies/{name}.json(从 {len(all_cookies)} 条中过滤当前域名)",
- long_term_memory=f"导出 {len(cookies)} 条 Cookie 到 .cookies/{name}.json"
- )
- except Exception as e:
- return ToolResult(title="Cookie 导出失败", output="", error=str(e), long_term_memory="导出 Cookie 失败")
- @tool()
- async def browser_load_cookies(url: str, name: str = "") -> ToolResult:
- """
- 根据目标 URL 自动查找本地 Cookie 文件,注入浏览器并导航到目标页面恢复登录态。
- 重要:此工具会自动完成导航,调用前不需要先调用 browser_navigate_to_url。
- Args:
- url: 目标 URL(必须提供,同时用于自动匹配 Cookie 文件)
- name: Cookie 文件名(可选,不传则根据 URL 域名自动查找)
- """
- try:
- browser, tools = await get_browser_session()
- if not url.startswith("http"):
- url = f"https://{url}"
- # 根据域名自动查找 Cookie 文件
- if not name:
- from urllib.parse import urlparse
- domain = urlparse(url).netloc.replace("www.", "")
- if _COOKIES_DIR.exists():
- matches = list(_COOKIES_DIR.glob(f"{domain}*.json"))
- if matches:
- cookie_file = matches[0] # 取第一个匹配的
- else:
- available = [f.stem for f in _COOKIES_DIR.glob("*.json")]
- return ToolResult(title="未找到 Cookie", output=f"没有匹配 {domain} 的文件,可用: {available}", error=f"无 {domain} 的 Cookie 文件")
- else:
- return ToolResult(title="未找到 Cookie", output=".cookies 目录不存在", error="Cookie 目录不存在")
- else:
- cookie_file = _COOKIES_DIR / f"{name}.json"
- if not cookie_file.exists():
- available = [f.stem for f in _COOKIES_DIR.glob("*.json")] if _COOKIES_DIR.exists() else []
- return ToolResult(title="文件不存在", output=f"可用: {available}", error=f"未找到 .cookies/{name}.json")
- cookies = json.loads(cookie_file.read_text(encoding="utf-8"))
- # 直接注入(export 和 load 使用相同的 CDP 格式,无需标准化)
- await browser._cdp_set_cookies(cookies)
- # 导航到目标页面(带上刚注入的 Cookie)
- if url:
- if not url.startswith("http"):
- url = f"https://{url}"
- await tools.navigate(url=url, browser_session=browser)
- await tools.wait(seconds=3, browser_session=browser)
- return ToolResult(
- title="Cookie 注入并导航完成",
- output=f"从 {cookie_file.name} 注入 {len(cookies)} 条 Cookie,已导航到 {url}",
- long_term_memory=f"已从 {cookie_file.name} 注入 Cookie 并导航到 {url},登录态已恢复"
- )
- except Exception as e:
- return ToolResult(title="Cookie 加载失败", output="", error=str(e), long_term_memory="加载 Cookie 失败")
- # ============================================================
- # 导出所有工具函数(供外部使用)
- # ============================================================
- __all__ = [
- # 会话管理
- 'init_browser_session',
- 'get_browser_session',
- 'cleanup_browser_session',
- 'kill_browser_session',
- # 导航类工具
- 'browser_navigate_to_url',
- 'browser_search_web',
- 'browser_go_back',
- 'browser_wait',
- # 元素交互工具
- 'browser_click_element',
- 'browser_input_text',
- 'browser_send_keys',
- 'browser_upload_file',
- # 滚动和视图工具
- 'browser_scroll_page',
- 'browser_find_text',
- 'browser_screenshot',
- # 标签页管理工具
- 'browser_switch_tab',
- 'browser_close_tab',
- # 下拉框工具
- 'browser_get_dropdown_options',
- 'browser_select_dropdown_option',
- # 内容提取工具
- 'browser_extract_content',
- 'browser_get_page_html',
- 'browser_read_long_content',
- 'browser_download_direct_url',
- 'browser_get_selector_map',
- 'browser_get_visual_selector_map',
- # JavaScript 执行工具
- 'browser_evaluate',
- 'browser_ensure_login_with_cookies',
- # 等待用户操作
- 'browser_wait_for_user_action',
- # 任务完成
- 'browser_done',
- # Cookie 持久化
- 'browser_export_cookies',
- 'browser_load_cookies',
- ]
|