"""Cloud Browser Mode Example.

Shows how to drive web automation with browser-use in cloud browser mode.

Advantages of cloud browser mode:
1. No local Chrome/Chromium installation is required
2. It can run on headless servers
3. Better stability and performance
4. Supports distributed deployment

Prerequisites:
1. Set BROWSER_USE_API_KEY in the .env file
2. Make sure the network connection works

Optional:
- Set XHS_COOKIES (a ``name=value; name2=value2`` cookie header string) to let
  example 6 reuse an existing Xiaohongshu session instead of logging in
  manually. Session cookies are credentials and must not be committed.
"""

import sys
import os
import asyncio
import json
import re
from datetime import datetime
from pathlib import Path
from urllib.parse import quote

from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()

# Put the project root on the Python path so the framework imports resolve
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Core browser-use classes
from browser_use import BrowserSession, BrowserProfile
from browser_use.tools.service import Tools

# Tool functions provided by the framework
from agent.tools.builtin.baseClass import (
    init_browser_session,
    cleanup_browser_session,
    navigate_to_url,
    search_web,
    get_selector_map,
    click_element,
    input_text,
    screenshot,
    get_page_html,
    evaluate,
    wait,
    scroll_page,
    wait_for_user_action,
)


# Prefix that this script strips from ``evaluate`` outputs before JSON parsing.
_EVAL_RESULT_PREFIX = "Result: "

# Matches a noteId followed (possibly much later) by a title in embedded JSON.
_XHS_NOTE_PATTERN = re.compile(r'"noteId":"(.*?)".*?"title":"(.*?)"', re.S)

# Page text fragments that suggest a login wall or captcha.
_XHS_BLOCKED_MARKERS = ["登录", "验证", "验证码", "请先登录", "异常访问"]

# JavaScript that reports whether the Xiaohongshu page looks logged out.
_XHS_LOGIN_CHECK_JS = """
(function() {
    // 检查是否有登录按钮或登录相关元素
    const loginBtn = document.querySelector('[class*="login"]') ||
        document.querySelector('[href*="login"]') ||
        Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录'));
    // 检查是否有用户信息(已登录)
    const userInfo = document.querySelector('[class*="user"]') ||
        document.querySelector('[class*="avatar"]');
    return {
        needLogin: !!loginBtn && !userInfo,
        hasLoginBtn: !!loginBtn,
        hasUserInfo: !!userInfo
    };
})()
"""

# JavaScript that collects up to 20 notes from the Xiaohongshu search page.
# ``__KEYWORD_JSON__`` is replaced with the JSON-encoded search keyword.
_XHS_EXTRACT_JS_TEMPLATE = """
(function(){
    const maxCount = 20;
    const seen = new Set();
    const results = [];

    function pushItem(item){
        if (!item || !item.link || seen.has(item.link)) return;
        seen.add(item.link);
        results.push(item);
    }

    // 方法 1: 从 DOM 中提取
    const anchors = document.querySelectorAll('a[href*="/explore/"]');
    anchors.forEach(a => {
        if (results.length >= maxCount) return;
        const link = a.href || '';
        const img = a.querySelector('img');
        const title = ((img && img.alt) || a.textContent || '').trim();
        const cover = (img && img.src) || '';
        if (link && title) {
            pushItem({ title, link, cover });
        }
    });

    // 方法 2: 从 JSON 数据中提取
    const scriptNodes = document.querySelectorAll('script[type="application/json"], script#__NEXT_DATA__, script#__NUXT__');
    const walk = (node) => {
        if (!node || results.length >= maxCount) return;
        if (Array.isArray(node)) {
            for (const item of node) {
                walk(item);
                if (results.length >= maxCount) return;
            }
            return;
        }
        if (typeof node === 'object') {
            const title = (node.title || node.desc || node.name || node.noteTitle || '').toString().trim();
            const id = node.noteId || node.note_id || node.id || node.noteID;
            const cover = (node.cover && (node.cover.url || node.cover.urlDefault)) || node.coverUrl || node.image || '';
            let link = '';
            if (id) {
                link = `https://www.xiaohongshu.com/explore/${id}`;
            }
            if (title && link) {
                pushItem({ title, link, cover });
            }
            for (const key in node) {
                if (typeof node[key] === 'object') walk(node[key]);
            }
        }
    };

    scriptNodes.forEach(node => {
        if (results.length >= maxCount) return;
        const text = node.textContent || '';
        if (!text) return;
        try {
            const data = JSON.parse(text);
            walk(data);
        } catch (e) {}
    });

    return {
        success: true,
        keyword: __KEYWORD_JSON__,
        count: results.length,
        results: results,
        timestamp: new Date().toISOString(),
    };
})()
"""

