| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- """
- 小红书容器测试脚本
- 根据 test.md 要求实现:
- 1. 创建容器并导航到小红书
- 2. 初始化浏览器会话
- 3. 切换到指定窗口
- 4. 搜索健身
- 5. 随机进入一个详情页
- 6. 获取详情页的HTML和iframe并保存到output
- """
- import sys
- import os
- import asyncio
- import json
- import random
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import quote
- from dotenv import load_dotenv
- load_dotenv()
- project_root = Path(__file__).parent.parent
- sys.path.insert(0, str(project_root))
- from agent.tools.builtin.browser.baseClass import (
- create_container,
- init_browser_session,
- cleanup_browser_session,
- navigate_to_url,
- scroll_page,
- evaluate,
- wait,
- get_page_html,
- )
# JavaScript evaluated in the search-results page: collect every anchor whose
# href contains "/explore/", de-duplicated by URL, with the first image's alt
# text (or the anchor text) as the title.
_EXTRACT_POSTS_JS = """
(function(){
    const results = [];
    const seen = new Set();
    const anchors = document.querySelectorAll('a[href*="/explore/"]');
    anchors.forEach(a => {
        const link = a.href || '';
        if (link && !seen.has(link)) {
            seen.add(link);
            const img = a.querySelector('img');
            const title = ((img && img.alt) || a.textContent || '').trim();
            results.push({ title, link });
        }
    });
    return results;
})()
"""

# JavaScript evaluated in the detail page: list index/src/id/name of every
# <iframe> element in document order.
_LIST_IFRAMES_JS = """
(function(){
    const iframes = document.querySelectorAll('iframe');
    const results = [];
    iframes.forEach((iframe, index) => {
        results.push({
            index: index,
            src: iframe.src || '',
            id: iframe.id || '',
            name: iframe.name || ''
        });
    });
    return results;
})()
"""


def _iframe_html_js(index):
    """Build the JavaScript that returns the outer HTML of the ``index``-th iframe.

    The script yields ``null`` when no such iframe exists and a string starting
    with ``"Error: "`` when reading the iframe document throws.
    """
    return f"""
(function(){{
    const iframe = document.querySelectorAll('iframe')[{index}];
    if (!iframe) return null;
    try {{
        const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
        return iframeDoc.documentElement.outerHTML;
    }} catch(e) {{
        return 'Error: ' + e.message;
    }}
}})()
"""


def _strip_result_prefix(raw):
    """Return ``raw`` without a leading ``"Result: "`` marker.

    Non-string values, and strings that do not start with the marker, are
    returned unchanged.
    """
    prefix = "Result: "
    if isinstance(raw, str) and raw.startswith(prefix):
        return raw[len(prefix):]
    return raw


def _decode_json_result(raw):
    """Strip the ``"Result: "`` marker from ``raw`` and JSON-decode it if it is a string.

    Values that are not strings are returned as-is.

    Raises:
        json.JSONDecodeError: if the string payload is not valid JSON.
    """
    payload = _strip_result_prefix(raw)
    if isinstance(payload, str):
        return json.loads(payload)
    return payload


async def _navigate_best_effort(url):
    """Navigate to ``url``, printing (not raising) any reported error or exception."""
    try:
        nav_result = await navigate_to_url(url)
        if nav_result.error:
            print(f"⚠️ 导航警告: {nav_result.error[:100]}")
    except Exception as e:
        # Deliberately best-effort: the caller continues with whatever page loaded.
        print(f"⚠️ 导航异常: {str(e)[:100]}")


async def _scroll_down(times, pages):
    """Scroll the page down ``times`` times by ``pages`` pages, calling ``wait(2)`` after each scroll."""
    for _ in range(times):
        await scroll_page(down=True, pages=pages)
        await wait(2)


async def _switch_to_connection_tab(browser, connection_id):
    """List the open tabs and try to switch to the one identified by ``connection_id``.

    Any failure is printed as a warning and the current tab stays active.
    """
    print(f"\n🔄 切换到窗口: {connection_id}")
    await wait(2)
    try:
        state = await browser.get_browser_state_summary(cached=False)
        print(f" 当前标签页数: {len(state.tabs)}")
        for tab in state.tabs:
            print(f" - Tab ID: {tab.target_id[-4:]}, URL: {tab.url}")
        # Imported here so that an import failure is handled like any other
        # tab-switch problem (warning only).
        from agent.tools.builtin.browser.baseClass import switch_tab
        # Slicing already returns the whole string when it has four characters
        # or fewer. NOTE(review): presumably switch_tab expects the same
        # 4-character short id printed above -- confirm against its signature.
        await switch_tab(connection_id[-4:])
        await wait(2)
        print("✅ 已切换到窗口")
    except Exception as e:
        print(f"⚠️ 切换窗口警告: {str(e)[:100]}")
        print(" 将继续使用当前窗口")


