test_xhs_container.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. """
  2. 小红书容器测试脚本
  3. 根据 test.md 要求实现:
  4. 1. 创建容器并导航到小红书
  5. 2. 初始化浏览器会话
  6. 3. 切换到指定窗口
  7. 4. 搜索健身
  8. 5. 随机进入一个详情页
  9. 6. 获取详情页的HTML和iframe并保存到output
  10. """
  11. import sys
  12. import os
  13. import asyncio
  14. import json
  15. import random
  16. from datetime import datetime
  17. from pathlib import Path
  18. from urllib.parse import quote
  19. from dotenv import load_dotenv
# Load settings from a .env file (python-dotenv) before the project imports run.
load_dotenv()
# Parent of the directory that contains this file.
project_root = Path(__file__).parent.parent
# Put that directory first on sys.path; presumably this is what lets the
# ``agent`` package import that follows resolve when run as a script.
sys.path.insert(0, str(project_root))
  23. from agent.tools.builtin.browser.baseClass import (
  24. create_container,
  25. init_browser_session,
  26. cleanup_browser_session,
  27. navigate_to_url,
  28. scroll_page,
  29. evaluate,
  30. wait,
  31. get_page_html,
  32. )
  33. async def test_xhs_container():
  34. """
  35. 测试小红书容器功能
  36. """
  37. print("\n" + "="*60)
  38. print("小红书容器测试")
  39. print("="*60)
  40. keyword = "健身"
  41. search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
  42. # 创建输出目录
  43. output_dir = project_root / "output"
  44. output_dir.mkdir(parents=True, exist_ok=True)
  45. try:
  46. # 步骤1: 创建容器并导航到小红书
  47. container_info = await create_container(url="https://www.xiaohongshu.com")
  48. if not container_info["success"]:
  49. raise RuntimeError(f"容器创建失败: {container_info['error']}")
  50. cdp_url = container_info["cdp"]
  51. container_id = container_info["container_id"]
  52. connection_id = container_info.get("connection_id")
  53. print(f"\n📋 容器信息:")
  54. print(f" CDP URL: {cdp_url}")
  55. print(f" Container ID: {container_id}")
  56. print(f" Connection ID: {connection_id}")
  57. # 等待容器完全启动
  58. print(f"\n⏳ 等待容器启动...")
  59. await asyncio.sleep(3)
  60. # 步骤2: 初始化浏览器会话
  61. print(f"\n🌐 初始化浏览器会话...")
  62. browser, tools = await init_browser_session(
  63. headless=True,
  64. cdp_url=cdp_url
  65. )
  66. if browser is None or tools is None:
  67. raise RuntimeError("浏览器初始化失败")
  68. print("✅ 浏览器会话初始化成功")
  69. # 步骤3: 如果有 connection_id,切换到对应窗口
  70. if connection_id:
  71. print(f"\n🔄 切换到窗口: {connection_id}")
  72. await wait(2)
  73. # 获取当前浏览器状态
  74. try:
  75. state = await browser.get_browser_state_summary(cached=False)
  76. print(f" 当前标签页数: {len(state.tabs)}")
  77. for tab in state.tabs:
  78. print(f" - Tab ID: {tab.target_id[-4:]}, URL: {tab.url}")
  79. # 尝试切换到 connection_id 对应的标签页
  80. # connection_id 可能是完整ID,取最后4位
  81. from agent.tools.builtin.browser.baseClass import switch_tab
  82. await switch_tab(connection_id[-4:] if len(connection_id) > 4 else connection_id)
  83. await wait(2)
  84. print(f"✅ 已切换到窗口")
  85. except Exception as e:
  86. print(f"⚠️ 切换窗口警告: {str(e)[:100]}")
  87. print(f" 将继续使用当前窗口")
  88. await wait(3)
  89. # 步骤4: 搜索健身
  90. print(f"\n🔍 搜索关键词: {keyword}")
  91. try:
  92. nav_result = await navigate_to_url(search_url)
  93. if nav_result.error:
  94. print(f"⚠️ 导航警告: {nav_result.error[:100]}")
  95. except Exception as e:
  96. print(f"⚠️ 导航异常: {str(e)[:100]}")
  97. await wait(10)
  98. # 滚动页面加载更多内容
  99. print("\n📜 滚动页面...")
  100. for i in range(2):
  101. await scroll_page(down=True, pages=2.0)
  102. await wait(2)
  103. # 提取搜索结果
  104. print("\n🔍 提取搜索结果...")
  105. # 先保存HTML看看页面内容
  106. html_result = await get_page_html()
  107. if not html_result.error:
  108. html = html_result.metadata.get("html", "")
  109. debug_html_path = output_dir / "search_page_debug.html"
  110. debug_html_path.write_text(html or "", encoding="utf-8")
  111. print(f" 💾 已保存搜索页HTML用于调试: {debug_html_path}")
  112. extract_js = """
  113. (function(){
  114. const results = [];
  115. const seen = new Set();
  116. const anchors = document.querySelectorAll('a[href*="/explore/"]');
  117. anchors.forEach(a => {
  118. const link = a.href || '';
  119. if (link && !seen.has(link)) {
  120. seen.add(link);
  121. const img = a.querySelector('img');
  122. const title = ((img && img.alt) || a.textContent || '').trim();
  123. results.push({ title, link });
  124. }
  125. });
  126. return results;
  127. })()
  128. """
  129. eval_result = await evaluate(extract_js)
  130. if eval_result.error:
  131. raise RuntimeError(f"提取搜索结果失败: {eval_result.error}")
  132. output = eval_result.output
  133. if isinstance(output, str) and output.startswith("Result: "):
  134. output = output[8:]
  135. posts = json.loads(output) if isinstance(output, str) else output
  136. if not posts or len(posts) == 0:
  137. raise RuntimeError("未找到任何帖子")
  138. print(f"✅ 找到 {len(posts)} 个帖子")
  139. # 步骤5: 随机进入一个详情页
  140. selected_post = random.choice(posts)
  141. post_url = selected_post["link"]
  142. print(f"\n🎲 随机选择帖子: {selected_post['title'][:50]}...")
  143. print(f"🔗 访问帖子详情页: {post_url}")
  144. try:
  145. nav_result = await navigate_to_url(post_url)
  146. if nav_result.error:
  147. print(f"⚠️ 导航警告: {nav_result.error[:100]}")
  148. except Exception as e:
  149. print(f"⚠️ 导航异常: {str(e)[:100]}")
  150. await wait(8)
  151. # 滚动详情页
  152. print("\n📜 滚动详情页...")
  153. for i in range(3):
  154. await scroll_page(down=True, pages=1.5)
  155. await wait(2)
  156. # 步骤6: 保存详情页HTML
  157. print("\n💾 保存详情页 HTML...")
  158. html_result = await get_page_html()
  159. if html_result.error:
  160. print(f"⚠️ 获取HTML失败: {html_result.error}")
  161. else:
  162. html = html_result.metadata.get("html", "")
  163. html_path = output_dir / "container_post_detail.html"
  164. html_path.write_text(html or "", encoding="utf-8")
  165. print(f"✅ 已保存详情页 HTML: {html_path}")
  166. # 查找并保存iframe
  167. print("\n🔍 查找页面中的iframe...")
  168. iframe_js = """
  169. (function(){
  170. const iframes = document.querySelectorAll('iframe');
  171. const results = [];
  172. iframes.forEach((iframe, index) => {
  173. results.push({
  174. index: index,
  175. src: iframe.src || '',
  176. id: iframe.id || '',
  177. name: iframe.name || ''
  178. });
  179. });
  180. return results;
  181. })()
  182. """
  183. iframe_result = await evaluate(iframe_js)
  184. if not iframe_result.error:
  185. iframe_output = iframe_result.output
  186. if isinstance(iframe_output, str) and iframe_output.startswith("Result: "):
  187. iframe_output = iframe_output[8:]
  188. try:
  189. iframes = json.loads(iframe_output) if isinstance(iframe_output, str) else iframe_output
  190. if iframes and len(iframes) > 0:
  191. print(f"✅ 找到 {len(iframes)} 个iframe")
  192. for idx, iframe_info in enumerate(iframes):
  193. print(f"\n📄 处理iframe {idx + 1}/{len(iframes)}")
  194. print(f" src: {iframe_info.get('src', 'N/A')[:80]}")
  195. # 获取iframe HTML
  196. get_iframe_html_js = f"""
  197. (function(){{
  198. const iframe = document.querySelectorAll('iframe')[{idx}];
  199. if (!iframe) return null;
  200. try {{
  201. const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
  202. return iframeDoc.documentElement.outerHTML;
  203. }} catch(e) {{
  204. return 'Error: ' + e.message;
  205. }}
  206. }})()
  207. """
  208. iframe_html_result = await evaluate(get_iframe_html_js)
  209. if not iframe_html_result.error:
  210. iframe_html = iframe_html_result.output
  211. if isinstance(iframe_html, str) and iframe_html.startswith("Result: "):
  212. iframe_html = iframe_html[8:]
  213. if iframe_html and not iframe_html.startswith("Error:"):
  214. iframe_path = output_dir / f"container_iframe_{idx}.html"
  215. iframe_path.write_text(iframe_html, encoding="utf-8")
  216. print(f" ✅ 已保存iframe HTML: {iframe_path}")
  217. else:
  218. print(f" ⚠️ iframe内容为空或无法访问")
  219. else:
  220. print("⚠️ 页面中没有找到iframe")
  221. except Exception as e:
  222. print(f"⚠️ 处理iframe失败: {str(e)}")
  223. print("\n✅ 测试完成!")
  224. except Exception as e:
  225. print(f"\n❌ 发生错误: {str(e)}")
  226. import traceback
  227. traceback.print_exc()
  228. finally:
  229. # 清理浏览器会话
  230. try:
  231. await cleanup_browser_session()
  232. except Exception:
  233. pass
  234. async def main():
  235. await test_xhs_container()
  236. if __name__ == "__main__":
  237. asyncio.run(main())