# JavaScript that collects the first 10 organic results from a Baidu results
# page. ``__KEYWORD_JSON__`` is replaced with the JSON-encoded search keyword.
_BAIDU_EXTRACT_JS_TEMPLATE = """
(function(){
    const results = [];
    const items = document.querySelectorAll('#content_left > div[class*="result"]');
    items.forEach((item, index) => {
        if (index >= 10) return;
        const titleEl = item.querySelector('h3 a, .t a');
        const title = titleEl ? titleEl.textContent.trim() : '';
        const link = titleEl ? titleEl.href : '';
        const summaryEl = item.querySelector('.c-abstract, .content-right_8Zs40');
        const summary = summaryEl ? summaryEl.textContent.trim() : '';
        const sourceEl = item.querySelector('.c-color-gray, .source_1Vdff');
        const source = sourceEl ? sourceEl.textContent.trim() : '';
        if (title || link) {
            results.push({
                index: index + 1,
                title,
                link,
                summary: summary.substring(0, 200),
                source,
            });
        }
    });
    return {
        success: true,
        keyword: __KEYWORD_JSON__,
        count: results.length,
        results,
        timestamp: new Date().toISOString(),
    };
})()
"""


def _print_banner(title):
    """Print a section title framed by two 60-character rules."""
    print("\n" + "="*60)
    print(title)
    print("="*60)


def _api_key_available():
    """Return True if BROWSER_USE_API_KEY is set; otherwise print an error."""
    if os.getenv("BROWSER_USE_API_KEY"):
        return True
    print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
    return False


def _strip_eval_prefix(output):
    """Drop the leading ``"Result: "`` marker from a string output, if present."""
    if isinstance(output, str) and output.startswith(_EVAL_RESULT_PREFIX):
        return output[len(_EVAL_RESULT_PREFIX):]
    return output


def _parse_eval_output(output):
    """Return the dict carried by an ``evaluate`` output, or None.

    Accepts either a dict or a JSON string (optionally prefixed with
    ``"Result: "``). Anything that does not decode to a dict yields None.
    """
    payload = _strip_eval_prefix(output)
    if isinstance(payload, dict):
        return payload
    if not isinstance(payload, str):
        return None
    try:
        parsed = json.loads(payload)
    except ValueError:
        return None
    return parsed if isinstance(parsed, dict) else None


def _parse_search_output(output, keyword):
    """Parse an extraction result, or build a failure record for *keyword*."""
    data = _parse_eval_output(output)
    if data is not None:
        return data
    return {
        "success": False,
        "keyword": keyword,
        "error": "JSON 解析失败",
        "raw_output": str(_strip_eval_prefix(output))[:2000],
        "timestamp": datetime.now().isoformat(),
    }


def _decode_json_string(value):
    """Decode JSON string escapes (``\\uXXXX`` etc.) in *value*.

    Returns *value* unchanged when it is not a valid JSON string body.
    Decoding via ``unicode_escape`` is deliberately avoided: it interprets the
    bytes as Latin-1 and would corrupt literal non-ASCII characters.
    """
    try:
        decoded = json.loads(f'"{value}"', strict=False)
        # Reject lone surrogates: they could not be written to a UTF-8 file.
        decoded.encode("utf-8")
    except ValueError:
        return value
    return decoded


