test_xhs_container.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. """
  2. 小红书容器测试脚本
  3. 演示容器浏览器的使用:
  4. 1. 初始化容器浏览器(自动创建容器并连接)
  5. 2. 搜索健身
  6. 3. 随机进入一个详情页
  7. 4. 获取详情页的HTML和iframe并保存到output
  8. """
  9. import sys
  10. import os
  11. import asyncio
  12. import json
  13. import random
  14. from datetime import datetime
  15. from pathlib import Path
  16. from urllib.parse import quote
  17. from dotenv import load_dotenv
  18. load_dotenv()
  19. project_root = Path(__file__).parent.parent
  20. sys.path.insert(0, str(project_root))
  21. from agent.tools.builtin.browser.baseClass import (
  22. init_browser_session,
  23. cleanup_browser_session,
  24. browser_navigate_to_url,
  25. browser_scroll_page,
  26. browser_evaluate,
  27. browser_wait,
  28. browser_get_page_html,
  29. browser_switch_tab,
  30. )
  31. async def test_xhs_container():
  32. """
  33. 测试小红书容器功能
  34. """
  35. print("\n" + "="*60)
  36. print("小红书容器测试")
  37. print("="*60)
  38. keyword = "健身"
  39. search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
  40. # 创建输出目录
  41. output_dir = project_root / "output"
  42. output_dir.mkdir(parents=True, exist_ok=True)
  43. try:
  44. # 初始化容器浏览器(一步完成)
  45. print(f"\n🚀 初始化容器浏览器...")
  46. browser, tools = await init_browser_session(
  47. browser_type="container",
  48. url="https://www.xiaohongshu.com", # 容器启动时访问的URL
  49. headless=True
  50. )
  51. print("✅ 容器浏览器初始化成功")
  52. # 等待页面完全加载
  53. await browser_wait(3)
  54. # 步骤1: 搜索健身
  55. print(f"\n🔍 搜索关键词: {keyword}")
  56. try:
  57. nav_result = await browser_navigate_to_url(search_url)
  58. if nav_result.error:
  59. print(f"⚠️ 导航警告: {nav_result.error[:100]}")
  60. except Exception as e:
  61. print(f"⚠️ 导航异常: {str(e)[:100]}")
  62. await browser_wait(10)
  63. # 滚动页面加载更多内容
  64. print("\n📜 滚动页面...")
  65. for i in range(2):
  66. await browser_scroll_page(down=True, pages=2.0)
  67. await browser_wait(2)
  68. # 提取搜索结果
  69. print("\n🔍 提取搜索结果...")
  70. # 先保存HTML看看页面内容
  71. html_result = await browser_get_page_html()
  72. if not html_result.error:
  73. html = html_result.metadata.get("html", "")
  74. debug_html_path = output_dir / "search_page_debug.html"
  75. debug_html_path.write_text(html or "", encoding="utf-8")
  76. print(f" 💾 已保存搜索页HTML用于调试: {debug_html_path}")
  77. extract_js = """
  78. (function(){
  79. const results = [];
  80. const seen = new Set();
  81. const anchors = document.querySelectorAll('a[href*="/explore/"]');
  82. anchors.forEach(a => {
  83. const link = a.href || '';
  84. if (link && !seen.has(link)) {
  85. seen.add(link);
  86. const img = a.querySelector('img');
  87. const title = ((img && img.alt) || a.textContent || '').trim();
  88. results.push({ title, link });
  89. }
  90. });
  91. return results;
  92. })()
  93. """
  94. eval_result = await browser_evaluate(extract_js)
  95. if eval_result.error:
  96. raise RuntimeError(f"提取搜索结果失败: {eval_result.error}")
  97. output = eval_result.output
  98. if isinstance(output, str) and output.startswith("Result: "):
  99. output = output[8:]
  100. posts = json.loads(output) if isinstance(output, str) else output
  101. if not posts or len(posts) == 0:
  102. raise RuntimeError("未找到任何帖子")
  103. print(f"✅ 找到 {len(posts)} 个帖子")
  104. # 步骤2: 随机进入一个详情页
  105. selected_post = random.choice(posts)
  106. post_url = selected_post["link"]
  107. print(f"\n🎲 随机选择帖子: {selected_post['title'][:50]}...")
  108. print(f"🔗 访问帖子详情页: {post_url}")
  109. try:
  110. nav_result = await browser_navigate_to_url(post_url)
  111. if nav_result.error:
  112. print(f"⚠️ 导航警告: {nav_result.error[:100]}")
  113. except Exception as e:
  114. print(f"⚠️ 导航异常: {str(e)[:100]}")
  115. await browser_wait(8)
  116. # 滚动详情页
  117. print("\n📜 滚动详情页...")
  118. for i in range(3):
  119. await browser_scroll_page(down=True, pages=1.5)
  120. await browser_wait(2)
  121. # 步骤3: 保存详情页HTML
  122. print("\n💾 保存详情页 HTML...")
  123. html_result = await browser_get_page_html()
  124. if html_result.error:
  125. print(f"⚠️ 获取HTML失败: {html_result.error}")
  126. else:
  127. html = html_result.metadata.get("html", "")
  128. html_path = output_dir / "container_post_detail.html"
  129. html_path.write_text(html or "", encoding="utf-8")
  130. print(f"✅ 已保存详情页 HTML: {html_path}")
  131. # 查找并保存iframe
  132. print("\n🔍 查找页面中的iframe...")
  133. iframe_js = """
  134. (function(){
  135. const iframes = document.querySelectorAll('iframe');
  136. const results = [];
  137. iframes.forEach((iframe, index) => {
  138. results.push({
  139. index: index,
  140. src: iframe.src || '',
  141. id: iframe.id || '',
  142. name: iframe.name || ''
  143. });
  144. });
  145. return results;
  146. })()
  147. """
  148. iframe_result = await browser_evaluate(iframe_js)
  149. if not iframe_result.error:
  150. iframe_output = iframe_result.output
  151. if isinstance(iframe_output, str) and iframe_output.startswith("Result: "):
  152. iframe_output = iframe_output[8:]
  153. try:
  154. iframes = json.loads(iframe_output) if isinstance(iframe_output, str) else iframe_output
  155. if iframes and len(iframes) > 0:
  156. print(f"✅ 找到 {len(iframes)} 个iframe")
  157. for idx, iframe_info in enumerate(iframes):
  158. print(f"\n📄 处理iframe {idx + 1}/{len(iframes)}")
  159. print(f" src: {iframe_info.get('src', 'N/A')[:80]}")
  160. # 获取iframe HTML
  161. get_iframe_html_js = f"""
  162. (function(){{
  163. const iframe = document.querySelectorAll('iframe')[{idx}];
  164. if (!iframe) return null;
  165. try {{
  166. const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
  167. return iframeDoc.documentElement.outerHTML;
  168. }} catch(e) {{
  169. return 'Error: ' + e.message;
  170. }}
  171. }})()
  172. """
  173. iframe_html_result = await browser_evaluate(get_iframe_html_js)
  174. if not iframe_html_result.error:
  175. iframe_html = iframe_html_result.output
  176. if isinstance(iframe_html, str) and iframe_html.startswith("Result: "):
  177. iframe_html = iframe_html[8:]
  178. if iframe_html and not iframe_html.startswith("Error:"):
  179. iframe_path = output_dir / f"container_iframe_{idx}.html"
  180. iframe_path.write_text(iframe_html, encoding="utf-8")
  181. print(f" ✅ 已保存iframe HTML: {iframe_path}")
  182. else:
  183. print(f" ⚠️ iframe内容为空或无法访问")
  184. else:
  185. print("⚠️ 页面中没有找到iframe")
  186. except Exception as e:
  187. print(f"⚠️ 处理iframe失败: {str(e)}")
  188. print("\n✅ 测试完成!")
  189. except Exception as e:
  190. print(f"\n❌ 发生错误: {str(e)}")
  191. import traceback
  192. traceback.print_exc()
  193. finally:
  194. # 清理浏览器会话
  195. try:
  196. await cleanup_browser_session()
  197. except Exception:
  198. pass
  199. async def main():
  200. await test_xhs_container()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())