Просмотр исходного кода

feat(browser): 添加容器管理功能并扩展权限配置

添加浏览器容器管理功能,支持通过API创建远程浏览器容器并自动导航到指定URL。同时扩展Claude权限配置以支持更长时间的操作和网络请求。

- 在浏览器工具中添加 `create_container` 函数,支持创建容器、初始化浏览器会话并导航
- 添加测试脚本 `test_xhs_container.py` 验证容器功能在小红书场景下的使用
- 扩展 `.claude/settings.local.json` 权限配置,添加240秒超时和curl命令支持
- 修复 `switch_tab` 和 `close_tab` 函数,规范化标签页ID处理(使用后4位)
max_liu 1 месяц назад
Родитель
Сommit
76af39e02d
3 измененных файлов с 436 добавлено и 5 удалено
  1. 3 1
      .claude/settings.local.json
  2. 142 4
      agent/tools/builtin/browser/baseClass.py
  3. 291 0
      examples/test_xhs_container.py

+ 3 - 1
.claude/settings.local.json

@@ -8,7 +8,9 @@
       "Bash(tee:*)",
       "Bash(browser-use:*)",
       "Bash(pip install:*)",
-      "Bash(timeout 60 python:*)"
+      "Bash(timeout 60 python:*)",
+      "Bash(timeout 240 python:*)",
+      "Bash(curl:*)"
     ],
     "deny": [],
     "ask": []

+ 142 - 4
agent/tools/builtin/browser/baseClass.py

@@ -20,6 +20,7 @@ Native Browser-Use Tools Adapter
 import sys
 import os
 import json
+import asyncio
 from typing import Optional, List, Dict, Any, Tuple
 from pathlib import Path
 from urllib.parse import urlparse
@@ -762,12 +763,13 @@ async def switch_tab(tab_id: str, uid: str = "") -> ToolResult:
     try:
         browser, tools = await get_browser_session()
 
+        normalized_tab_id = tab_id[-4:] if tab_id else tab_id
         result = await tools.switch(
-            tab_id=tab_id,
+            tab_id=normalized_tab_id,
             browser_session=browser
         )
 