def _extract_notes_from_html(html, limit=20):
    """Scan raw page HTML for noteId/title pairs; return up to *limit* notes."""
    notes = []
    seen = set()
    for match in _XHS_NOTE_PATTERN.finditer(html):
        note_id = match.group(1)
        title = _decode_json_string(match.group(2)).strip()
        link = f"https://www.xiaohongshu.com/explore/{note_id}"
        if note_id and title and link not in seen:
            seen.add(link)
            notes.append({"title": title, "link": link})
        if len(notes) >= limit:
            break
    return notes


def _save_json(data, filename):
    """Write *data* as pretty UTF-8 JSON to ``<project_root>/output/<filename>``.

    Returns the path that was written.
    """
    output_dir = Path(__file__).parent.parent / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / filename
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return output_path


def _print_preview(data):
    """Print title and link for the first three entries of ``data["results"]``."""
    if not data.get("results"):
        return
    print("\n📋 前 3 条结果预览:")
    for i, item in enumerate(data["results"][:3], 1):
        print(f" {i}. {item.get('title', 'N/A')[:50]}")
        print(f" {item.get('link', 'N/A')}")


async def example_1_basic_navigation():
    """Example 1: basic navigation.

    Opens Baidu in the cloud browser, reads the page title and takes a
    screenshot.
    """
    _print_banner("示例 1: 基础导航操作")

    try:
        api_key = os.getenv("BROWSER_USE_API_KEY")
        if not api_key:
            print("❌ 错误: 未找到 BROWSER_USE_API_KEY,请在 .env 文件中配置")
            return

        # Only a short prefix is shown so the key does not leak into logs.
        print(f"✅ 使用云浏览器 API Key: {api_key[:4]}...")

        # Start a cloud session; use_cloud=True is what selects cloud mode.
        # The API key itself is not passed here (it lives in the environment).
        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # Navigate to Baidu
        print("\n📍 导航到百度...")
        result = await navigate_to_url("https://www.baidu.com")
        print(f" 结果: {result.title}")

        # Give the page time to load
        await wait(2)

        # Read the page title
        print("\n📄 获取页面信息...")
        title_result = await evaluate("document.title")
        print(f" 页面标题: {title_result.output}")

        # Take a screenshot
        print("\n📸 截图...")
        screenshot_result = await screenshot()
        print(f" 截图结果: {screenshot_result.title}")

        print("\n✅ 示例 1 完成")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
    finally:
        # Always release the browser session
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")


async def example_2_search_and_extract():
    """Example 2: search and content extraction.

    Runs a web search in the cloud browser, then fetches the page HTML and the
    interactive-element map.
    """
    _print_banner("示例 2: 搜索和内容提取")

    try:
        if not _api_key_available():
            return

        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # Search through a search engine
        print("\n🔍 搜索: Python async programming...")
        result = await search_web("Python async programming", engine="google")
        print(f" 搜索结果: {result.title}")

        # Wait for the results to load
        await wait(3)

        # Fetch the page HTML and report its size
        print("\n📄 获取页面 HTML...")
        html_result = await get_page_html()
        print(f" HTML 长度: {len(html_result.metadata.get('html', ''))} 字符")

        # Fetch the interactive elements
        print("\n🎯 获取页面元素...")
        selector_result = await get_selector_map()
        print(f" {selector_result.output[:200]}...")

        print("\n✅ 示例 2 完成")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")


async def example_3_with_browser_profile():
    """Example 3: preset configuration through BrowserProfile.

    Builds a BrowserProfile with cookies, localStorage and a user agent, starts
    a session with it and prints the effective ``navigator.userAgent``.
    """
    _print_banner("示例 3: 使用 BrowserProfile 预设配置")

    try:
        if not _api_key_available():
            return

        profile = BrowserProfile(
            # Preset cookies
            cookies=[
                {
                    "name": "test_cookie",
                    "value": "test_value",
                    "domain": ".example.com",
                    "path": "/",
                }
            ],
            # Preset localStorage
            local_storage={
                "example.com": {
                    "key1": "value1",
                    "key2": "value2",
                }
            },
            # Custom user agent
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        )

        print("✅ 创建了 BrowserProfile 配置")

        # Start the browser with the profile
        await init_browser_session(
            headless=True,
            use_cloud=True,
            browser_profile=profile,
        )
        print("✅ 云浏览器会话已启动(带预设配置)")

        # Visit a page
        print("\n📍 导航到示例网站...")
        result = await navigate_to_url("https://httpbin.org/headers")
        print(f" 结果: {result.title}")

        await wait(2)

        # Check whether the user agent took effect
        print("\n🔍 检查 User-Agent...")
        ua_result = await evaluate("navigator.userAgent")
        print(f" User-Agent: {ua_result.output[:100]}...")

        print("\n✅ 示例 3 完成")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")


