| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802 |
- """
- 云浏览器模式示例
- Cloud Browser Mode Example
- 本示例展示如何使用 browser-use 的云浏览器模式进行网页自动化操作。
- 云浏览器模式的优势:
- 1. 无需本地安装 Chrome/Chromium
- 2. 可以在无头服务器上运行
- 3. 更好的稳定性和性能
- 4. 支持分布式部署
- 使用前提:
- 1. 在 .env 文件中配置 BROWSER_USE_API_KEY
- 2. 确保网络连接正常
- """
- import sys
- import os
- import asyncio
- import json
- import re
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import quote
- from dotenv import load_dotenv
- # 加载环境变量
- load_dotenv()
- # 将项目根目录添加到 Python 路径
- project_root = Path(__file__).parent.parent
- sys.path.insert(0, str(project_root))
- # 导入 browser-use 核心类
- from browser_use import BrowserSession, BrowserProfile
- from browser_use.tools.service import Tools
- # 导入框架的工具函数
- from agent.tools.builtin.baseClass import (
- init_browser_session,
- cleanup_browser_session,
- navigate_to_url,
- search_web,
- get_selector_map,
- click_element,
- input_text,
- screenshot,
- get_page_html,
- evaluate,
- wait,
- scroll_page,
- wait_for_user_action,
- )
- async def example_1_basic_navigation():
- """
- 示例 1: 基础导航操作
- 演示如何使用云浏览器访问网页
- """
- print("\n" + "="*60)
- print("示例 1: 基础导航操作")
- print("="*60)
- try:
- # 初始化云浏览器会话
- # 关键参数:is_local=False 表示使用云浏览器
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("❌ 错误: 未找到 BROWSER_USE_API_KEY,请在 .env 文件中配置")
- return
- print(f"✅ 使用云浏览器 API Key: {api_key[:20]}...")
- # 初始化浏览器会话(云模式)
- # 注意:API key 会自动从环境变量 BROWSER_USE_API_KEY 读取
- browser, tools = await init_browser_session(
- headless=True, # 云浏览器通常使用无头模式
- use_cloud=True, # 关键:设置为 True 使用云浏览器
- )
- print("✅ 云浏览器会话已启动")
- # 导航到百度
- print("\n📍 导航到百度...")
- result = await navigate_to_url("https://www.baidu.com")
- print(f" 结果: {result.title}")
- # 等待页面加载
- await wait(2)
- # 获取页面标题
- print("\n📄 获取页面信息...")
- title_result = await evaluate("document.title")
- print(f" 页面标题: {title_result.output}")
- # 截图
- print("\n📸 截图...")
- screenshot_result = await screenshot()
- print(f" 截图结果: {screenshot_result.title}")
- print("\n✅ 示例 1 完成")
- except Exception as e:
- print(f"❌ 错误: {str(e)}")
- finally:
- # 清理浏览器会话
- await cleanup_browser_session()
- print("🧹 浏览器会话已清理")
- async def example_2_search_and_extract():
- """
- 示例 2: 搜索和内容提取
- 演示如何使用云浏览器进行搜索并提取内容
- """
- print("\n" + "="*60)
- print("示例 2: 搜索和内容提取")
- print("="*60)
- try:
- # 初始化云浏览器
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
- return
- browser, tools = await init_browser_session(
- headless=True,
- use_cloud=True,
- )
- print("✅ 云浏览器会话已启动")
- # 使用搜索引擎搜索
- print("\n🔍 搜索: Python async programming...")
- result = await search_web("Python async programming", engine="google")
- print(f" 搜索结果: {result.title}")
- # 等待搜索结果加载
- await wait(3)
- # 获取页面 HTML(部分)
- print("\n📄 获取页面 HTML...")
- html_result = await get_page_html()
- print(f" HTML 长度: {len(html_result.metadata.get('html', ''))} 字符")
- # 获取可交互元素
- print("\n🎯 获取页面元素...")
- selector_result = await get_selector_map()
- print(f" {selector_result.output[:200]}...")
- print("\n✅ 示例 2 完成")
- except Exception as e:
- print(f"❌ 错误: {str(e)}")
- finally:
- await cleanup_browser_session()
- print("🧹 浏览器会话已清理")
- async def example_3_with_browser_profile():
- """
- 示例 3: 使用 BrowserProfile 预设配置
- 演示如何使用 BrowserProfile 预设 cookies、localStorage 等
- """
- print("\n" + "="*60)
- print("示例 3: 使用 BrowserProfile 预设配置")
- print("="*60)
- try:
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
- return
- # 创建 BrowserProfile 并预设一些配置
- profile = BrowserProfile(
- # 可以预设 cookies
- cookies=[
- {
- "name": "test_cookie",
- "value": "test_value",
- "domain": ".example.com",
- "path": "/",
- }
- ],
- # 可以预设 localStorage
- local_storage={
- "example.com": {
- "key1": "value1",
- "key2": "value2",
- }
- },
- # 可以设置用户代理
- user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
- )
- print("✅ 创建了 BrowserProfile 配置")
- # 使用 profile 初始化浏览器
- browser, tools = await init_browser_session(
- headless=True,
- use_cloud=True,
- browser_profile=profile, # 传入 profile
- )
- print("✅ 云浏览器会话已启动(带预设配置)")
- # 访问一个网页
- print("\n📍 导航到示例网站...")
- result = await navigate_to_url("https://httpbin.org/headers")
- print(f" 结果: {result.title}")
- await wait(2)
- # 检查 User-Agent 是否生效
- print("\n🔍 检查 User-Agent...")
- ua_result = await evaluate("navigator.userAgent")
- print(f" User-Agent: {ua_result.output[:100]}...")
- print("\n✅ 示例 3 完成")
- except Exception as e:
- print(f"❌ 错误: {str(e)}")
- finally:
- await cleanup_browser_session()
- print("🧹 浏览器会话已清理")
- async def example_4_form_interaction():
- """
- 示例 4: 表单交互
- 演示如何在云浏览器中进行表单填写和提交
- """
- print("\n" + "="*60)
- print("示例 4: 表单交互")
- print("="*60)
- try:
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
- return
- browser, tools = await init_browser_session(
- headless=True,
- use_cloud=True,
- )
- print("✅ 云浏览器会话已启动")
- # 访问一个有表单的测试页面
- print("\n📍 导航到表单测试页面...")
- result = await navigate_to_url("https://httpbin.org/forms/post")
- print(f" 结果: {result.title}")
- await wait(2)
- # 获取页面元素
- print("\n🎯 获取页面元素...")
- selector_result = await get_selector_map()
- print(f" 找到 {selector_result.long_term_memory}")
- # 注意:实际使用时需要根据页面结构找到正确的元素索引
- # 这里只是演示流程
- print("\n✅ 示例 4 完成")
- except Exception as e:
- print(f"❌ 错误: {str(e)}")
- finally:
- await cleanup_browser_session()
- print("🧹 浏览器会话已清理")
- async def example_5_multi_tab():
- """
- 示例 5: 多标签页操作
- 演示如何在云浏览器中管理多个标签页
- """
- print("\n" + "="*60)
- print("示例 5: 多标签页操作")
- print("="*60)
- try:
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
- return
- browser, tools = await init_browser_session(
- headless=True,
- use_cloud=True,
- )
- print("✅ 云浏览器会话已启动")
- # 在第一个标签页打开百度
- print("\n📍 标签页 1: 打开百度...")
- result1 = await navigate_to_url("https://www.baidu.com")
- print(f" 结果: {result1.title}")
- await wait(2)
- # 在新标签页打开谷歌
- print("\n📍 标签页 2: 打开谷歌(新标签页)...")
- result2 = await navigate_to_url("https://www.google.com", new_tab=True)
- print(f" 结果: {result2.title}")
- await wait(2)
- # 获取当前页面信息
- print("\n📄 当前页面信息...")
- title_result = await evaluate("document.title")
- print(f" 当前标题: {title_result.output}")
- print("\n✅ 示例 5 完成")
- except Exception as e:
- print(f"❌ 错误: {str(e)}")
- finally:
- await cleanup_browser_session()
- print("🧹 浏览器会话已清理")
-
- def load_cookies(cookie_str, domain, url=None):
- cookies = []
- try:
- for cookie_part in cookie_str.split(';'):
- if cookie_part:
- name, value = cookie_part.split('=', 1)
- cookie = {"name": str(name).strip(), "value": str(value).strip(), "domain": domain,
- "path":"/",
- "expires":-1,
- "httpOnly": False,
- "secure": True,
- "sameSite":"None"}
- if url:
- cookie["url"] = url
- cookies.append(cookie)
- except:
- pass
- return cookies
- async def example_6_xhs_search_save():
- """
- 示例 6: 小红书搜索并保存结果(带登录)
- 演示如何处理需要登录的网站
- """
- print("\n" + "="*60)
- print("示例 6: 小红书搜索并保存结果(带登录)")
- print("="*60)
- try:
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
- return
- # 创建 BrowserProfile
-
- cookiesStr = "gid=yjJiiqSqKKf8yjJiiqSJiWMKyJvfq2vIJxYDh4EfAyCW9Sq89uUhxI888y4JW8y8WJS448Kj; a1=19a5821e25frfgqcz1g48ktmjilzla6dvt8saird230000337474; webId=bf5a89012d3e96b8e8317a9158d2237b; abRequestId=bf5a89012d3e96b8e8317a9158d2237b; x-user-id-pgy.xiaohongshu.com=64cb5fa2000000002b00a903; x-user-id-ad.xiaohongshu.com=67078bac000000001d022a25; x-user-id-mcc.xiaohongshu.com=67078bac000000001d022a25; web_session=040069b5bf1ceafef95542ee0a3b4b114d9a59; x-user-id-pro.xiaohongshu.com=67078bac000000001d022a25; x-user-id-creator.xiaohongshu.com=64cb5fa2000000002b00a903; webBuild=5.8.0; unread={%22ub%22:%226972cc62000000001a032ef0%22%2C%22ue%22:%226978c695000000001a030baf%22%2C%22uc%22:25}; acw_tc=0a0d0d6817697823078311273e2749a170e3d6e7c28bc3c6b3df1b05366b21; xsecappid=ugc; websectiga=f47eda31ec99545da40c2f731f0630efd2b0959e1dd10d5fedac3dce0bd1e04d; sec_poison_id=8f37e824-4cf9-4c1a-8a6b-1297a36d51ba; customer-sso-sid=68c517601157138359418885nha1gpvvujwqbhia; customerClientId=609975161834570; access-token-creator.xiaohongshu.com=customer.creator.AT-68c517601157138359418887mosxcziw5qwkllrs; galaxy_creator_session_id=NIUNVxmv6LPmZ31jZ2DoKYgyUutPOItjJ24t; galaxy.creator.beaker.session.id=1769782309631057230248; loadts=1769782310288"
-
- cookie_url = "https://www.xiaohongshu.com"
- cookies = load_cookies(cookiesStr, ".xiaohongshu.com", cookie_url)
- profile = BrowserProfile(
- user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
- )
- # 初始化云浏览器(非无头模式,方便用户看到登录界面)
- browser, tools = await init_browser_session(
- headless=False, # 设置为 False,方便用户看到浏览器界面
- use_cloud=True,
- browser_profile=profile,
- )
- print("✅ 云浏览器会话已启动")
- print("📝 提示: 云浏览器启动时会输出 Live URL,你可以在浏览器中打开查看")
- # 步骤 1: 先访问小红书首页,检查是否需要登录
- print("\n📍 步骤 1: 访问小红书首页...")
- await navigate_to_url("https://www.xiaohongshu.com")
- await wait(3)
- await browser._cdp_set_cookies(cookies)
- await wait(1)
- await navigate_to_url("https://www.xiaohongshu.com")
- await wait(3)
- # 检查是否需要登录
- print("\n🔍 检查登录状态...")
- check_login_js = """
- (function() {
- // 检查是否有登录按钮或登录相关元素
- const loginBtn = document.querySelector('[class*="login"]') ||
- document.querySelector('[href*="login"]') ||
- Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录'));
- // 检查是否有用户信息(已登录)
- const userInfo = document.querySelector('[class*="user"]') ||
- document.querySelector('[class*="avatar"]');
- return {
- needLogin: !!loginBtn && !userInfo,
- hasLoginBtn: !!loginBtn,
- hasUserInfo: !!userInfo
- };
- })()
- """
- login_status = await evaluate(check_login_js)
- print(f" 登录状态检查: {login_status.output}")
- status_output = login_status.output
- if isinstance(status_output, str) and status_output.startswith("Result: "):
- status_output = status_output[8:]
- login_info = None
- if isinstance(status_output, str):
- try:
- login_info = json.loads(status_output)
- except Exception:
- login_info = None
- elif isinstance(status_output, dict):
- login_info = status_output
- if login_info and login_info.get("needLogin"):
- print("\n👤 步骤 2: 登录处理...")
- print(" 如果小红书需要登录,请在云浏览器中完成以下操作:")
- print(" 1. 打开上面输出的 Live URL(在日志中查找 '🔗 Live URL')")
- print(" 2. 在 Live URL 页面中完成登录(扫码或账号密码)")
- print(" 3. 登录成功后,回到这里按 Enter 继续")
- await wait_for_user_action(
- message="请在云浏览器中完成小红书登录,完成后按 Enter 继续",
- timeout=300
- )
- print("\n✅ 用户已确认登录完成,继续执行...")
- else:
- print("\n✅ 已检测为登录状态,跳过手动登录")
- # 步骤 3: 执行搜索
- keyword = "瑜伽美女"
- search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
- print(f"\n📍 步骤 3: 导航到搜索页: {keyword} ...")
- await navigate_to_url(search_url)
- await wait(6)
- # 滚动页面加载更多内容
- print("\n📜 滚动页面加载更多内容...")
- for i in range(3):
- print(f" 滚动 {i+1}/3...")
- await scroll_page(down=True, pages=2.0)
- await wait(2)
- # 步骤 4: 提取数据
- print("\n📊 步骤 4: 提取搜索结果...")
- extract_js = """
- (function(){
- const maxCount = 20;
- const seen = new Set();
- const results = [];
- function pushItem(item){
- if (!item || !item.link || seen.has(item.link)) return;
- seen.add(item.link);
- results.push(item);
- }
- // 方法 1: 从 DOM 中提取
- const anchors = document.querySelectorAll('a[href*="/explore/"]');
- anchors.forEach(a => {
- if (results.length >= maxCount) return;
- const link = a.href || '';
- const img = a.querySelector('img');
- const title = ((img && img.alt) || a.textContent || '').trim();
- const cover = (img && img.src) || '';
- if (link && title) {
- pushItem({ title, link, cover });
- }
- });
- // 方法 2: 从 JSON 数据中提取
- const scriptNodes = document.querySelectorAll('script[type="application/json"], script#__NEXT_DATA__, script#__NUXT__');
- const walk = (node) => {
- if (!node || results.length >= maxCount) return;
- if (Array.isArray(node)) {
- for (const item of node) {
- walk(item);
- if (results.length >= maxCount) return;
- }
- return;
- }
- if (typeof node === 'object') {
- const title = (node.title || node.desc || node.name || node.noteTitle || '').toString().trim();
- const id = node.noteId || node.note_id || node.id || node.noteID;
- const cover = (node.cover && (node.cover.url || node.cover.urlDefault)) || node.coverUrl || node.image || '';
- let link = '';
- if (id) {
- link = `https://www.xiaohongshu.com/explore/${id}`;
- }
- if (title && link) {
- pushItem({ title, link, cover });
- }
- for (const key in node) {
- if (typeof node[key] === 'object') walk(node[key]);
- }
- }
- };
- scriptNodes.forEach(node => {
- if (results.length >= maxCount) return;
- const text = node.textContent || '';
- if (!text) return;
- try {
- const data = JSON.parse(text);
- walk(data);
- } catch (e) {}
- });
- return {
- success: true,
- keyword: '瑜伽美女',
- count: results.length,
- results: results,
- timestamp: new Date().toISOString(),
- };
- })()
- """
- async def run_extract():
- result = await evaluate(extract_js)
- output = result.output
- if isinstance(output, str) and output.startswith("Result: "):
- output = output[8:]
- try:
- data = json.loads(output)
- except Exception:
- data = {
- "success": False,
- "keyword": keyword,
- "error": "JSON 解析失败",
- "raw_output": str(output)[:2000],
- "timestamp": datetime.now().isoformat(),
- }
- if isinstance(data, dict) and data.get("count", 0) == 0:
- print(" JS 提取结果为空,尝试从 HTML 中提取...")
- html_result = await get_page_html()
- html = html_result.metadata.get("html", "")
- if html:
- def decode_text(value: str) -> str:
- try:
- return bytes(value, "utf-8").decode("unicode_escape")
- except Exception:
- return value
- results = []
- seen = set()
- pattern = re.compile(r'"noteId":"(.*?)".*?"title":"(.*?)"', re.S)
- for match in pattern.finditer(html):
- note_id = match.group(1)
- title = decode_text(match.group(2)).strip()
- link = f"https://www.xiaohongshu.com/explore/{note_id}"
- if note_id and link not in seen and title:
- seen.add(link)
- results.append({"title": title, "link": link})
- if len(results) >= 20:
- break
- if results:
- data = {
- "success": True,
- "keyword": keyword,
- "count": len(results),
- "results": results,
- "timestamp": datetime.now().isoformat(),
- "source": "html_fallback",
- }
- else:
- blocked_markers = ["登录", "验证", "验证码", "请先登录", "异常访问"]
- if any(marker in html for marker in blocked_markers):
- data = {
- "success": False,
- "keyword": keyword,
- "count": 0,
- "results": [],
- "error": "可能被登录或验证码拦截",
- "timestamp": datetime.now().isoformat(),
- }
- return data
- data = await run_extract()
- if isinstance(data, dict) and data.get("count", 0) == 0 and data.get("error") == "可能被登录或验证码拦截":
- print("\n👤 检测到拦截,请在云浏览器中完成登录或验证码验证")
- await wait_for_user_action(
- message="完成后按 Enter 继续,将重新提取搜索结果",
- timeout=300
- )
- data = await run_extract()
- # 步骤 5: 保存结果
- print(f"\n💾 步骤 5: 保存结果...")
- print(f" 提取到 {data.get('count', 0)} 条数据")
- output_dir = Path(__file__).parent.parent / "output"
- output_dir.mkdir(parents=True, exist_ok=True)
- output_path = output_dir / "xhs.json"
- with open(output_path, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- print(f"✅ 数据已保存到: {output_path}")
- # 显示部分结果
- if data.get("results"):
- print(f"\n📋 前 3 条结果预览:")
- for i, item in enumerate(data["results"][:3], 1):
- print(f" {i}. {item.get('title', 'N/A')[:50]}")
- print(f" {item.get('link', 'N/A')}")
- print("\n✅ 示例 6 完成")
- except Exception as e:
- print(f"❌ 错误: {str(e)}")
- import traceback
- traceback.print_exc()
- finally:
- await cleanup_browser_session()
- print("🧹 浏览器会话已清理")
- async def example_7_baidu_search_save():
- print("\n" + "="*60)
- print("示例 7: 百度搜索并保存结果")
- print("="*60)
- try:
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
- return
- await init_browser_session(
- headless=True,
- use_cloud=True,
- )
- print("✅ 云浏览器会话已启动")
- keyword = "瑜伽美女"
- search_url = f"https://www.baidu.com/s?wd={quote(keyword)}"
- print(f"\n📍 导航到百度搜索页: {keyword} ...")
- await navigate_to_url(search_url)
- await wait(3)
- await scroll_page(down=True, pages=1.5)
- await wait(2)
- extract_js = """
- (function(){
- const results = [];
- const items = document.querySelectorAll('#content_left > div[class*="result"]');
- items.forEach((item, index) => {
- if (index >= 10) return;
- const titleEl = item.querySelector('h3 a, .t a');
- const title = titleEl ? titleEl.textContent.trim() : '';
- const link = titleEl ? titleEl.href : '';
- const summaryEl = item.querySelector('.c-abstract, .content-right_8Zs40');
- const summary = summaryEl ? summaryEl.textContent.trim() : '';
- const sourceEl = item.querySelector('.c-color-gray, .source_1Vdff');
- const source = sourceEl ? sourceEl.textContent.trim() : '';
- if (title || link) {
- results.push({
- index: index + 1,
- title,
- link,
- summary: summary.substring(0, 200),
- source,
- });
- }
- });
- return {
- success: true,
- keyword: '瑜伽美女',
- count: results.length,
- results,
- timestamp: new Date().toISOString(),
- };
- })()
- """
- result = await evaluate(extract_js)
- output = result.output
- if isinstance(output, str) and output.startswith("Result: "):
- output = output[8:]
- try:
- data = json.loads(output)
- except Exception:
- data = {
- "success": False,
- "keyword": keyword,
- "error": "JSON 解析失败",
- "raw_output": str(output)[:2000],
- "timestamp": datetime.now().isoformat(),
- }
- output_dir = Path(__file__).parent.parent / "output"
- output_dir.mkdir(parents=True, exist_ok=True)
- output_path = output_dir / "baidu.json"
- with open(output_path, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- print(f"✅ 数据已保存到: {output_path}")
- if data.get("results"):
- print("\n📋 前 3 条结果预览:")
- for i, item in enumerate(data["results"][:3], 1):
- print(f" {i}. {item.get('title', 'N/A')[:50]}")
- print(f" {item.get('link', 'N/A')}")
- print("\n✅ 示例 7 完成")
- except Exception as e:
- print(f"❌ 错误: {str(e)}")
- finally:
- await cleanup_browser_session()
- print("🧹 浏览器会话已清理")
- async def main():
- """
- 主函数:运行所有示例
- """
- import argparse
- print("\n" + "="*60)
- print("🌐 Browser-Use 云浏览器模式示例")
- print("="*60)
- # 检查 API Key
- api_key = os.getenv("BROWSER_USE_API_KEY")
- if not api_key:
- print("\n❌ 错误: 未找到 BROWSER_USE_API_KEY")
- print("请在 .env 文件中配置 BROWSER_USE_API_KEY")
- return
- print(f"\n✅ 已加载 API Key: {api_key[:20]}...")
- # 运行示例(可以选择运行哪些示例)
- examples = [
- ("基础导航操作", example_1_basic_navigation),
- ("搜索和内容提取", example_2_search_and_extract),
- ("使用 BrowserProfile", example_3_with_browser_profile),
- ("表单交互", example_4_form_interaction),
- ("多标签页操作", example_5_multi_tab),
- ("小红书搜索并保存结果", example_6_xhs_search_save),
- ("百度搜索并保存结果", example_7_baidu_search_save),
- ]
- # 解析命令行参数
- parser = argparse.ArgumentParser(description="Browser-Use 云浏览器模式示例")
- parser.add_argument(
- "--example",
- type=int,
- choices=range(1, len(examples) + 1),
- help="选择要运行的示例 (1-7),不指定则运行第一个示例"
- )
- parser.add_argument(
- "--all",
- action="store_true",
- help="运行所有示例"
- )
- args = parser.parse_args()
- print("\n可用示例:")
- for i, (name, _) in enumerate(examples, 1):
- print(f" {i}. {name}")
- if args.all:
- # 运行所有示例
- print("\n运行所有示例...")
- for name, func in examples:
- await func()
- print("\n" + "-"*60)
- elif args.example:
- # 运行指定示例
- name, func = examples[args.example - 1]
- print(f"\n运行示例 {args.example}: {name}")
- await func()
- else:
- # 默认运行第一个示例
- name, func = examples[0]
- print(f"\n运行默认示例: {name}")
- print("(使用 --example N 运行其他示例,或 --all 运行所有示例)")
- await func()
- print("\n" + "="*60)
- print("✅ 示例运行完成")
- print("="*60)
- if __name__ == "__main__":
- # 运行主函数
- asyncio.run(main())
|