async def _save_iframes(output_dir):
    """Save the HTML of each readable iframe on the current page into ``output_dir``.

    Files are named ``container_iframe_<index>.html``. An iframe whose content
    is empty, not a string, or reported as ``"Error: ..."`` by the page script
    is skipped with a warning; the remaining iframes are still processed.
    """
    print("\n🔍 查找页面中的iframe...")
    iframe_result = await evaluate(_LIST_IFRAMES_JS)
    if iframe_result.error:
        # Previously ignored silently; surface it so a missing iframe dump is explainable.
        print(f"⚠️ 查找iframe失败: {iframe_result.error}")
        return
    try:
        iframes = _decode_json_result(iframe_result.output)
        if not iframes:
            print("⚠️ 页面中没有找到iframe")
            return
        print(f"✅ 找到 {len(iframes)} 个iframe")
        for idx, iframe_info in enumerate(iframes):
            print(f"\n📄 处理iframe {idx + 1}/{len(iframes)}")
            print(f" src: {iframe_info.get('src', 'N/A')[:80]}")
            iframe_html_result = await evaluate(_iframe_html_js(idx))
            if iframe_html_result.error:
                print(f" ⚠️ 获取iframe HTML失败: {iframe_html_result.error}")
                continue
            iframe_html = _strip_result_prefix(iframe_html_result.output)
            # The isinstance check keeps one unexpected (non-string) payload
            # from aborting the remaining iframes.
            if (
                isinstance(iframe_html, str)
                and iframe_html
                and not iframe_html.startswith("Error:")
            ):
                iframe_path = output_dir / f"container_iframe_{idx}.html"
                iframe_path.write_text(iframe_html, encoding="utf-8")
                print(f" ✅ 已保存iframe HTML: {iframe_path}")
            else:
                print(" ⚠️ iframe内容为空或无法访问")
    except Exception as e:
        print(f"⚠️ 处理iframe失败: {str(e)}")


async def test_xhs_container():
    """Run the end-to-end Xiaohongshu container smoke test.

    Steps:
      1. Create a container opened at https://www.xiaohongshu.com.
      2. Initialise a browser session against the container's CDP URL.
      3. Switch to the tab named by ``connection_id`` when one is returned.
      4. Open the search-results page for the keyword "健身", save its HTML
         for debugging, and collect the note links.
      5. Open one randomly chosen note.
      6. Save the note page HTML and the HTML of every readable iframe under
         ``project_root / "output"``.

    Any ``Exception`` raised inside the workflow is printed together with its
    traceback and is not re-raised. ``cleanup_browser_session`` is always
    attempted afterwards; a cleanup failure is reported but never raised.
    """
    print("\n" + "="*60)
    print("小红书容器测试")
    print("="*60)
    keyword = "健身"
    search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
    # Create the output directory.
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    try:
        # Step 1: create the container and open Xiaohongshu.
        container_info = await create_container(url="https://www.xiaohongshu.com")
        if not container_info["success"]:
            raise RuntimeError(f"容器创建失败: {container_info['error']}")
        cdp_url = container_info["cdp"]
        container_id = container_info["container_id"]
        connection_id = container_info.get("connection_id")
        print("\n📋 容器信息:")
        print(f" CDP URL: {cdp_url}")
        print(f" Container ID: {container_id}")
        print(f" Connection ID: {connection_id}")
        # Give the container time to finish starting.
        print("\n⏳ 等待容器启动...")
        await asyncio.sleep(3)

        # Step 2: initialise the browser session.
        print("\n🌐 初始化浏览器会话...")
        browser, tools = await init_browser_session(
            headless=True,
            cdp_url=cdp_url,
        )
        if browser is None or tools is None:
            raise RuntimeError("浏览器初始化失败")
        print("✅ 浏览器会话初始化成功")

        # Step 3: switch to the tab that belongs to this connection, if any.
        if connection_id:
            await _switch_to_connection_tab(browser, connection_id)
        await wait(3)

        # Step 4: search for the keyword.
        print(f"\n🔍 搜索关键词: {keyword}")
        await _navigate_best_effort(search_url)
        await wait(10)
        # Scroll so that more results are loaded.
        print("\n📜 滚动页面...")
        await _scroll_down(times=2, pages=2.0)

        print("\n🔍 提取搜索结果...")
        # Keep a copy of the search page so selector problems can be debugged offline.
        html_result = await get_page_html()
        if not html_result.error:
            html = html_result.metadata.get("html", "")
            debug_html_path = output_dir / "search_page_debug.html"
            debug_html_path.write_text(html or "", encoding="utf-8")
            print(f" 💾 已保存搜索页HTML用于调试: {debug_html_path}")
        eval_result = await evaluate(_EXTRACT_POSTS_JS)
        if eval_result.error:
            raise RuntimeError(f"提取搜索结果失败: {eval_result.error}")
        posts = _decode_json_result(eval_result.output)
        if not posts:
            raise RuntimeError("未找到任何帖子")
        print(f"✅ 找到 {len(posts)} 个帖子")

        # Step 5: open one randomly chosen note.
        selected_post = random.choice(posts)
        post_url = selected_post["link"]
        print(f"\n🎲 随机选择帖子: {selected_post['title'][:50]}...")
        print(f"🔗 访问帖子详情页: {post_url}")
        await _navigate_best_effort(post_url)
        await wait(8)
        print("\n📜 滚动详情页...")
        await _scroll_down(times=3, pages=1.5)

        # Step 6: save the detail page HTML, then every readable iframe.
        print("\n💾 保存详情页 HTML...")
        html_result = await get_page_html()
        if html_result.error:
            print(f"⚠️ 获取HTML失败: {html_result.error}")
        else:
            html = html_result.metadata.get("html", "")
            html_path = output_dir / "container_post_detail.html"
            html_path.write_text(html or "", encoding="utf-8")
            print(f"✅ 已保存详情页 HTML: {html_path}")
        await _save_iframes(output_dir)
        print("\n✅ 测试完成!")
    except Exception as e:
        print(f"\n❌ 发生错误: {str(e)}")
        import traceback
        traceback.print_exc()
    finally:
        # Always release the browser session. Cleanup stays best-effort, but a
        # failure is reported instead of being silently discarded.
        try:
            await cleanup_browser_session()
        except Exception as e:
            print(f"⚠️ 清理浏览器会话失败: {str(e)[:100]}")
async def main():
    """Script entry coroutine: run the Xiaohongshu container test once."""
    await test_xhs_container()


if __name__ == "__main__":
    # Start an event loop only when executed as a script, not when imported.
    asyncio.run(main())
|