| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- """
- 小红书容器测试脚本
- 演示容器浏览器的使用:
- 1. 初始化容器浏览器(自动创建容器并连接)
- 2. 搜索健身
- 3. 随机进入一个详情页
- 4. 获取详情页的HTML和iframe并保存到output
- """
- import sys
- import os
- import asyncio
- import json
- import random
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import quote
- from dotenv import load_dotenv
- load_dotenv()
- project_root = Path(__file__).parent.parent
- sys.path.insert(0, str(project_root))
- from agent.tools.builtin.browser.baseClass import (
- init_browser_session,
- cleanup_browser_session,
- browser_navigate_to_url,
- browser_scroll_page,
- browser_evaluate,
- browser_wait,
- browser_get_page_html,
- browser_switch_tab,
- )
# Marker that string outputs of ``browser_evaluate`` may start with; it is
# removed before the payload is interpreted.
_RESULT_PREFIX = "Result: "

# Runs on the search page: collects one {title, link} record per unique
# anchor whose href contains "/explore/". The title is the alt text of the
# anchor's image when present, otherwise the anchor's own text.
_EXTRACT_POSTS_JS = """
(function(){
    const results = [];
    const seen = new Set();
    const anchors = document.querySelectorAll('a[href*="/explore/"]');
    anchors.forEach(a => {
        const link = a.href || '';
        if (link && !seen.has(link)) {
            seen.add(link);
            const img = a.querySelector('img');
            const title = ((img && img.alt) || a.textContent || '').trim();
            results.push({ title, link });
        }
    });
    return results;
})()
"""

# Runs on the detail page: lists index/src/id/name for every <iframe>.
_LIST_IFRAMES_JS = """
(function(){
    const iframes = document.querySelectorAll('iframe');
    const results = [];
    iframes.forEach((iframe, index) => {
        results.push({
            index: index,
            src: iframe.src || '',
            id: iframe.id || '',
            name: iframe.name || ''
        });
    });
    return results;
})()
"""


def _strip_result_prefix(output):
    """Return *output* without a leading "Result: " marker (strings only)."""
    if isinstance(output, str) and output.startswith(_RESULT_PREFIX):
        return output[len(_RESULT_PREFIX):]
    return output


def _decode_eval_output(output):
    """Decode a ``browser_evaluate`` output: strings are JSON-parsed, others pass through."""
    output = _strip_result_prefix(output)
    return json.loads(output) if isinstance(output, str) else output


def _iframe_html_js(idx: int) -> str:
    """Build the script that returns the outer HTML of the iframe at position *idx*."""
    # The script returns null when the iframe is gone and an "Error: ..."
    # string when reading its document throws.
    return f"""
    (function(){{
        const iframe = document.querySelectorAll('iframe')[{idx}];
        if (!iframe) return null;
        try {{
            const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
            return iframeDoc.documentElement.outerHTML;
        }} catch(e) {{
            return 'Error: ' + e.message;
        }}
    }})()
    """


async def _navigate_best_effort(url: str) -> None:
    """Navigate to *url*; report a tool error or exception as a warning and carry on."""
    try:
        nav_result = await browser_navigate_to_url(url)
        if nav_result.error:
            print(f"⚠️ 导航警告: {nav_result.error[:100]}")
    except Exception as e:
        # Deliberately non-fatal: the caller waits and then inspects whatever
        # page is loaded.
        print(f"⚠️ 导航异常: {str(e)[:100]}")


async def _scroll_down(times: int, pages: float) -> None:
    """Scroll down *times* times by *pages* pages, waiting 2 after each scroll."""
    for _ in range(times):
        await browser_scroll_page(down=True, pages=pages)
        await browser_wait(2)


async def _dump_page_html(target: Path):
    """Write the current page HTML to *target*; return the tool error (falsy on success)."""
    html_result = await browser_get_page_html()
    if html_result.error:
        return html_result.error
    html = html_result.metadata.get("html", "")
    target.write_text(html or "", encoding="utf-8")
    return None


