""" 小红书云浏览器数据获取脚本(数据库配置版) 从数据库 agent_channel_cookies 获取 Cookie 和 cloud_profile_id """ import sys import os import asyncio import json import re from datetime import datetime from pathlib import Path from urllib.parse import quote from dotenv import load_dotenv load_dotenv() project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from agent.tools.builtin.browser.baseClass import ( init_browser_session, cleanup_browser_session, kill_browser_session, browser_navigate_to_url, browser_scroll_page, browser_evaluate, browser_wait, browser_get_page_html, _fetch_cookie_row, _fetch_profile_id, _normalize_cookies, _cookie_domain_for_type, _extract_cookie_value, ) async def example_xhs_fitness_search(cookie_type: str = "xhs") -> dict: """ 小红书搜索示例 Args: cookie_type: Cookie 类型,用于从数据库获取配置 """ print("\n" + "="*60) print("示例: 小红书云浏览器搜索 - 健身") print("="*60) api_key = os.getenv("BROWSER_USE_API_KEY") if not api_key: raise RuntimeError("未找到 BROWSER_USE_API_KEY") keyword = "健身" search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51" last_data: dict = { "success": False, "keyword": keyword, "count": 0, "results": [], "error": "未知错误", "timestamp": datetime.now().isoformat(), } # 从数据库获取配置 print(f"\n🔍 从数据库获取配置 (type={cookie_type})...") profile_id = _fetch_profile_id(cookie_type) cookie_row = _fetch_cookie_row(cookie_type) if profile_id: print(f"✅ 获取到 cloud_profile_id: {profile_id}") else: print("⚠️ 未找到 cloud_profile_id,将使用环境变量或默认值") profile_id = os.getenv("XHS_PROFILE_ID") if cookie_row: print(f"✅ 获取到 Cookie 配置") else: print("⚠️ 未找到 Cookie 配置") for attempt in range(3): try: # 确保每次重试都清理旧会话 if attempt > 0: try: await kill_browser_session() except Exception: pass await asyncio.sleep(2) # 等待清理完成 print(f"\n🌐 启动云浏览器 (尝试 {attempt + 1}/3)...") browser, tools = await init_browser_session( headless=False, use_cloud=True, cloud_profile_id=profile_id, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", disable_security=False, ) if browser is None or tools is None: raise RuntimeError("浏览器初始化失败") print("✅ 云浏览器启动成功") # 访问首页 print("\n🏠 访问小红书首页...") nav_result = await browser_navigate_to_url("https://www.xiaohongshu.com") if nav_result.error: raise RuntimeError(nav_result.error) await browser_wait(3) # 注入 Cookie(如果有) if cookie_row: print("\n🍪 注入 Cookie...") cookie_value = _extract_cookie_value(cookie_row) if cookie_value: domain, base_url = _cookie_domain_for_type(cookie_type, "https://www.xiaohongshu.com") cookies = _normalize_cookies(cookie_value, domain, base_url) if cookies: await browser._cdp_set_cookies(cookies) print(f"✅ 成功注入 {len(cookies)} 个 Cookie") # 刷新页面使 Cookie 生效 await navigate_to_url("https://www.xiaohongshu.com") await browser_wait(2) else: print("⚠️ Cookie 解析失败") else: print("⚠️ 未找到 Cookie 值") # 访问搜索页面 print(f"\n🔗 访问搜索页面: {keyword}") nav_result = await browser_navigate_to_url(search_url) if nav_result.error: raise RuntimeError(nav_result.error) await browser_wait(8) # 滚动页面 print("\n📜 滚动页面...") for i in range(3): await browser_scroll_page(down=True, pages=2.0) await browser_wait(2) # 提取数据 print("\n🔍 提取数据...") html_result = await browser_get_page_html() if html_result.error: raise RuntimeError(html_result.error) html = html_result.metadata.get("html", "") output_dir = project_root / "output" output_dir.mkdir(parents=True, exist_ok=True) output_path = output_dir / "xhs.html" output_path.write_text(html or "", encoding="utf-8") print(f"✅ 已保存页面 HTML: {output_path}") extract_js = """ (function(){ const maxCount = 20; const seen = new Set(); const results = []; function pushItem(item){ if (!item || !item.link || seen.has(item.link)) return; seen.add(item.link); results.push(item); } const anchors = document.querySelectorAll('a[href*="/explore/"]'); anchors.forEach(a => { if (results.length >= maxCount) return; const link = a.href || ''; const img = a.querySelector('img'); const title = ((img && img.alt) || a.textContent || '').trim(); const cover = (img && img.src) || ''; if (link && title) { pushItem({ title, link, cover }); } }); const scriptNodes = document.querySelectorAll('script[type="application/json"], script#__NEXT_DATA__, script#__NUXT__'); const walk = (node) => { if (!node || results.length >= maxCount) return; if (Array.isArray(node)) { for (const item of node) { walk(item); if (results.length >= maxCount) return; } return; } if (typeof node === 'object') { const title = (node.title || node.desc || node.name || node.noteTitle || '').toString().trim(); const id = node.noteId || node.note_id || node.id || node.noteID; const cover = (node.cover && (node.cover.url || node.cover.urlDefault)) || node.coverUrl || node.image || ''; let link = ''; if (id) { link = `https://www.xiaohongshu.com/explore/${id}`; } if (title && link) { pushItem({ title, link, cover }); } for (const key in node) { if (typeof node[key] === 'object') walk(node[key]); } } }; scriptNodes.forEach(node => { if (results.length >= maxCount) return; const text = node.textContent || ''; if (!text) return; try { const data = JSON.parse(text); walk(data); } catch (e) {} }); return { success: true, keyword: __KEYWORD__, count: results.length, results: results, timestamp: new Date().toISOString(), }; })() """ extract_js = extract_js.replace("__KEYWORD__", json.dumps(keyword, ensure_ascii=False)) async def run_extract() -> dict: result = await browser_evaluate(extract_js) if result.error: raise RuntimeError(result.error) output = result.output if isinstance(output, str) and output.startswith("Result: "): output = output[8:] if not output: return { "success": False, "keyword": keyword, "count": 0, "results": [], "error": "可能被登录或验证码拦截", "timestamp": datetime.now().isoformat(), } try: data = json.loads(output) except Exception: data = { "success": False, "keyword": keyword, "count": 0, "results": [], "error": "JSON 解析失败", "raw_output": str(output)[:2000], "timestamp": datetime.now().isoformat(), } if isinstance(data, dict) and data.get("count", 0) == 0: html_result = await browser_get_page_html() if html_result.error: raise RuntimeError(html_result.error) html = html_result.metadata.get("html", "") blocked_markers = ["登录", "验证", "验证码", "请先登录", "异常访问"] if html and any(marker in html for marker in blocked_markers): data = { "success": False, "keyword": keyword, "count": 0, "results": [], "error": "可能被登录或验证码拦截", "timestamp": datetime.now().isoformat(), } elif html: results = [] seen = set() pattern = re.compile(r'"noteId":"(.*?)".*?"title":"(.*?)"', re.S) for match in pattern.finditer(html): note_id = match.group(1) title = match.group(2).encode("utf-8", "ignore").decode("unicode_escape").strip() link = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id and link not in seen and title: seen.add(link) results.append({"title": title, "link": link}) if len(results) >= 20: break if results: data = { "success": True, "keyword": keyword, "count": len(results), "results": results, "timestamp": datetime.now().isoformat(), "source": "html_fallback", } return data data = await run_extract() last_data = data if isinstance(data, dict) else last_data # 输出结果 if isinstance(last_data, dict) and last_data.get("count", 0) > 0: print(f"\n✅ 成功获取 {last_data['count']} 条数据") print(f"数据来源: {last_data.get('source', 'javascript')}") print("\n前 5 条结果:") for i, item in enumerate(last_data["results"][:5], 1): print(f"{i}. {item['title'][:50]}...") # 成功获取数据,清理并返回 await cleanup_browser_session() return last_data if isinstance(last_data, dict) and last_data.get("error") == "可能被登录或验证码拦截": print("\n⚠️ 检测到登录或验证码拦截") print("💡 建议:在数据库中配置有效的 Cookie") except Exception as e: err_text = str(e) print(f"⚠️ 尝试 {attempt + 1}/3 失败: {err_text}") last_data = { "success": False, "keyword": keyword, "count": 0, "results": [], "error": err_text, "timestamp": datetime.now().isoformat(), } finally: # 清理当前会话 try: await cleanup_browser_session() except Exception: pass # 如果不是最后一次尝试,等待后继续 if attempt < 2: print(f"等待 5 秒后重试...") await asyncio.sleep(5) return last_data async def main(): # 可以通过命令行参数指定 cookie_type cookie_type = sys.argv[1] if len(sys.argv) > 1 else "xhs" data = await example_xhs_fitness_search(cookie_type) print("\n" + "="*60) print("📊 最终结果") print("="*60) print(json.dumps(data, ensure_ascii=False, indent=2)) if __name__ == "__main__": asyncio.run(main())