Просмотр исходного кода

feat(browser): 重构云浏览器示例以支持数据库配置

- 删除旧的独立配置文件示例,合并为统一的数据库驱动示例
- 在baseClass.py中添加_fetch_profile_id函数从数据库获取cloud_profile_id
- 更新_cookie_domain_for_type函数以支持更多平台
- 修改_fetch_cookie_row查询逻辑,优先使用cookies字段
- 新增cloud_browser_demo_db.py示例,支持从agent_channel_cookies表获取配置
- 更新Claude权限配置,添加timeout命令支持
max_liu 1 месяц назад
Родитель
Коммит
8a83efbee9

+ 2 - 1
.claude/settings.local.json

@@ -7,7 +7,8 @@
       "Read(//usr/local/anaconda3/lib/python3.13/site-packages/browser_use/**)",
       "Bash(tee:*)",
       "Bash(browser-use:*)",
-      "Bash(pip install:*)"
+      "Bash(pip install:*)",
+      "Bash(timeout 60 python:*)"
     ],
     "deny": [],
     "ask": []

+ 21 - 1
agent/tools/builtin/browser/baseClass.py

@@ -271,6 +271,10 @@ def _normalize_cookies(cookie_value: Any, domain: str, url: str) -> List[Dict[st
 def _extract_cookie_value(row: Optional[Dict[str, Any]]) -> Any:
     if not row:
         return None
+    # 优先使用 cookies 字段
+    if "cookies" in row:
+        return row["cookies"]
+    # 兼容其他可能的字段名
     for key, value in row.items():
         if "cookie" in key.lower():
             return value
@@ -282,13 +286,29 @@ def _fetch_cookie_row(cookie_type: str) -> Optional[Dict[str, Any]]:
         return None
     try:
         return mysql.fetchone(
-            "select * from agent_channel_cookies where type=%s order by id desc limit 1",
+            "select * from agent_channel_cookies where type=%s limit 1",
             (cookie_type,)
         )
     except Exception:
         return None
 
 
+def _fetch_profile_id(cookie_type: str) -> Optional[str]:
+    """从数据库获取 cloud_profile_id"""
+    if not cookie_type:
+        return None
+    try:
+        row = mysql.fetchone(
+            "select profileId from agent_channel_cookies where type=%s limit 1",
+            (cookie_type,)
+        )
+        if row and "profileId" in row:
+            return row["profileId"]
+        return None
+    except Exception:
+        return None
+
+
 # ============================================================
 # 导航类工具 (Navigation Tools)
 # ============================================================

+ 118 - 47
examples/cloud_browser_demo.py → examples/cloud_browser_demo_db.py

@@ -1,3 +1,8 @@
+"""
+小红书云浏览器数据获取脚本(数据库配置版)
+从数据库 agent_channel_cookies 获取 Cookie 和 cloud_profile_id
+"""
+
 import sys
 import os
 import asyncio
@@ -22,12 +27,21 @@ from agent.tools.builtin.browser.baseClass import (
     evaluate,
     wait,
     get_page_html,
-    ensure_login_with_cookies,
-    wait_for_user_action,
+    _fetch_cookie_row,
+    _fetch_profile_id,
+    _normalize_cookies,
+    _cookie_domain_for_type,
+    _extract_cookie_value,
 )
 
 
-async def example_xhs_fitness_search() -> dict:
+async def example_xhs_fitness_search(cookie_type: str = "xhs") -> dict:
+    """
+    小红书搜索示例
+
+    Args:
+        cookie_type: Cookie 类型,用于从数据库获取配置
+    """
     print("\n" + "="*60)
     print("示例: 小红书云浏览器搜索 - 健身")
     print("="*60)
@@ -47,49 +61,95 @@ async def example_xhs_fitness_search() -> dict:
         "timestamp": datetime.now().isoformat(),
     }
 
-    for _ in range(3):
+    # 从数据库获取配置
+    print(f"\n🔍 从数据库获取配置 (type={cookie_type})...")
+    profile_id = _fetch_profile_id(cookie_type)
+    cookie_row = _fetch_cookie_row(cookie_type)
+
+    if profile_id:
+        print(f"✅ 获取到 cloud_profile_id: {profile_id}")
+    else:
+        print("⚠️  未找到 cloud_profile_id,将使用环境变量或默认值")
+        profile_id = os.getenv("XHS_PROFILE_ID")
+
+    if cookie_row:
+        print(f"✅ 获取到 Cookie 配置")
+    else:
+        print("⚠️  未找到 Cookie 配置")
+
+    for attempt in range(3):
         try:
+            # 确保每次重试都清理旧会话
+            if attempt > 0:
+                try:
+                    await kill_browser_session()
+                except Exception:
+                    pass
+                await asyncio.sleep(2)  # 等待清理完成
+
+            print(f"\n🌐 启动云浏览器 (尝试 {attempt + 1}/3)...")
             browser, tools = await init_browser_session(
                 headless=False,
                 use_cloud=True,
+                cloud_profile_id=profile_id,
+                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
+                disable_security=False,
             )
             if browser is None or tools is None:
                 raise RuntimeError("浏览器初始化失败")
 
+            print("✅ 云浏览器启动成功")
+
+            # 访问首页
+            print("\n🏠 访问小红书首页...")
             nav_result = await navigate_to_url("https://www.xiaohongshu.com")
             if nav_result.error:
                 raise RuntimeError(nav_result.error)
             await wait(3)
 
-            login_result = await ensure_login_with_cookies(
-                cookie_type="xhs",
-                url="https://www.xiaohongshu.com"
-            )
-            if login_result.error and "未找到 cookies" not in login_result.error:
-                raise RuntimeError(login_result.error)
-
-            login_payload = {}
-            if isinstance(login_result.output, str) and login_result.output:
-                try:
-                    login_payload = json.loads(login_result.output)
-                except Exception:
-                    login_payload = {}
-
-            if login_payload.get("need_login") and login_payload.get("cookies_count", 0) == 0:
-                await wait_for_user_action(
-                    message="未找到可用 cookies,请在云浏览器中完成小红书登录或验证码,完成后按 Enter 继续",
-                    timeout=300
-                )
+            # 注入 Cookie(如果有)
+            if cookie_row:
+                print("\n🍪 注入 Cookie...")
+                cookie_value = _extract_cookie_value(cookie_row)
+                if cookie_value:
+                    domain, base_url = _cookie_domain_for_type(cookie_type, "https://www.xiaohongshu.com")
+                    cookies = _normalize_cookies(cookie_value, domain, base_url)
+                    if cookies:
+                        await browser._cdp_set_cookies(cookies)
+                        print(f"✅ 成功注入 {len(cookies)} 个 Cookie")
+                        # 刷新页面使 Cookie 生效
+                        await navigate_to_url("https://www.xiaohongshu.com")
+                        await wait(2)
+                    else:
+                        print("⚠️  Cookie 解析失败")
+                else:
+                    print("⚠️  未找到 Cookie 值")
 
+            # 访问搜索页面
+            print(f"\n🔗 访问搜索页面: {keyword}")
             nav_result = await navigate_to_url(search_url)
             if nav_result.error:
                 raise RuntimeError(nav_result.error)
             await wait(8)
 
-            for _ in range(3):
+            # 滚动页面
+            print("\n📜 滚动页面...")
+            for i in range(3):
                 await scroll_page(down=True, pages=2.0)
                 await wait(2)
 
+            # 提取数据
+            print("\n🔍 提取数据...")
+            html_result = await get_page_html()
+            if html_result.error:
+                raise RuntimeError(html_result.error)
+            html = html_result.metadata.get("html", "")
+            output_dir = project_root / "output"
+            output_dir.mkdir(parents=True, exist_ok=True)
+            output_path = output_dir / "xhs.html"
+            output_path.write_text(html or "", encoding="utf-8")
+            print(f"✅ 已保存页面 HTML: {output_path}")
+
             extract_js = """
         (function(){
             const maxCount = 20;
@@ -233,32 +293,28 @@ async def example_xhs_fitness_search() -> dict:
                 return data
 
             data = await run_extract()
-            if isinstance(data, dict) and data.get("error") == "可能被登录或验证码拦截":
-                await wait_for_user_action(
-                    message="请在云浏览器中完成小红书登录或验证码,完成后按 Enter 继续",
-                    timeout=300
-                )
-                nav_result = await navigate_to_url(search_url)
-                if nav_result.error:
-                    raise RuntimeError(nav_result.error)
-                await wait(8)
-                for _ in range(3):
-                    await scroll_page(down=True, pages=2.0)
-                    await wait(2)
-                data = await run_extract()
 
             last_data = data if isinstance(data, dict) else last_data
+
+            # 输出结果
             if isinstance(last_data, dict) and last_data.get("count", 0) > 0:
+                print(f"\n✅ 成功获取 {last_data['count']} 条数据")
+                print(f"数据来源: {last_data.get('source', 'javascript')}")
+                print("\n前 5 条结果:")
+                for i, item in enumerate(last_data["results"][:5], 1):
+                    print(f"{i}. {item['title'][:50]}...")
+
+                # 成功获取数据,清理并返回
+                await cleanup_browser_session()
                 return last_data
-            if isinstance(last_data, dict) and last_data.get("error") != "可能被登录或验证码拦截":
-                return last_data
+
+            if isinstance(last_data, dict) and last_data.get("error") == "可能被登录或验证码拦截":
+                print("\n⚠️  检测到登录或验证码拦截")
+                print("💡 建议:在数据库中配置有效的 Cookie")
+
         except Exception as e:
             err_text = str(e)
-            if any(key in err_text for key in ["WebSocket", "browser not connected", "NoneType"]):
-                try:
-                    await kill_browser_session()
-                except Exception:
-                    pass
+            print(f"⚠️  尝试 {attempt + 1}/3 失败: {err_text}")
             last_data = {
                 "success": False,
                 "keyword": keyword,
@@ -268,14 +324,29 @@ async def example_xhs_fitness_search() -> dict:
                 "timestamp": datetime.now().isoformat(),
             }
         finally:
-            await cleanup_browser_session()
-        await wait(5)
+            # 清理当前会话
+            try:
+                await cleanup_browser_session()
+            except Exception:
+                pass
+
+        # 如果不是最后一次尝试,等待后继续
+        if attempt < 2:
+            print(f"等待 5 秒后重试...")
+            await asyncio.sleep(5)
 
     return last_data
 
 
 async def main():
-    data = await example_xhs_fitness_search()
+    # 可以通过命令行参数指定 cookie_type
+    cookie_type = sys.argv[1] if len(sys.argv) > 1 else "xhs"
+
+    data = await example_xhs_fitness_search(cookie_type)
+
+    print("\n" + "="*60)
+    print("📊 最终结果")
+    print("="*60)
     print(json.dumps(data, ensure_ascii=False, indent=2))
 
 

+ 0 - 169
examples/cloud_xhs.py

@@ -1,169 +0,0 @@
-import os
-import time
-import random
-from dotenv import load_dotenv
-from browser_use import BrowserUse
-from fake_useragent import UserAgent
-
-# 加载环境变量(建议将敏感信息存在.env文件中)
-load_dotenv()
-API_KEY = os.getenv("BROWSER_USE_API_KEY")  # 你的BrowserUse API密钥
-PROFILE_ID = os.getenv("XHS_PROFILE_ID")    # 你的小红书专属Profile ID
-
-# 小红书关键Cookie(替换为你从浏览器导出的完整Cookie)
-XHS_COOKIES = [
-    {"name": "web_session", "value": "你的web_session值"},
-    {"name": "a1", "value": "你的a1值"},
-    {"name": "webId", "value": "你的webId值"},
-    {"name": "gid", "value": "你的gid值"},
-    {"name": "xhsTrackerId", "value": "你的xhsTrackerId值"},
-    {"name": "timestamp", "value": str(int(time.time()))}
-]
-
-# 真实浏览器UA池(匹配小红书常用设备)
-ua = UserAgent()
-XHS_UA = ua.random if ua.random else "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
-
-def init_xhs_browser():
-    """初始化小红书专用浏览器(防指纹+IP+Cookie配置)"""
-    try:
-        browser = BrowserUse(
-            api_key=API_KEY,
-            # 核心反指纹配置
-            stealth=True,                # 开启Stealth模式,清除自动化特征
-            browser_profile_id=PROFILE_ID, # 绑定专属Profile,持久化Cookie/存储
-            # 真实浏览器指纹配置
-            user_agent=XHS_UA,
-            screen_resolution="1920,1080", # 匹配真实屏幕分辨率
-            timezone="Asia/Shanghai",      # 时区与IP归属地一致
-            language="zh-CN,zh;q=0.9",     # 中文语言环境
-            # 禁用自动化标识(关键)
-            extra_args=[
-                "--disable-blink-features=AutomationControlled",
-                "--disable-dev-shm-usage",
-                "--no-sandbox",
-                "--disable-extensions",    # 禁用扩展,减少特征暴露
-                "--disable-images=false"   # 加载图片,模拟真实访问
-            ],
-            # 代理配置(替换为你的专属静态IP)
-            proxy={
-                "type": "http",           # 推荐http/socks5
-                "host": "你的代理IP",
-                "port": 你的代理端口,
-                "username": "代理账号(如有)",
-                "password": "代理密码(如有)"
-            }
-        )
-        print("✅ 浏览器初始化成功")
-        return browser
-    except Exception as e:
-        print(f"❌ 浏览器初始化失败: {e}")
-        return None
-
-def inject_xhs_cookies(browser, url="https://www.xiaohongshu.com"):
-    """注入小红书完整Cookie并验证登录状态"""
-    try:
-        # 先访问小红书首页(建立会话)
-        browser.get(url)
-        time.sleep(random.uniform(2, 4))  # 随机延迟,模拟人类加载
-        
-        # 注入Cookie(逐个添加,确保完整)
-        for cookie in XHS_COOKIES:
-            browser.add_cookie(cookie)
-        print("✅ Cookie注入完成")
-        
-        # 刷新页面,验证登录状态
-        browser.refresh()
-        time.sleep(random.uniform(3, 5))
-        
-        # 检查是否登录成功(通过页面元素判断)
-        page_source = browser.page_source
-        if "未登录" not in page_source and "我的主页" in page_source:
-            print("✅ 小红书登录验证成功")
-            return True
-        else:
-            print("❌ Cookie无效或登录状态未维持")
-            return False
-    except Exception as e:
-        print(f"❌ Cookie注入失败: {e}")
-        return False
-
-def simulate_human_behavior(browser):
-    """模拟人类行为(滚动、延迟、点击),规避行为风控"""
-    try:
-        # 模拟滚动浏览(随机步长)
-        scroll_steps = random.randint(3, 8)
-        for _ in range(scroll_steps):
-            scroll_height = random.randint(300, 800)
-            browser.execute_script(f"window.scrollBy(0, {scroll_height});")
-            time.sleep(random.uniform(1.5, 3.5))  # 随机滚动间隔
-        
-        # 模拟随机停留
-        time.sleep(random.uniform(5, 10))
-        print("✅ 人类行为模拟完成")
-    except Exception as e:
-        print(f"❌ 行为模拟失败: {e}")
-
-def get_xhs_data(browser, target_url):
-    """获取小红书目标页面数据(笔记/用户信息)"""
-    try:
-        # 访问目标页面(非直接请求,模拟点击跳转)
-        browser.get(target_url)
-        time.sleep(random.uniform(4, 6))  # 页面加载延迟
-        
-        # 模拟人类行为后再提取数据
-        simulate_human_behavior(browser)
-        
-        # 提取页面核心数据(可根据需求修改)
-        page_title = browser.title
-        page_html = browser.page_source
-        
-        # 示例:提取笔记标题(小红书笔记页专属)
-        note_title = browser.execute_script("""
-            return document.querySelector('h1[class*="note-title"]')?.innerText || '未找到标题';
-        """)
-        
-        print(f"\n📊 数据提取结果:")
-        print(f"页面标题: {page_title}")
-        print(f"笔记标题: {note_title}")
-        
-        # 可扩展:提取点赞、评论、内容等信息
-        return {
-            "title": note_title,
-            "page_html": page_html,
-            "status": "success"
-        }
-    except Exception as e:
-        print(f"❌ 数据提取失败: {e}")
-        return {"status": "failed", "error": str(e)}
-
-def main():
-    """主流程:初始化→注入Cookie→获取数据→清理会话"""
-    # 1. 初始化浏览器
-    browser = init_xhs_browser()
-    if not browser:
-        return
-    
-    try:
-        # 2. 注入Cookie并验证登录
-        login_success = inject_xhs_cookies(browser)
-        if not login_success:
-            return
-        
-        # 3. 访问目标小红书页面(替换为你要爬取的URL)
-        target_url = "https://www.xiaohongshu.com/explore/65a1b2c3d4e5f67890abcdef"
-        data = get_xhs_data(browser, target_url)
-        
-        if data["status"] == "success":
-            print("\n🎉 小红书数据获取成功!")
-        else:
-            print("\n❌ 小红书数据获取失败,请检查Cookie/IP/指纹配置")
-    
-    finally:
-        # 4. 安全关闭会话(避免资源浪费)
-        time.sleep(random.uniform(2, 3))
-        browser.quit()
-        print("\n🔚 浏览器会话已关闭")
-
-if __name__ == "__main__":
-    main()