async def _save_iframes(output_dir: Path) -> None:
    """Save the HTML of every readable iframe on the current page into *output_dir*."""
    iframe_result = await browser_evaluate(_LIST_IFRAMES_JS)
    if iframe_result.error:
        # Previously ignored without a trace; surface it so an empty output
        # directory can be explained.
        print(f"⚠️ 查找iframe失败: {iframe_result.error}")
        return

    try:
        iframes = _decode_eval_output(iframe_result.output)
        if not iframes:
            print("⚠️ 页面中没有找到iframe")
            return

        print(f"✅ 找到 {len(iframes)} 个iframe")
        for idx, iframe_info in enumerate(iframes):
            print(f"\n📄 处理iframe {idx + 1}/{len(iframes)}")
            print(f" src: {iframe_info.get('src', 'N/A')[:80]}")

            iframe_html_result = await browser_evaluate(_iframe_html_js(idx))
            if iframe_html_result.error:
                print(f" ⚠️ 获取iframe HTML失败: {iframe_html_result.error}")
                continue

            iframe_html = _strip_result_prefix(iframe_html_result.output)
            # Only a non-empty string that is not the script's own "Error: ..."
            # report is page content; the isinstance guard keeps a non-string
            # result from aborting the remaining iframes.
            readable = (
                isinstance(iframe_html, str)
                and iframe_html
                and not iframe_html.startswith("Error:")
            )
            if readable:
                iframe_path = output_dir / f"container_iframe_{idx}.html"
                iframe_path.write_text(iframe_html, encoding="utf-8")
                print(f" ✅ 已保存iframe HTML: {iframe_path}")
            else:
                print(" ⚠️ iframe内容为空或无法访问")
    except Exception as e:
        # iframe capture is an optional extra; a failure here must not fail
        # the whole run.
        print(f"⚠️ 处理iframe失败: {str(e)}")


async def test_xhs_container():
    """Exercise the container browser against Xiaohongshu end to end.

    Steps:
      1. Start a container browser session on https://www.xiaohongshu.com.
      2. Open the search results for the keyword, scroll, and save the page
         HTML to ``output/search_page_debug.html`` for debugging.
      3. Extract the post links, pick one at random and open it.
      4. Save the detail page to ``output/container_post_detail.html`` and
         each readable iframe to ``output/container_iframe_<index>.html``.

    Any failure is printed with its traceback rather than raised, and the
    browser session is cleaned up in every case.
    """
    print("\n" + "="*60)
    print("小红书容器测试")
    print("="*60)

    keyword = "健身"
    search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"

    # All artifacts land in <project_root>/output.
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)

    try:
        print("\n🚀 初始化容器浏览器...")
        # The returned handles are not used directly; the module-level
        # browser_* helpers are called without them.
        _browser, _tools = await init_browser_session(
            browser_type="container",
            url="https://www.xiaohongshu.com",  # URL opened when the container starts
            headless=True,
        )
        print("✅ 容器浏览器初始化成功")

        # Give the landing page time to finish loading.
        await browser_wait(3)

        # Step 1: open the search results for the keyword.
        print(f"\n🔍 搜索关键词: {keyword}")
        await _navigate_best_effort(search_url)
        await browser_wait(10)

        # Scroll so that more results are loaded.
        print("\n📜 滚动页面...")
        await _scroll_down(times=2, pages=2.0)

        print("\n🔍 提取搜索结果...")
        # Keep a copy of the search page for debugging; a failure here is
        # intentionally not reported.
        debug_html_path = output_dir / "search_page_debug.html"
        if not await _dump_page_html(debug_html_path):
            print(f" 💾 已保存搜索页HTML用于调试: {debug_html_path}")

        eval_result = await browser_evaluate(_EXTRACT_POSTS_JS)
        if eval_result.error:
            raise RuntimeError(f"提取搜索结果失败: {eval_result.error}")

        posts = _decode_eval_output(eval_result.output)
        if not posts:
            raise RuntimeError("未找到任何帖子")
        print(f"✅ 找到 {len(posts)} 个帖子")

        # Step 2: open a randomly chosen post.
        selected_post = random.choice(posts)
        post_url = selected_post["link"]
        print(f"\n🎲 随机选择帖子: {selected_post['title'][:50]}...")
        print(f"🔗 访问帖子详情页: {post_url}")
        await _navigate_best_effort(post_url)
        await browser_wait(8)

        print("\n📜 滚动详情页...")
        await _scroll_down(times=3, pages=1.5)

        # Step 3: save the detail page HTML.
        print("\n💾 保存详情页 HTML...")
        html_path = output_dir / "container_post_detail.html"
        html_error = await _dump_page_html(html_path)
        if html_error:
            print(f"⚠️ 获取HTML失败: {html_error}")
        else:
            print(f"✅ 已保存详情页 HTML: {html_path}")

        # Find and save the page's iframes.
        print("\n🔍 查找页面中的iframe...")
        await _save_iframes(output_dir)

        print("\n✅ 测试完成!")
    except Exception as e:
        # Top-level boundary of the script: report the failure in full instead
        # of propagating it.
        print(f"\n❌ 发生错误: {str(e)}")
        import traceback
        traceback.print_exc()
    finally:
        # Always release the browser session. A cleanup failure is reported
        # but must not mask the outcome of the run.
        try:
            await cleanup_browser_session()
        except Exception as cleanup_error:
            print(f"⚠️ 清理浏览器会话失败: {cleanup_error}")
async def main():
    """Async entry point: run the Xiaohongshu container browser test once."""
    await test_xhs_container()
# Run the test only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
|