async def example_4_form_interaction():
    """Example 4: form interaction.

    Opens a form test page and lists its elements. Filling the form is left
    out because the element indexes depend on the actual page structure.
    """
    _print_banner("示例 4: 表单交互")

    try:
        if not _api_key_available():
            return

        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # Visit a test page that contains a form
        print("\n📍 导航到表单测试页面...")
        result = await navigate_to_url("https://httpbin.org/forms/post")
        print(f" 结果: {result.title}")

        await wait(2)

        # List the page elements
        print("\n🎯 获取页面元素...")
        selector_result = await get_selector_map()
        print(f" 找到 {selector_result.long_term_memory}")

        # NOTE: real usage must pick the correct element indexes from the page
        # structure; this example only demonstrates the flow.

        print("\n✅ 示例 4 完成")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")


async def example_5_multi_tab():
    """Example 5: multiple tabs.

    Opens Baidu in the first tab, Google in a new tab, then prints the title of
    the current page.
    """
    _print_banner("示例 5: 多标签页操作")

    try:
        if not _api_key_available():
            return

        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # First tab: Baidu
        print("\n📍 标签页 1: 打开百度...")
        result1 = await navigate_to_url("https://www.baidu.com")
        print(f" 结果: {result1.title}")

        await wait(2)

        # Second tab: Google
        print("\n📍 标签页 2: 打开谷歌(新标签页)...")
        result2 = await navigate_to_url("https://www.google.com", new_tab=True)
        print(f" 结果: {result2.title}")

        await wait(2)

        # Report the current page
        print("\n📄 当前页面信息...")
        title_result = await evaluate("document.title")
        print(f" 当前标题: {title_result.output}")

        print("\n✅ 示例 5 完成")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")


def load_cookies(cookie_str, domain, url=None):
    """Convert a ``name=value; name2=value2`` string into cookie dicts.

    Args:
        cookie_str: Cookie header string. Empty or non-string input yields
            an empty list.
        domain: Value stored under ``"domain"`` in every cookie.
        url: Optional value stored under ``"url"`` in every cookie.

    Returns:
        A list of cookie dicts. Segments without ``=`` or with an empty name
        are skipped individually, so one malformed segment no longer discards
        the cookies that follow it.
    """
    cookies = []
    if not cookie_str or not isinstance(cookie_str, str):
        return cookies

    for cookie_part in cookie_str.split(';'):
        # Split on the first '=' only: cookie values may contain '='.
        name, separator, value = cookie_part.partition('=')
        name = name.strip()
        if not separator or not name:
            continue
        cookie = {
            "name": name,
            "value": value.strip(),
            "domain": domain,
            "path": "/",
            "expires": -1,
            "httpOnly": False,
            "secure": True,
            "sameSite": "None",
        }
        if url:
            cookie["url"] = url
        cookies.append(cookie)
    return cookies