-        return action_result_to_tool_result(result, f"切换到标签页 {tab_id}")
+        return action_result_to_tool_result(result, f"切换到标签页 {normalized_tab_id}")
 
     except Exception as e:
         return ToolResult(
@@ -797,12 +799,13 @@ async def close_tab(tab_id: str, uid: str = "") -> ToolResult:
     try:
         browser, tools = await get_browser_session()
 
+        normalized_tab_id = tab_id[-4:] if tab_id else tab_id
         result = await tools.close(
-            tab_id=tab_id,
+            tab_id=normalized_tab_id,
             browser_session=browser
         )
 
-        return action_result_to_tool_result(result, f"关闭标签页 {tab_id}")
+        return action_result_to_tool_result(result, f"关闭标签页 {normalized_tab_id}")
 
     except Exception as e:
         return ToolResult(
@@ -1454,6 +1457,138 @@ async def done(text: str, success: bool = True,
         )
 
 
+# ============================================================
+# 容器管理工具 (Container Management Tools)
+# ============================================================
+
+import aiohttp
+
+async def create_container(url: str, account_name: str = "liuwenwu") -> Dict[str, Any]:
+    """
+    创建浏览器容器并导航到指定URL
+
+    按照 test.md 的要求:
+    1.1 调用接口创建容器
+    1.2 调用接口创建窗口并导航到URL
+
+    Args:
+        url: 要导航的URL地址
+        account_name: 账户名称
+
+    Returns:
+        包含容器信息的字典:
+        - success: 是否成功
+        - container_id: 容器ID
+        - vnc: VNC访问URL
+        - cdp: CDP协议URL(用于浏览器连接)
+        - connection_id: 窗口连接ID
+        - error: 错误信息(如果失败)
+    """
+    result = {
+        "success": False,
+        "container_id": None,
+        "vnc": None,
+        "cdp": None,
+        "connection_id": None,
+        "error": None
+    }
+
+    try:
+        async with aiohttp.ClientSession() as session:
+            # 步骤1.1: 创建容器
+            print("📦 步骤1.1: 创建容器...")
+            create_url = "http://47.84.182.56:8200/api/v1/container/create"
+            create_payload = {
+                "auto_remove": True,
+                "need_port_binding": True,
+                "max_lifetime_seconds": 900
+            }
+
+            async with session.post(create_url, json=create_payload) as resp:
+                if resp.status != 200:
+                    raise RuntimeError(f"创建容器失败: HTTP {resp.status}")
+
+                create_result = await resp.json()
+                if create_result.get("code") != 0:
+                    raise RuntimeError(f"创建容器失败: {create_result.get('msg')}")
+
+                data = create_result.get("data", {})
+                result["container_id"] = data.get("container_id")
+                result["vnc"] = data.get("vnc")
+                result["cdp"] = data.get("cdp")
+
+                print(f"✅ 容器创建成功")
+                print(f"   Container ID: {result['container_id']}")
+                print(f"   VNC: {result['vnc']}")
+                print(f"   CDP: {result['cdp']}")
+
+            # 等待容器内的浏览器启动
+            print(f"\n⏳ 等待容器内浏览器启动...")
+            await asyncio.sleep(5)
+
+            # 步骤1.2: 创建页面并导航
+            print(f"\n📱 步骤1.2: 创建页面并导航到 {url}...")
+
+            page_create_url = "http://47.84.182.56:8200/api/v1/browser/page/create"
+            page_payload = {
+                "container_id": result["container_id"],
+                "url": url,
+                "account_name": account_name,
+                "need_wait": True,
+                "timeout": 30
+            }
+
+            # 重试机制:最多尝试3次
+            max_retries = 3
+            page_created = False
+            last_error = None
+
+            for attempt in range(max_retries):
+                try:
+                    if attempt > 0:
+                        print(f"   重试 {attempt + 1}/{max_retries}...")
+                        await asyncio.sleep(3)  # 重试前等待
+
+                    async with session.post(page_create_url, json=page_payload, timeout=aiohttp.ClientTimeout(total=60)) as resp:
+                        if resp.status != 200:
+                            response_text = await resp.text()
+                            last_error = f"HTTP {resp.status}: {response_text[:200]}"
+                            continue
+
+                        page_result = await resp.json()
+                        if page_result.get("code") != 0:
+                            last_error = f"{page_result.get('msg')}"
+                            continue
+
+                        page_data = page_result.get("data", {})
+                        result["connection_id"] = page_data.get("connection_id")
+                        result["success"] = True
+                        page_created = True
+
+                        print(f"✅ 页面创建成功")
+                        print(f"   Connection ID: {result['connection_id']}")
+                        break
+
+                except asyncio.TimeoutError:
+                    last_error = "请求超时"
+                    continue
+                except aiohttp.ClientError as e:
+                    last_error = f"网络错误: {str(e)}"
+                    continue
+                except Exception as e:
+                    last_error = f"未知错误: {str(e)}"
+                    continue
+
+            if not page_created:
+                raise RuntimeError(f"创建页面失败(尝试{max_retries}次后): {last_error}")
+
+    except Exception as e:
+        result["error"] = str(e)
+        print(f"❌ 错误: {str(e)}")
+
+    return result
+
+
 # ============================================================
 # 导出所有工具函数(供外部使用)
 # ============================================================
@@ -1509,4 +1644,7 @@ __all__ = [
 
     # 任务完成
     'done',
+
+    # 容器管理
+    'create_container',
 ]

+ 291 - 0
examples/test_xhs_container.py

@@ -0,0 +1,291 @@
+"""
+小红书容器测试脚本
+根据 test.md 要求实现:
+1. 创建容器并导航到小红书
+2. 初始化浏览器会话
+3. 切换到指定窗口
+4. 搜索健身
+5. 随机进入一个详情页
+6. 获取详情页的HTML和iframe并保存到output
+"""
+
+import sys
+import os
+import asyncio
+import json
+import random
+from datetime import datetime
+from pathlib import Path
+from urllib.parse import quote
+from dotenv import load_dotenv
+
+load_dotenv()
+
+project_root = Path(__file__).parent.parent
+sys.path.insert(0, str(project_root))
+
+from agent.tools.builtin.browser.baseClass import (
+    create_container,
+    init_browser_session,
+    cleanup_browser_session,
+    navigate_to_url,
+    scroll_page,
+    evaluate,
+    wait,
+    get_page_html,
+)
+
+
+async def test_xhs_container():
+    """
+    测试小红书容器功能
+    """
+    print("\n" + "="*60)
+    print("小红书容器测试")
+    print("="*60)
+
+    keyword = "健身"
+    search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
+
+    # 创建输出目录
+    output_dir = project_root / "output"
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    try:
+        # 步骤1: 创建容器并导航到小红书
+        container_info = await create_container(url="https://www.xiaohongshu.com")
+
+        if not container_info["success"]:
+            raise RuntimeError(f"容器创建失败: {container_info['error']}")
+
+        cdp_url = container_info["cdp"]
+        container_id = container_info["container_id"]
+        connection_id = container_info.get("connection_id")
+
+        print(f"\n📋 容器信息:")
+        print(f"   CDP URL: {cdp_url}")
+        print(f"   Container ID: {container_id}")
+        print(f"   Connection ID: {connection_id}")
+
+        # 等待容器完全启动
+        print(f"\n⏳ 等待容器启动...")
+        await asyncio.sleep(3)
+
+        # 步骤2: 初始化浏览器会话
+        print(f"\n🌐 初始化浏览器会话...")
+        browser, tools = await init_browser_session(
+            headless=True,
+            cdp_url=cdp_url
+        )
+
+        if browser is None or tools is None:
+            raise RuntimeError("浏览器初始化失败")
+
+        print("✅ 浏览器会话初始化成功")
+
+        # 步骤3: 如果有 connection_id,切换到对应窗口
+        if connection_id:
+            print(f"\n🔄 切换到窗口: {connection_id}")
+            await wait(2)
+
+            # 获取当前浏览器状态
+            try:
+                state = await browser.get_browser_state_summary(cached=False)
+                print(f"   当前标签页数: {len(state.tabs)}")
+                for tab in state.tabs:
+                    print(f"   - Tab ID: {tab.target_id[-4:]}, URL: {tab.url}")
+
+                # 尝试切换到 connection_id 对应的标签页
+                # connection_id 可能是完整ID,取最后4位
+                from agent.tools.builtin.browser.baseClass import switch_tab
+                await switch_tab(connection_id[-4:] if len(connection_id) > 4 else connection_id)
+                await wait(2)
+                print(f"✅ 已切换到窗口")
+            except Exception as e:
+                print(f"⚠️  切换窗口警告: {str(e)[:100]}")
+                print(f"   将继续使用当前窗口")
+
+        await wait(3)
+
+        # 步骤4: 搜索健身
+        print(f"\n🔍 搜索关键词: {keyword}")
+        try:
+            nav_result = await navigate_to_url(search_url)
+            if nav_result.error:
+                print(f"⚠️  导航警告: {nav_result.error[:100]}")
+        except Exception as e:
+            print(f"⚠️  导航异常: {str(e)[:100]}")
+
+        await wait(10)
+
+        # 滚动页面加载更多内容
+        print("\n📜 滚动页面...")
+        for i in range(2):
+            await scroll_page(down=True, pages=2.0)
+            await wait(2)
+
+        # 提取搜索结果
+        print("\n🔍 提取搜索结果...")
+
+        # 先保存HTML看看页面内容
+        html_result = await get_page_html()
+        if not html_result.error:
+            html = html_result.metadata.get("html", "")
+            debug_html_path = output_dir / "search_page_debug.html"
+            debug_html_path.write_text(html or "", encoding="utf-8")
+            print(f"   💾 已保存搜索页HTML用于调试: {debug_html_path}")
+
+        extract_js = """
+        (function(){
+            const results = [];
+            const seen = new Set();
+
+            const anchors = document.querySelectorAll('a[href*="/explore/"]');
+            anchors.forEach(a => {
+                const link = a.href || '';
+                if (link && !seen.has(link)) {
+                    seen.add(link);
+                    const img = a.querySelector('img');
+                    const title = ((img && img.alt) || a.textContent || '').trim();
+                    results.push({ title, link });
+                }
+            });
+
+            return results;
+        })()
+        """
+
+        eval_result = await evaluate(extract_js)
+        if eval_result.error:
+            raise RuntimeError(f"提取搜索结果失败: {eval_result.error}")
+
+        output = eval_result.output
+        if isinstance(output, str) and output.startswith("Result: "):
+            output = output[8:]
+
+        posts = json.loads(output) if isinstance(output, str) else output
+
+        if not posts or len(posts) == 0:
+            raise RuntimeError("未找到任何帖子")
+
+        print(f"✅ 找到 {len(posts)} 个帖子")
+
+        # 步骤5: 随机进入一个详情页
+        selected_post = random.choice(posts)
+        post_url = selected_post["link"]
+
+        print(f"\n🎲 随机选择帖子: {selected_post['title'][:50]}...")
+        print(f"🔗 访问帖子详情页: {post_url}")
+
+        try:
+            nav_result = await navigate_to_url(post_url)
+            if nav_result.error:
+                print(f"⚠️  导航警告: {nav_result.error[:100]}")
+        except Exception as e:
+            print(f"⚠️  导航异常: {str(e)[:100]}")
+
+        await wait(8)
+
+        # 滚动详情页
+        print("\n📜 滚动详情页...")
+        for i in range(3):
+            await scroll_page(down=True, pages=1.5)
+            await wait(2)
+
+        # 步骤6: 保存详情页HTML
+        print("\n💾 保存详情页 HTML...")
+        html_result = await get_page_html()
+        if html_result.error:
+            print(f"⚠️  获取HTML失败: {html_result.error}")
+        else:
+            html = html_result.metadata.get("html", "")
+            html_path = output_dir / "container_post_detail.html"
+            html_path.write_text(html or "", encoding="utf-8")
+            print(f"✅ 已保存详情页 HTML: {html_path}")
+
+        # 查找并保存iframe
+        print("\n🔍 查找页面中的iframe...")
+        iframe_js = """
+        (function(){
+            const iframes = document.querySelectorAll('iframe');
+            const results = [];
+            iframes.forEach((iframe, index) => {
+                results.push({
+                    index: index,
+                    src: iframe.src || '',
+                    id: iframe.id || '',
+                    name: iframe.name || ''
+                });
+            });
+            return results;
+        })()
+        """
+
+        iframe_result = await evaluate(iframe_js)
+        if not iframe_result.error:
+            iframe_output = iframe_result.output
+            if isinstance(iframe_output, str) and iframe_output.startswith("Result: "):
+                iframe_output = iframe_output[8:]
+
+            try:
+                iframes = json.loads(iframe_output) if isinstance(iframe_output, str) else iframe_output
+
+                if iframes and len(iframes) > 0:
+                    print(f"✅ 找到 {len(iframes)} 个iframe")
+
+                    for idx, iframe_info in enumerate(iframes):
+                        print(f"\n📄 处理iframe {idx + 1}/{len(iframes)}")
+                        print(f"   src: {iframe_info.get('src', 'N/A')[:80]}")
+
+                        # 获取iframe HTML
+                        get_iframe_html_js = f"""
+                        (function(){{
+                            const iframe = document.querySelectorAll('iframe')[{idx}];
+                            if (!iframe) return null;
+                            try {{
+                                const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
+                                return iframeDoc.documentElement.outerHTML;
+                            }} catch(e) {{
+                                return 'Error: ' + e.message;
+                            }}
+                        }})()
+                        """
+
+                        iframe_html_result = await evaluate(get_iframe_html_js)
+                        if not iframe_html_result.error:
+                            iframe_html = iframe_html_result.output
+                            if isinstance(iframe_html, str) and iframe_html.startswith("Result: "):
+                                iframe_html = iframe_html[8:]
+
+                            if iframe_html and not iframe_html.startswith("Error:"):
+                                iframe_path = output_dir / f"container_iframe_{idx}.html"
+                                iframe_path.write_text(iframe_html, encoding="utf-8")
+                                print(f"   ✅ 已保存iframe HTML: {iframe_path}")
+                            else:
+                                print(f"   ⚠️  iframe内容为空或无法访问")
+                else:
+                    print("⚠️  页面中没有找到iframe")
+            except Exception as e:
+                print(f"⚠️  处理iframe失败: {str(e)}")
+
+        print("\n✅ 测试完成!")
+
+    except Exception as e:
+        print(f"\n❌ 发生错误: {str(e)}")
+        import traceback
+        traceback.print_exc()
+
+    finally:
+        # 清理浏览器会话
+        try:
+            await cleanup_browser_session()
+        except Exception:
+            pass
+
+
+async def main():
+    await test_xhs_container()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())