|
|
@@ -1,802 +0,0 @@
|
|
|
-"""
|
|
|
-云浏览器模式示例
|
|
|
-Cloud Browser Mode Example
|
|
|
-
|
|
|
-本示例展示如何使用 browser-use 的云浏览器模式进行网页自动化操作。
|
|
|
-云浏览器模式的优势:
|
|
|
-1. 无需本地安装 Chrome/Chromium
|
|
|
-2. 可以在无头服务器上运行
|
|
|
-3. 更好的稳定性和性能
|
|
|
-4. 支持分布式部署
|
|
|
-
|
|
|
-使用前提:
|
|
|
-1. 在 .env 文件中配置 BROWSER_USE_API_KEY
|
|
|
-2. 确保网络连接正常
|
|
|
-"""
|
|
|
-
|
|
|
-import sys
|
|
|
-import os
|
|
|
-import asyncio
|
|
|
-import json
|
|
|
-import re
|
|
|
-from datetime import datetime
|
|
|
-from pathlib import Path
|
|
|
-from urllib.parse import quote
|
|
|
-from dotenv import load_dotenv
|
|
|
-
|
|
|
-# 加载环境变量
|
|
|
-load_dotenv()
|
|
|
-
|
|
|
-# 将项目根目录添加到 Python 路径
|
|
|
-project_root = Path(__file__).parent.parent
|
|
|
-sys.path.insert(0, str(project_root))
|
|
|
-
|
|
|
-# 导入 browser-use 核心类
|
|
|
-from browser_use import BrowserSession, BrowserProfile
|
|
|
-from browser_use.tools.service import Tools
|
|
|
-
|
|
|
-# 导入框架的工具函数
|
|
|
-from agent.tools.builtin.baseClass import (
|
|
|
- init_browser_session,
|
|
|
- cleanup_browser_session,
|
|
|
- navigate_to_url,
|
|
|
- search_web,
|
|
|
- get_selector_map,
|
|
|
- click_element,
|
|
|
- input_text,
|
|
|
- screenshot,
|
|
|
- get_page_html,
|
|
|
- evaluate,
|
|
|
- wait,
|
|
|
- scroll_page,
|
|
|
- wait_for_user_action,
|
|
|
-)
|
|
|
-
|
|
|
-
|
|
|
async def example_1_basic_navigation():
    """Example 1: basic navigation in cloud browser mode.

    Starts a cloud browser session, opens Baidu, reads ``document.title``
    through ``evaluate`` and takes a screenshot. Progress is printed to
    stdout; nothing is returned.

    The function returns early (without starting or cleaning up a session)
    when ``BROWSER_USE_API_KEY`` is not set. Any exception raised while the
    session is in use is printed rather than propagated, and the session is
    always cleaned up afterwards.
    """
    print("\n" + "="*60)
    print("示例 1: 基础导航操作")
    print("="*60)

    # Check the key *before* entering the try block: if no session is ever
    # started there is nothing to clean up, and reporting "session cleaned"
    # would be misleading.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("❌ 错误: 未找到 BROWSER_USE_API_KEY,请在 .env 文件中配置")
        return

    # Never echo a usable prefix of the secret; the last 4 characters are
    # enough to tell which key is loaded.
    print(f"✅ 使用云浏览器 API Key: ****{api_key[-4:]}")

    try:
        # use_cloud=True selects the cloud browser. The original note states
        # that the API key is read from the BROWSER_USE_API_KEY environment
        # variable by the session helper itself (not verifiable from here).
        await init_browser_session(
            headless=True,   # cloud browsers normally run headless
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # Navigate to Baidu.
        print("\n📍 导航到百度...")
        result = await navigate_to_url("https://www.baidu.com")
        print(f" 结果: {result.title}")

        # Give the page time to finish loading.
        await wait(2)

        # Read the page title via JavaScript.
        print("\n📄 获取页面信息...")
        title_result = await evaluate("document.title")
        print(f" 页面标题: {title_result.output}")

        # Take a screenshot.
        print("\n📸 截图...")
        screenshot_result = await screenshot()
        print(f" 截图结果: {screenshot_result.title}")

        print("\n✅ 示例 1 完成")

    except Exception as e:
        # Top-level boundary of a demo script: report and fall through to
        # cleanup instead of crashing the runner.
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")
|
|
|
-
|
|
|
-
|
|
|
async def example_2_search_and_extract():
    """Example 2: search and content extraction in cloud browser mode.

    Runs a web search through ``search_web``, then prints the length of the
    page HTML (taken from ``metadata["html"]`` of ``get_page_html``) and the
    first 200 characters of the selector map output.

    Returns early, without touching any session, when
    ``BROWSER_USE_API_KEY`` is not set. Exceptions raised while the session
    is in use are printed, and the session is always cleaned up.
    """
    print("\n" + "="*60)
    print("示例 2: 搜索和内容提取")
    print("="*60)

    # Guard outside the try block so that cleanup only runs for a session
    # that was actually requested.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
        return

    try:
        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # Search with a search engine.
        print("\n🔍 搜索: Python async programming...")
        result = await search_web("Python async programming", engine="google")
        print(f" 搜索结果: {result.title}")

        # Wait for the result page to load.
        await wait(3)

        # Fetch the page HTML and report its size only.
        print("\n📄 获取页面 HTML...")
        html_result = await get_page_html()
        print(f" HTML 长度: {len(html_result.metadata.get('html', ''))} 字符")

        # List interactive elements (truncated preview).
        print("\n🎯 获取页面元素...")
        selector_result = await get_selector_map()
        print(f" {selector_result.output[:200]}...")

        print("\n✅ 示例 2 完成")

    except Exception as e:
        # Demo-script boundary: report the failure, then clean up.
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")
|
|
|
-
|
|
|
-
|
|
|
async def example_3_with_browser_profile():
    """Example 3: start the cloud browser with a preconfigured BrowserProfile.

    Builds a ``BrowserProfile`` carrying a sample cookie, sample localStorage
    entries and a custom User-Agent, starts a cloud session with it, opens
    ``https://httpbin.org/headers`` and prints ``navigator.userAgent`` so the
    effective User-Agent can be inspected.

    Returns early, without touching any session, when
    ``BROWSER_USE_API_KEY`` is not set. Exceptions raised while the session
    is in use are printed, and the session is always cleaned up.
    """
    print("\n" + "="*60)
    print("示例 3: 使用 BrowserProfile 预设配置")
    print("="*60)

    # Guard outside the try block so that cleanup only runs for a session
    # that was actually requested.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
        return

    try:
        # NOTE(review): whether BrowserProfile honours ``cookies`` and
        # ``local_storage`` keyword arguments depends on the installed
        # browser-use version — confirm against its documentation.
        profile = BrowserProfile(
            # Preset cookies.
            cookies=[
                {
                    "name": "test_cookie",
                    "value": "test_value",
                    "domain": ".example.com",
                    "path": "/",
                }
            ],
            # Preset localStorage, keyed by origin host.
            local_storage={
                "example.com": {
                    "key1": "value1",
                    "key2": "value2",
                }
            },
            # Custom User-Agent.
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        )
        print("✅ 创建了 BrowserProfile 配置")

        # Start the session with the profile.
        await init_browser_session(
            headless=True,
            use_cloud=True,
            browser_profile=profile,
        )
        print("✅ 云浏览器会话已启动(带预设配置)")

        # Visit a page that echoes request headers.
        print("\n📍 导航到示例网站...")
        result = await navigate_to_url("https://httpbin.org/headers")
        print(f" 结果: {result.title}")

        await wait(2)

        # Check which User-Agent the page actually sees.
        print("\n🔍 检查 User-Agent...")
        ua_result = await evaluate("navigator.userAgent")
        # str() keeps the preview working even if the tool returns a
        # non-string output object.
        print(f" User-Agent: {str(ua_result.output)[:100]}...")

        print("\n✅ 示例 3 完成")

    except Exception as e:
        # Demo-script boundary: report the failure, then clean up.
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")
|
|
|
-
|
|
|
-
|
|
|
async def example_4_form_interaction():
    """Example 4: form interaction in cloud browser mode.

    Opens the httpbin form test page and prints the ``long_term_memory``
    summary of the selector map. Actual form filling is intentionally left
    out: element indices depend on the page structure, so this example only
    demonstrates the flow up to element discovery.

    Returns early, without touching any session, when
    ``BROWSER_USE_API_KEY`` is not set. Exceptions raised while the session
    is in use are printed, and the session is always cleaned up.
    """
    print("\n" + "="*60)
    print("示例 4: 表单交互")
    print("="*60)

    # Guard outside the try block so that cleanup only runs for a session
    # that was actually requested.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
        return

    try:
        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # Open a test page that contains a form.
        print("\n📍 导航到表单测试页面...")
        result = await navigate_to_url("https://httpbin.org/forms/post")
        print(f" 结果: {result.title}")

        await wait(2)

        # Discover the interactive elements on the page.
        print("\n🎯 获取页面元素...")
        selector_result = await get_selector_map()
        print(f" 找到 {selector_result.long_term_memory}")

        # In real use, pick the right element indices from the selector map
        # and drive them with click_element / input_text.

        print("\n✅ 示例 4 完成")

    except Exception as e:
        # Demo-script boundary: report the failure, then clean up.
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")
|
|
|
-
|
|
|
-
|
|
|
async def example_5_multi_tab():
    """Example 5: working with multiple tabs in cloud browser mode.

    Opens Baidu in the current tab, opens Google via
    ``navigate_to_url(..., new_tab=True)``, then prints ``document.title``
    of whichever page ``evaluate`` runs against afterwards.

    Returns early, without touching any session, when
    ``BROWSER_USE_API_KEY`` is not set. Exceptions raised while the session
    is in use are printed, and the session is always cleaned up.
    """
    print("\n" + "="*60)
    print("示例 5: 多标签页操作")
    print("="*60)

    # Guard outside the try block so that cleanup only runs for a session
    # that was actually requested.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
        return

    try:
        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        # First tab: Baidu.
        print("\n📍 标签页 1: 打开百度...")
        first_tab = await navigate_to_url("https://www.baidu.com")
        print(f" 结果: {first_tab.title}")

        await wait(2)

        # Second tab: Google, opened in a new tab.
        print("\n📍 标签页 2: 打开谷歌(新标签页)...")
        second_tab = await navigate_to_url("https://www.google.com", new_tab=True)
        print(f" 结果: {second_tab.title}")

        await wait(2)

        # Report the title of the page currently targeted by evaluate().
        print("\n📄 当前页面信息...")
        title_result = await evaluate("document.title")
        print(f" 当前标题: {title_result.output}")

        print("\n✅ 示例 5 完成")

    except Exception as e:
        # Demo-script boundary: report the failure, then clean up.
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")
|
|
|
-
|
|
|
-
|
|
|
def load_cookies(cookie_str, domain, url=None):
    """Parse a ``Cookie`` header string into a list of cookie dicts.

    Args:
        cookie_str: Raw header value such as ``"a=1; b=2"``. Anything that is
            not a non-empty string yields an empty list.
        domain: Value stored under ``"domain"`` in every cookie.
        url: Optional URL; when truthy it is stored under ``"url"`` in every
            cookie.

    Returns:
        A list of dicts with the keys ``name``, ``value``, ``domain``,
        ``path`` ("/"), ``expires`` (-1), ``httpOnly`` (False), ``secure``
        (True) and ``sameSite`` ("None"), plus ``url`` when given.

    Fragments without ``=`` or with an empty name are skipped individually,
    so one malformed fragment no longer discards every cookie after it.
    """
    if not isinstance(cookie_str, str) or not cookie_str:
        return []

    cookies = []
    for fragment in cookie_str.split(";"):
        # Split on the first "=" only: cookie values may themselves contain
        # "=" (e.g. base64 padding).
        name, separator, value = fragment.partition("=")
        name = name.strip()
        if not separator or not name:
            continue

        cookie = {
            "name": name,
            "value": value.strip(),
            "domain": domain,
            "path": "/",
            "expires": -1,
            "httpOnly": False,
            "secure": True,
            "sameSite": "None",
        }
        if url:
            cookie["url"] = url
        cookies.append(cookie)
    return cookies
|
|
|
-
|
|
|
def _xhs_strip_result_prefix(output):
    """Drop the leading ``"Result: "`` marker from a string tool output."""
    if isinstance(output, str) and output.startswith("Result: "):
        return output[len("Result: "):]
    return output


def _xhs_parse_payload(output):
    """Return ``output`` as a dict (parsing JSON text if needed), else None."""
    if isinstance(output, dict):
        return output
    if not isinstance(output, str):
        return None
    try:
        parsed = json.loads(output)
    except ValueError:
        return None
    return parsed if isinstance(parsed, dict) else None


def _xhs_decode_json_string(raw):
    """Decode JSON string escapes (``\\uXXXX``, ``\\/`` ...) in ``raw``.

    ``unicode_escape`` must not be used here: it treats the UTF-8 bytes of
    already-decoded non-ASCII text as Latin-1 and produces mojibake. If
    ``raw`` is not a valid JSON string body it is returned unchanged.
    """
    try:
        # strict=False tolerates literal control characters (e.g. newlines).
        decoded = json.loads(f'"{raw}"', strict=False)
        # Reject lone surrogates, which could not be written out as UTF-8.
        decoded.encode("utf-8")
    except (ValueError, UnicodeEncodeError):
        return raw
    return decoded


def _xhs_notes_from_html(html, limit):
    """Scan raw HTML for ``noteId``/``title`` pairs; return up to ``limit`` notes."""
    pattern = re.compile(r'"noteId":"(.*?)".*?"title":"(.*?)"', re.S)
    notes = []
    seen = set()
    for match in pattern.finditer(html):
        note_id = match.group(1)
        title = _xhs_decode_json_string(match.group(2)).strip()
        link = f"https://www.xiaohongshu.com/explore/{note_id}"
        if note_id and title and link not in seen:
            seen.add(link)
            notes.append({"title": title, "link": link})
        if len(notes) >= limit:
            break
    return notes


async def example_6_xhs_search_save():
    """Example 6: Xiaohongshu search with login handling; saves results to JSON.

    Flow:
      1. Start a non-headless cloud session and open the Xiaohongshu home page.
         If the ``XHS_COOKIES`` environment variable holds a ``Cookie`` header
         string, it is injected and the page is reloaded.
      2. Detect the login state with a DOM heuristic; if login appears to be
         required, wait for the user to log in through the Live URL.
      3. Open the search result page for the keyword and scroll to load more.
      4. Extract notes with in-page JavaScript, falling back to a regex scan
         of the page HTML; if the page looks blocked, wait for the user and
         retry once.
      5. Write the result to ``<project>/output/xhs.json`` and print a preview.

    Returns early, without touching any session, when
    ``BROWSER_USE_API_KEY`` is not set. Exceptions raised while the session
    is in use are printed with a traceback, and the session is always
    cleaned up.
    """
    print("\n" + "="*60)
    print("示例 6: 小红书搜索并保存结果(带登录)")
    print("="*60)

    # Guard outside the try block so that cleanup only runs for a session
    # that was actually requested.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
        return

    keyword = "瑜伽美女"
    max_results = 20

    try:
        # Session cookies are credentials and must not live in source
        # control; they are supplied through the environment instead.
        cookie_url = "https://www.xiaohongshu.com"
        cookies = load_cookies(os.getenv("XHS_COOKIES", ""), ".xiaohongshu.com", cookie_url)
        if not cookies:
            print("ℹ️ 未配置 XHS_COOKIES,将跳过 Cookie 注入,如需登录请手动完成")

        profile = BrowserProfile(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        )

        # Non-headless so the user can see the login UI through the Live URL.
        browser, _tools = await init_browser_session(
            headless=False,
            use_cloud=True,
            browser_profile=profile,
        )

        print("✅ 云浏览器会话已启动")
        print("📝 提示: 云浏览器启动时会输出 Live URL,你可以在浏览器中打开查看")

        # Step 1: open the home page, inject cookies, reload.
        print("\n📍 步骤 1: 访问小红书首页...")
        await navigate_to_url("https://www.xiaohongshu.com")
        await wait(3)
        if cookies:
            # NOTE(review): _cdp_set_cookies is a private BrowserSession
            # method and may change between browser-use releases.
            await browser._cdp_set_cookies(cookies)
            await wait(1)
            await navigate_to_url("https://www.xiaohongshu.com")
            await wait(3)

        # Step 2: heuristic login-state detection.
        print("\n🔍 检查登录状态...")
        check_login_js = """
        (function() {
            // 检查是否有登录按钮或登录相关元素
            const loginBtn = document.querySelector('[class*="login"]') ||
                document.querySelector('[href*="login"]') ||
                Array.from(document.querySelectorAll('button, a')).find(el => (el.textContent || '').includes('登录'));

            // 检查是否有用户信息(已登录)
            const userInfo = document.querySelector('[class*="user"]') ||
                document.querySelector('[class*="avatar"]');

            return {
                needLogin: !!loginBtn && !userInfo,
                hasLoginBtn: !!loginBtn,
                hasUserInfo: !!userInfo
            };
        })()
        """

        login_status = await evaluate(check_login_js)
        print(f" 登录状态检查: {login_status.output}")
        login_info = _xhs_parse_payload(_xhs_strip_result_prefix(login_status.output))

        if login_info and login_info.get("needLogin"):
            print("\n👤 步骤 2: 登录处理...")
            print(" 如果小红书需要登录,请在云浏览器中完成以下操作:")
            print(" 1. 打开上面输出的 Live URL(在日志中查找 '🔗 Live URL')")
            print(" 2. 在 Live URL 页面中完成登录(扫码或账号密码)")
            print(" 3. 登录成功后,回到这里按 Enter 继续")
            await wait_for_user_action(
                message="请在云浏览器中完成小红书登录,完成后按 Enter 继续",
                timeout=300
            )
            print("\n✅ 用户已确认登录完成,继续执行...")
        else:
            print("\n✅ 已检测为登录状态,跳过手动登录")

        # Step 3: run the search.
        search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
        print(f"\n📍 步骤 3: 导航到搜索页: {keyword} ...")
        await navigate_to_url(search_url)
        await wait(6)

        # Scroll to trigger lazy loading of more results.
        print("\n📜 滚动页面加载更多内容...")
        for i in range(3):
            print(f" 滚动 {i+1}/3...")
            await scroll_page(down=True, pages=2.0)
            await wait(2)

        # Step 4: extract data.
        print("\n📊 步骤 4: 提取搜索结果...")
        # Placeholders are substituted below so the script reports the same
        # keyword and limit the Python side uses. A plain template (not an
        # f-string) avoids having to escape every JavaScript brace.
        extract_js = """
        (function(){
            const maxCount = __MAX_COUNT__;
            const seen = new Set();
            const results = [];

            function pushItem(item){
                if (!item || !item.link || seen.has(item.link)) return;
                seen.add(item.link);
                results.push(item);
            }

            // 方法 1: 从 DOM 中提取
            const anchors = document.querySelectorAll('a[href*="/explore/"]');
            anchors.forEach(a => {
                if (results.length >= maxCount) return;
                const link = a.href || '';
                const img = a.querySelector('img');
                const title = ((img && img.alt) || a.textContent || '').trim();
                const cover = (img && img.src) || '';
                if (link && title) {
                    pushItem({ title, link, cover });
                }
            });

            // 方法 2: 从 JSON 数据中提取
            const scriptNodes = document.querySelectorAll('script[type="application/json"], script#__NEXT_DATA__, script#__NUXT__');
            const walk = (node) => {
                if (!node || results.length >= maxCount) return;
                if (Array.isArray(node)) {
                    for (const item of node) {
                        walk(item);
                        if (results.length >= maxCount) return;
                    }
                    return;
                }
                if (typeof node === 'object') {
                    const title = (node.title || node.desc || node.name || node.noteTitle || '').toString().trim();
                    const id = node.noteId || node.note_id || node.id || node.noteID;
                    const cover = (node.cover && (node.cover.url || node.cover.urlDefault)) || node.coverUrl || node.image || '';
                    let link = '';
                    if (id) {
                        link = `https://www.xiaohongshu.com/explore/${id}`;
                    }
                    if (title && link) {
                        pushItem({ title, link, cover });
                    }
                    for (const key in node) {
                        if (typeof node[key] === 'object') walk(node[key]);
                    }
                }
            };

            scriptNodes.forEach(node => {
                if (results.length >= maxCount) return;
                const text = node.textContent || '';
                if (!text) return;
                try {
                    const data = JSON.parse(text);
                    walk(data);
                } catch (e) {}
            });

            return {
                success: true,
                keyword: __KEYWORD__,
                count: results.length,
                results: results,
                timestamp: new Date().toISOString(),
            };
        })()
        """
        # json.dumps yields a valid, fully escaped JavaScript string literal.
        extract_js = (
            extract_js
            .replace("__MAX_COUNT__", str(max_results))
            .replace("__KEYWORD__", json.dumps(keyword))
        )

        async def run_extract():
            """Run the in-page extraction, falling back to an HTML regex scan."""
            result = await evaluate(extract_js)
            output = _xhs_strip_result_prefix(result.output)
            data = _xhs_parse_payload(output)
            if data is None:
                data = {
                    "success": False,
                    "keyword": keyword,
                    "error": "JSON 解析失败",
                    "raw_output": str(output)[:2000],
                    "timestamp": datetime.now().isoformat(),
                }

            # A missing "count" (parse failure) is treated like zero results,
            # so the HTML fallback also covers that case.
            if data.get("count", 0) != 0:
                return data

            print(" JS 提取结果为空,尝试从 HTML 中提取...")
            html_result = await get_page_html()
            html = html_result.metadata.get("html", "")
            if not html:
                return data

            notes = _xhs_notes_from_html(html, max_results)
            if notes:
                return {
                    "success": True,
                    "keyword": keyword,
                    "count": len(notes),
                    "results": notes,
                    "timestamp": datetime.now().isoformat(),
                    "source": "html_fallback",
                }

            blocked_markers = ["登录", "验证", "验证码", "请先登录", "异常访问"]
            if any(marker in html for marker in blocked_markers):
                return {
                    "success": False,
                    "keyword": keyword,
                    "count": 0,
                    "results": [],
                    "error": "可能被登录或验证码拦截",
                    "timestamp": datetime.now().isoformat(),
                }
            return data

        data = await run_extract()
        if data.get("count", 0) == 0 and data.get("error") == "可能被登录或验证码拦截":
            print("\n👤 检测到拦截,请在云浏览器中完成登录或验证码验证")
            await wait_for_user_action(
                message="完成后按 Enter 继续,将重新提取搜索结果",
                timeout=300
            )
            data = await run_extract()

        # Step 5: persist the result.
        print(f"\n💾 步骤 5: 保存结果...")
        print(f" 提取到 {data.get('count', 0)} 条数据")

        output_dir = Path(__file__).parent.parent / "output"
        output_dir.mkdir(parents=True, exist_ok=True)
        output_path = output_dir / "xhs.json"
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        print(f"✅ 数据已保存到: {output_path}")

        # Preview the first few results.
        if data.get("results"):
            print(f"\n📋 前 3 条结果预览:")
            for i, item in enumerate(data["results"][:3], 1):
                print(f" {i}. {item.get('title', 'N/A')[:50]}")
                print(f" {item.get('link', 'N/A')}")

        print("\n✅ 示例 6 完成")

    except Exception as e:
        # Demo-script boundary: report with a traceback, then clean up.
        print(f"❌ 错误: {str(e)}")
        import traceback
        traceback.print_exc()
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")
|
|
|
-
|
|
|
-
|
|
|
async def example_7_baidu_search_save():
    """Example 7: Baidu search; saves the extracted results to JSON.

    Opens the Baidu result page for the keyword, scrolls once, extracts
    title/link/summary/source from the first result containers with in-page
    JavaScript, writes the payload to ``<project>/output/baidu.json`` and
    prints a preview of the first three results.

    Returns early, without touching any session, when
    ``BROWSER_USE_API_KEY`` is not set. Exceptions raised while the session
    is in use are printed, and the session is always cleaned up.
    """
    print("\n" + "="*60)
    print("示例 7: 百度搜索并保存结果")
    print("="*60)

    # Guard outside the try block so that cleanup only runs for a session
    # that was actually requested.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("❌ 错误: 未找到 BROWSER_USE_API_KEY")
        return

    keyword = "瑜伽美女"

    try:
        await init_browser_session(
            headless=True,
            use_cloud=True,
        )
        print("✅ 云浏览器会话已启动")

        search_url = f"https://www.baidu.com/s?wd={quote(keyword)}"
        print(f"\n📍 导航到百度搜索页: {keyword} ...")
        await navigate_to_url(search_url)
        await wait(3)
        await scroll_page(down=True, pages=1.5)
        await wait(2)

        # __KEYWORD__ is substituted below so the reported keyword always
        # matches the one that was searched. A plain template (not an
        # f-string) avoids having to escape every JavaScript brace.
        extract_js = """
        (function(){
            const results = [];
            const items = document.querySelectorAll('#content_left > div[class*="result"]');
            items.forEach((item, index) => {
                if (index >= 10) return;
                const titleEl = item.querySelector('h3 a, .t a');
                const title = titleEl ? titleEl.textContent.trim() : '';
                const link = titleEl ? titleEl.href : '';
                const summaryEl = item.querySelector('.c-abstract, .content-right_8Zs40');
                const summary = summaryEl ? summaryEl.textContent.trim() : '';
                const sourceEl = item.querySelector('.c-color-gray, .source_1Vdff');
                const source = sourceEl ? sourceEl.textContent.trim() : '';
                if (title || link) {
                    results.push({
                        index: index + 1,
                        title,
                        link,
                        summary: summary.substring(0, 200),
                        source,
                    });
                }
            });
            return {
                success: true,
                keyword: __KEYWORD__,
                count: results.length,
                results,
                timestamp: new Date().toISOString(),
            };
        })()
        """
        # json.dumps yields a valid, fully escaped JavaScript string literal.
        extract_js = extract_js.replace("__KEYWORD__", json.dumps(keyword))

        result = await evaluate(extract_js)
        output = result.output
        if isinstance(output, str) and output.startswith("Result: "):
            output = output[len("Result: "):]

        # The tool may hand back either JSON text or an already-decoded
        # dict; anything else is recorded as a parse failure.
        data = None
        if isinstance(output, dict):
            data = output
        elif isinstance(output, str):
            try:
                parsed = json.loads(output)
            except ValueError:
                parsed = None
            if isinstance(parsed, dict):
                data = parsed
        if data is None:
            data = {
                "success": False,
                "keyword": keyword,
                "error": "JSON 解析失败",
                "raw_output": str(output)[:2000],
                "timestamp": datetime.now().isoformat(),
            }

        output_dir = Path(__file__).parent.parent / "output"
        output_dir.mkdir(parents=True, exist_ok=True)
        output_path = output_dir / "baidu.json"
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        print(f"✅ 数据已保存到: {output_path}")
        if data.get("results"):
            print("\n📋 前 3 条结果预览:")
            for i, item in enumerate(data["results"][:3], 1):
                print(f" {i}. {item.get('title', 'N/A')[:50]}")
                print(f" {item.get('link', 'N/A')}")

        print("\n✅ 示例 7 完成")

    except Exception as e:
        # Demo-script boundary: report the failure, then clean up.
        print(f"❌ 错误: {str(e)}")
    finally:
        await cleanup_browser_session()
        print("🧹 浏览器会话已清理")
|
|
|
-
|
|
|
-
|
|
|
async def main():
    """Command-line entry point: run one example or all of them.

    Options:
        --example N   run example N (1-based index into the example list)
        --all         run every example in order (takes precedence over
                      --example)

    With no option, the first example is run. Arguments are parsed before
    the API-key check so that ``--help`` and argument errors work even when
    ``BROWSER_USE_API_KEY`` is not configured. If the key is missing, an
    error is printed and nothing is run.
    """
    import argparse

    # Available examples, in menu order.
    examples = [
        ("基础导航操作", example_1_basic_navigation),
        ("搜索和内容提取", example_2_search_and_extract),
        ("使用 BrowserProfile", example_3_with_browser_profile),
        ("表单交互", example_4_form_interaction),
        ("多标签页操作", example_5_multi_tab),
        ("小红书搜索并保存结果", example_6_xhs_search_save),
        ("百度搜索并保存结果", example_7_baidu_search_save),
    ]

    parser = argparse.ArgumentParser(description="Browser-Use 云浏览器模式示例")
    parser.add_argument(
        "--example",
        type=int,
        choices=range(1, len(examples) + 1),
        # Derive the upper bound from the list so the help text cannot
        # drift when examples are added or removed.
        help=f"选择要运行的示例 (1-{len(examples)}),不指定则运行第一个示例"
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="运行所有示例"
    )
    args = parser.parse_args()

    print("\n" + "="*60)
    print("🌐 Browser-Use 云浏览器模式示例")
    print("="*60)

    # Check the API key.
    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        print("\n❌ 错误: 未找到 BROWSER_USE_API_KEY")
        print("请在 .env 文件中配置 BROWSER_USE_API_KEY")
        return

    # Never echo a usable prefix of the secret; the last 4 characters are
    # enough to tell which key is loaded.
    print(f"\n✅ 已加载 API Key: ****{api_key[-4:]}")

    print("\n可用示例:")
    for i, (name, _) in enumerate(examples, 1):
        print(f" {i}. {name}")

    if args.all:
        # Run every example in order.
        print("\n运行所有示例...")
        for _, func in examples:
            await func()
            print("\n" + "-"*60)
    elif args.example:
        # Run the selected example (argparse has validated the range).
        name, func = examples[args.example - 1]
        print(f"\n运行示例 {args.example}: {name}")
        await func()
    else:
        # Default: run the first example.
        name, func = examples[0]
        print(f"\n运行默认示例: {name}")
        print("(使用 --example N 运行其他示例,或 --all 运行所有示例)")
        await func()

    print("\n" + "="*60)
    print("✅ 示例运行完成")
    print("="*60)
|
|
|
-
|
|
|
-
|
|
|
-if __name__ == "__main__":
|
|
|
- # 运行主函数
|
|
|
- asyncio.run(main())
|