|
@@ -0,0 +1,291 @@
|
|
|
|
|
+"""
|
|
|
|
|
+小红书容器测试脚本
|
|
|
|
|
+根据 test.md 要求实现:
|
|
|
|
|
+1. 创建容器并导航到小红书
|
|
|
|
|
+2. 初始化浏览器会话
|
|
|
|
|
+3. 切换到指定窗口
|
|
|
|
|
+4. 搜索健身
|
|
|
|
|
+5. 随机进入一个详情页
|
|
|
|
|
+6. 获取详情页的HTML和iframe并保存到output
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import sys
|
|
|
|
|
+import os
|
|
|
|
|
+import asyncio
|
|
|
|
|
+import json
|
|
|
|
|
+import random
|
|
|
|
|
+from datetime import datetime
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+from urllib.parse import quote
|
|
|
|
|
+from dotenv import load_dotenv
|
|
|
|
|
+
|
|
|
|
|
# Pull settings from a local .env file into the process environment
# before any project module is imported.
load_dotenv()

# The repository root is taken to be the directory one level above the
# folder that holds this script.
project_root = Path(__file__).parent.parent

# Put the root first on the import path so the ``agent`` package imported
# below resolves against this checkout.
sys.path.insert(0, os.fspath(project_root))
|
|
|
|
|
+
|
|
|
|
|
+from agent.tools.builtin.browser.baseClass import (
|
|
|
|
|
+ create_container,
|
|
|
|
|
+ init_browser_session,
|
|
|
|
|
+ cleanup_browser_session,
|
|
|
|
|
+ navigate_to_url,
|
|
|
|
|
+ scroll_page,
|
|
|
|
|
+ evaluate,
|
|
|
|
|
+ wait,
|
|
|
|
|
+ get_page_html,
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def _strip_result_prefix(raw):
    """Return ``raw`` without a leading "Result: " marker.

    Non-string values, and strings that do not start with the marker,
    are returned unchanged.
    """
    marker = "Result: "
    if isinstance(raw, str) and raw.startswith(marker):
        return raw[len(marker):]
    return raw


def _decode_eval_output(raw):
    """Convert the ``output`` of an ``evaluate`` call into Python data.

    String output has its optional "Result: " marker removed and is then
    parsed as JSON; any other value is returned as-is.

    Raises:
        json.JSONDecodeError: if the string payload is not valid JSON.
    """
    payload = _strip_result_prefix(raw)
    return json.loads(payload) if isinstance(payload, str) else payload


async def _navigate_best_effort(url):
    """Navigate to ``url``; report problems as warnings instead of raising."""
    try:
        nav_result = await navigate_to_url(url)
        if nav_result.error:
            print(f"⚠️ 导航警告: {nav_result.error[:100]}")
    except Exception as e:
        # Deliberately best-effort: the caller continues with whatever
        # page is currently loaded.
        print(f"⚠️ 导航异常: {str(e)[:100]}")


async def _scroll_down(times, pages):
    """Scroll down ``times`` times by ``pages`` pages, pausing 2 s after each."""
    for _ in range(times):
        await scroll_page(down=True, pages=pages)
        await wait(2)


async def _save_iframes(output_dir):
    """Write the HTML of every reachable iframe on the page into ``output_dir``."""
    print("\n🔍 查找页面中的iframe...")
    iframe_js = """
    (function(){
        const iframes = document.querySelectorAll('iframe');
        const results = [];
        iframes.forEach((iframe, index) => {
            results.push({
                index: index,
                src: iframe.src || '',
                id: iframe.id || '',
                name: iframe.name || ''
            });
        });
        return results;
    })()
    """

    iframe_result = await evaluate(iframe_js)
    if iframe_result.error:
        # Previously this failure was dropped without any message.
        print(f"⚠️ 查找iframe失败: {iframe_result.error}")
        return

    try:
        iframes = _decode_eval_output(iframe_result.output)
        if not iframes:
            print("⚠️ 页面中没有找到iframe")
            return

        print(f"✅ 找到 {len(iframes)} 个iframe")

        for idx, iframe_info in enumerate(iframes):
            print(f"\n📄 处理iframe {idx + 1}/{len(iframes)}")
            print(f" src: {iframe_info.get('src', 'N/A')[:80]}")

            # Re-select the iframe by position inside the page. Reading
            # its document can throw in the browser, in which case the
            # script returns an "Error: ..." string instead of HTML.
            get_iframe_html_js = f"""
            (function(){{
                const iframe = document.querySelectorAll('iframe')[{idx}];
                if (!iframe) return null;
                try {{
                    const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
                    return iframeDoc.documentElement.outerHTML;
                }} catch(e) {{
                    return 'Error: ' + e.message;
                }}
            }})()
            """

            iframe_html_result = await evaluate(get_iframe_html_js)
            if iframe_html_result.error:
                # Previously this failure was dropped without any message.
                print(f" ⚠️ 获取iframe HTML失败: {iframe_html_result.error}")
                continue

            iframe_html = _strip_result_prefix(iframe_html_result.output)
            # Only string results can be written out; checking the type
            # keeps a non-string result from aborting the whole loop.
            if (
                isinstance(iframe_html, str)
                and iframe_html
                and not iframe_html.startswith("Error:")
            ):
                iframe_path = output_dir / f"container_iframe_{idx}.html"
                iframe_path.write_text(iframe_html, encoding="utf-8")
                print(f" ✅ 已保存iframe HTML: {iframe_path}")
            else:
                print(" ⚠️ iframe内容为空或无法访问")
    except Exception as e:
        # Iframe extraction is optional; a failure here must not fail
        # the run.
        print(f"⚠️ 处理iframe失败: {str(e)}")


async def test_xhs_container():
    """Run the Xiaohongshu container scenario end to end.

    Steps performed:
      1. Create a container opened on the Xiaohongshu home page.
      2. Attach a browser session to the container's CDP endpoint.
      3. Switch to the tab named by the container's ``connection_id``,
         when one is returned.
      4. Open the search results for the keyword and scroll.
      5. Pick one result at random and open it.
      6. Save the detail page HTML and any readable iframe HTML under
         ``<project_root>/output``.

    Every failure is caught, printed with its traceback and not re-raised.
    Browser session cleanup is attempted in all cases.
    """
    print("\n" + "=" * 60)
    print("小红书容器测试")
    print("=" * 60)

    keyword = "健身"
    search_url = (
        f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
    )

    # Make sure the output directory exists.
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)

    try:
        # Step 1: create the container and open Xiaohongshu.
        container_info = await create_container(url="https://www.xiaohongshu.com")

        if not container_info.get("success"):
            # .get() keeps a missing "error" key from masking the real
            # failure with a KeyError.
            raise RuntimeError(f"容器创建失败: {container_info.get('error')}")

        cdp_url = container_info["cdp"]
        container_id = container_info["container_id"]
        connection_id = container_info.get("connection_id")

        print("\n📋 容器信息:")
        print(f" CDP URL: {cdp_url}")
        print(f" Container ID: {container_id}")
        print(f" Connection ID: {connection_id}")

        # Give the container time to finish starting.
        print("\n⏳ 等待容器启动...")
        await asyncio.sleep(3)

        # Step 2: attach a browser session to the container.
        print("\n🌐 初始化浏览器会话...")
        browser, tools = await init_browser_session(
            headless=True,
            cdp_url=cdp_url,
        )

        if browser is None or tools is None:
            raise RuntimeError("浏览器初始化失败")

        print("✅ 浏览器会话初始化成功")

        # Step 3: switch to the tab that belongs to connection_id, if any.
        if connection_id:
            print(f"\n🔄 切换到窗口: {connection_id}")
            await wait(2)

            try:
                # List the open tabs for diagnostics.
                state = await browser.get_browser_state_summary(cached=False)
                print(f" 当前标签页数: {len(state.tabs)}")
                for tab in state.tabs:
                    print(f" - Tab ID: {tab.target_id[-4:]}, URL: {tab.url}")

                # Imported here so that an import problem is reported as
                # a tab-switch warning instead of aborting the run.
                from agent.tools.builtin.browser.baseClass import switch_tab

                # The tab is addressed by the last four characters of
                # the id; slicing returns the whole id when it is shorter.
                await switch_tab(connection_id[-4:])
                await wait(2)
                print("✅ 已切换到窗口")
            except Exception as e:
                print(f"⚠️ 切换窗口警告: {str(e)[:100]}")
                print(" 将继续使用当前窗口")

        await wait(3)

        # Step 4: open the search results for the keyword.
        print(f"\n🔍 搜索关键词: {keyword}")
        await _navigate_best_effort(search_url)

        await wait(10)

        # Scroll so that more results are loaded.
        print("\n📜 滚动页面...")
        await _scroll_down(times=2, pages=2.0)

        # Extract the search results.
        print("\n🔍 提取搜索结果...")

        # Keep a copy of the search page HTML for debugging.
        html_result = await get_page_html()
        if not html_result.error:
            html = html_result.metadata.get("html", "")
            debug_html_path = output_dir / "search_page_debug.html"
            debug_html_path.write_text(html or "", encoding="utf-8")
            print(f" 💾 已保存搜索页HTML用于调试: {debug_html_path}")

        # Collect every distinct /explore/ link; the title comes from
        # the image alt text or, failing that, the anchor text.
        extract_js = """
        (function(){
            const results = [];
            const seen = new Set();

            const anchors = document.querySelectorAll('a[href*="/explore/"]');
            anchors.forEach(a => {
                const link = a.href || '';
                if (link && !seen.has(link)) {
                    seen.add(link);
                    const img = a.querySelector('img');
                    const title = ((img && img.alt) || a.textContent || '').trim();
                    results.push({ title, link });
                }
            });

            return results;
        })()
        """

        eval_result = await evaluate(extract_js)
        if eval_result.error:
            raise RuntimeError(f"提取搜索结果失败: {eval_result.error}")

        posts = _decode_eval_output(eval_result.output)

        if not posts:
            raise RuntimeError("未找到任何帖子")

        print(f"✅ 找到 {len(posts)} 个帖子")

        # Step 5: open one randomly chosen post.
        selected_post = random.choice(posts)
        post_url = selected_post["link"]

        print(f"\n🎲 随机选择帖子: {selected_post['title'][:50]}...")
        print(f"🔗 访问帖子详情页: {post_url}")

        await _navigate_best_effort(post_url)

        await wait(8)

        # Scroll the detail page.
        print("\n📜 滚动详情页...")
        await _scroll_down(times=3, pages=1.5)

        # Step 6: save the detail page HTML.
        print("\n💾 保存详情页 HTML...")
        html_result = await get_page_html()
        if html_result.error:
            print(f"⚠️ 获取HTML失败: {html_result.error}")
        else:
            html = html_result.metadata.get("html", "")
            html_path = output_dir / "container_post_detail.html"
            html_path.write_text(html or "", encoding="utf-8")
            print(f"✅ 已保存详情页 HTML: {html_path}")

        # Find the iframes on the page and save their HTML.
        await _save_iframes(output_dir)

        print("\n✅ 测试完成!")

    except Exception as e:
        print(f"\n❌ 发生错误: {str(e)}")
        import traceback
        traceback.print_exc()

    finally:
        # Always try to release the browser session. A cleanup failure
        # is reported but must not hide an earlier error.
        try:
            await cleanup_browser_session()
        except Exception as e:
            print(f"⚠️ 清理浏览器会话失败: {str(e)[:100]}")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
async def main():
    """Script entry coroutine: run the Xiaohongshu container test once."""
    await test_xhs_container()


# Only start the event loop when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|