async def _xhs_extract(keyword):
    """Extract Xiaohongshu search results for *keyword* from the current page.

    Tries the in-page JavaScript first. If that yields no results, falls back
    to scanning the page HTML; if that is empty too and the HTML contains a
    login/captcha marker, returns a failure record saying so.
    """
    extract_js = _XHS_EXTRACT_JS_TEMPLATE.replace(
        "__KEYWORD_JSON__", json.dumps(keyword, ensure_ascii=False)
    )
    result = await evaluate(extract_js)
    data = _parse_search_output(result.output, keyword)

    if data.get("count", 0) != 0:
        return data

    print(" JS 提取结果为空,尝试从 HTML 中提取...")
    html_result = await get_page_html()
    html = html_result.metadata.get("html", "")
    if not html:
        return data

    notes = _extract_notes_from_html(html)
    if notes:
        return {
            "success": True,
            "keyword": keyword,
            "count": len(notes),
            "results": notes,
            "timestamp": datetime.now().isoformat(),
            "source": "html_fallback",
        }

    if any(marker in html for marker in _XHS_BLOCKED_MARKERS):
        return {
            "success": False,
            "keyword": keyword,
            "count": 0,
            "results": [],
            "error": "可能被登录或验证码拦截",
            "timestamp": datetime.now().isoformat(),
        }
    return data


async def example_6_xhs_search_save():
    """Example 6: Xiaohongshu search with login, saving the results.

    Shows how to handle a site that requires login: optionally injects session
    cookies from the XHS_COOKIES environment variable, falls back to manual
    login through the cloud browser, then searches, scrolls, extracts results
    and writes them to ``output/xhs.json``.
    """
    _print_banner("示例 6: 小红书搜索并保存结果(带登录)")

    try:
        if not _api_key_available():
            return

        # Session cookies are credentials, so they come from the environment
        # rather than from source code.
        cookie_url = "https://www.xiaohongshu.com"
        cookies = load_cookies(os.getenv("XHS_COOKIES", ""), ".xiaohongshu.com", cookie_url)

        profile = BrowserProfile(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        )

        # Non-headless so the user can see the login page
        browser, _tools = await init_browser_session(
            headless=False,
            use_cloud=True,
            browser_profile=profile,
        )

        print("✅ 云浏览器会话已启动")
        print("📝 提示: 云浏览器启动时会输出 Live URL,你可以在浏览器中打开查看")

        # Step 1: open the home page, inject cookies (if any), then reload
        print("\n📍 步骤 1: 访问小红书首页...")
        await navigate_to_url("https://www.xiaohongshu.com")
        await wait(3)
        if cookies:
            # NOTE(review): _cdp_set_cookies is a private BrowserSession
            # method; confirm it still exists when upgrading browser-use.
            await browser._cdp_set_cookies(cookies)
            await wait(1)
            await navigate_to_url("https://www.xiaohongshu.com")
            await wait(3)
        else:
            print(" 未配置 XHS_COOKIES,跳过 Cookie 注入")

        # Check whether login is needed
        print("\n🔍 检查登录状态...")
        login_status = await evaluate(_XHS_LOGIN_CHECK_JS)
        print(f" 登录状态检查: {login_status.output}")

        login_info = _parse_eval_output(login_status.output)

        if login_info and login_info.get("needLogin"):
            # Step 2: let the user log in through the Live URL
            print("\n👤 步骤 2: 登录处理...")
            print(" 如果小红书需要登录,请在云浏览器中完成以下操作:")
            print(" 1. 打开上面输出的 Live URL(在日志中查找 '🔗 Live URL')")
            print(" 2. 在 Live URL 页面中完成登录(扫码或账号密码)")
            print(" 3. 登录成功后,回到这里按 Enter 继续")

            await wait_for_user_action(
                message="请在云浏览器中完成小红书登录,完成后按 Enter 继续",
                timeout=300
            )

            print("\n✅ 用户已确认登录完成,继续执行...")
        else:
            print("\n✅ 已检测为登录状态,跳过手动登录")

        # Step 3: run the search
        keyword = "瑜伽美女"
        search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"

        print(f"\n📍 步骤 3: 导航到搜索页: {keyword} ...")
        await navigate_to_url(search_url)
        await wait(6)

        # Scroll so the page loads more results
        print("\n📜 滚动页面加载更多内容...")
        for i in range(3):
            print(f" 滚动 {i+1}/3...")
            await scroll_page(down=True, pages=2.0)
            await wait(2)

        # Step 4: extract the data
        print("\n📊 步骤 4: 提取搜索结果...")
        data = await _xhs_extract(keyword)

        if data.get("count", 0) == 0 and data.get("error") == "可能被登录或验证码拦截":
            print("\n👤 检测到拦截,请在云浏览器中完成登录或验证码验证")
            await wait_for_user_action(
                message="完成后按 Enter 继续,将重新提取搜索结果",
                timeout=300
            )
            data = await _xhs_extract(keyword)

        # Step 5: save the results
        print(f"\n💾 步骤 5: 保存结果...")
        print(f" 提取到 {data.get('count', 0)} 条数据")

        output_path = _save_json(data, "xhs.json")
        print(f"✅ 数据已保存到: {output_path}")

        # Show a short preview
        _print_preview(data)

        print("\n✅ 示例 6 完成")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
        import traceback
        traceback.print_exc()
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")


async def example_7_baidu_search_save():
    """Example 7: Baidu search, saving the results.

    Opens a Baidu results page, extracts up to 10 results with in-page
    JavaScript and writes them to ``output/baidu.json``.
    """
    _print_banner("示例 7: 百度搜索并保存结果")

    try:
        if not _api_key_available():
            return

        await init_browser_session(
            headless=True,
            use_cloud=True,
        )

        print("✅ 云浏览器会话已启动")

        keyword = "瑜伽美女"
        search_url = f"https://www.baidu.com/s?wd={quote(keyword)}"

        print(f"\n📍 导航到百度搜索页: {keyword} ...")
        await navigate_to_url(search_url)
        await wait(3)
        await scroll_page(down=True, pages=1.5)
        await wait(2)

        extract_js = _BAIDU_EXTRACT_JS_TEMPLATE.replace(
            "__KEYWORD_JSON__", json.dumps(keyword, ensure_ascii=False)
        )
        result = await evaluate(extract_js)
        data = _parse_search_output(result.output, keyword)

        output_path = _save_json(data, "baidu.json")
        print(f"✅ 数据已保存到: {output_path}")

        _print_preview(data)

        print("\n✅ 示例 7 完成")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")


async def main():
    """Entry point: run one example, or all of them.

    ``--example N`` runs example N, ``--all`` runs every example in order, and
    with no option the first example is run.
    """
    import argparse

    _print_banner("🌐 Browser-Use 云浏览器模式示例")

    # Check the API key up front
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("\n❌ 错误: 未找到 BROWSER_USE_API_KEY")
        print("请在 .env 文件中配置 BROWSER_USE_API_KEY")
        return

    # Only a short prefix is shown so the key does not leak into logs.
    print(f"\n✅ 已加载 API Key: {api_key[:4]}...")

    # Available examples
    examples = [
        ("基础导航操作", example_1_basic_navigation),
        ("搜索和内容提取", example_2_search_and_extract),
        ("使用 BrowserProfile", example_3_with_browser_profile),
        ("表单交互", example_4_form_interaction),
        ("多标签页操作", example_5_multi_tab),
        ("小红书搜索并保存结果", example_6_xhs_search_save),
        ("百度搜索并保存结果", example_7_baidu_search_save),
    ]

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Browser-Use 云浏览器模式示例")
    parser.add_argument(
        "--example",
        type=int,
        choices=range(1, len(examples) + 1),
        help="选择要运行的示例 (1-7),不指定则运行第一个示例"
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="运行所有示例"
    )

    args = parser.parse_args()

    print("\n可用示例:")
    for i, (name, _) in enumerate(examples, 1):
        print(f" {i}. {name}")

    if args.all:
        # Run every example
        print("\n运行所有示例...")
        for name, func in examples:
            await func()
            print("\n" + "-"*60)
    elif args.example:
        # Run the selected example
        name, func = examples[args.example - 1]
        print(f"\n运行示例 {args.example}: {name}")
        await func()
    else:
        # Default: run the first example
        name, func = examples[0]
        print(f"\n运行默认示例: {name}")
        print("(使用 --example N 运行其他示例,或 --all 运行所有示例)")
        await func()

    print("\n" + "="*60)
    print("✅ 示例运行完成")
    print("="*60)


if __name__ == "__main__":
    # Run the main coroutine
    asyncio.run(main())