| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- """
- 小红书云浏览器数据获取脚本(数据库配置版)
- 从数据库 agent_channel_cookies 获取 Cookie 和 cloud_profile_id
- """
- import sys
- import os
- import asyncio
- import json
- import re
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import quote
- from dotenv import load_dotenv
# Load variables from a .env file into the process environment (python-dotenv).
load_dotenv()
# Put the directory two levels above this file at the front of sys.path so the
# `agent` package imported below can be resolved when this file runs as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
- from agent.tools.builtin.browser.baseClass import (
- init_browser_session,
- cleanup_browser_session,
- kill_browser_session,
- browser_navigate_to_url,
- browser_scroll_page,
- browser_evaluate,
- browser_wait,
- browser_get_page_html,
- _fetch_cookie_row,
- _fetch_profile_id,
- _normalize_cookies,
- _cookie_domain_for_type,
- _extract_cookie_value,
- )
_XHS_HOME_URL = "https://www.xiaohongshu.com"
_XHS_MAX_ATTEMPTS = 3
_XHS_MAX_RESULTS = 20
_XHS_BLOCKED_ERROR = "可能被登录或验证码拦截"
_XHS_BLOCKED_MARKERS = ("登录", "验证", "验证码", "请先登录", "异常访问")
# Pairs each "noteId" with the next "title" that follows it in the raw HTML.
_XHS_NOTE_PATTERN = re.compile(r'"noteId":"(.*?)".*?"title":"(.*?)"', re.S)

# JavaScript evaluated in the page. It first collects note cards from
# `/explore/` anchors, then walks any embedded JSON state for note-like
# objects. `__KEYWORD__` is replaced with a JSON-encoded string before use.
_XHS_EXTRACT_JS_TEMPLATE = """
(function(){
    const maxCount = 20;
    const seen = new Set();
    const results = [];
    function pushItem(item){
        if (!item || !item.link || seen.has(item.link)) return;
        seen.add(item.link);
        results.push(item);
    }
    const anchors = document.querySelectorAll('a[href*="/explore/"]');
    anchors.forEach(a => {
        if (results.length >= maxCount) return;
        const link = a.href || '';
        const img = a.querySelector('img');
        const title = ((img && img.alt) || a.textContent || '').trim();
        const cover = (img && img.src) || '';
        if (link && title) {
            pushItem({ title, link, cover });
        }
    });
    const scriptNodes = document.querySelectorAll('script[type="application/json"], script#__NEXT_DATA__, script#__NUXT__');
    const walk = (node) => {
        if (!node || results.length >= maxCount) return;
        if (Array.isArray(node)) {
            for (const item of node) {
                walk(item);
                if (results.length >= maxCount) return;
            }
            return;
        }
        if (typeof node === 'object') {
            const title = (node.title || node.desc || node.name || node.noteTitle || '').toString().trim();
            const id = node.noteId || node.note_id || node.id || node.noteID;
            const cover = (node.cover && (node.cover.url || node.cover.urlDefault)) || node.coverUrl || node.image || '';
            let link = '';
            if (id) {
                link = `https://www.xiaohongshu.com/explore/${id}`;
            }
            if (title && link) {
                pushItem({ title, link, cover });
            }
            for (const key in node) {
                if (typeof node[key] === 'object') walk(node[key]);
            }
        }
    };
    scriptNodes.forEach(node => {
        if (results.length >= maxCount) return;
        const text = node.textContent || '';
        if (!text) return;
        try {
            const data = JSON.parse(text);
            walk(data);
        } catch (e) {}
    });
    return {
        success: true,
        keyword: __KEYWORD__,
        count: results.length,
        results: results,
        timestamp: new Date().toISOString(),
    };
})()
"""


def _failure_result(keyword: str, error: str, **extra) -> dict:
    """Build the failure payload shape shared by every error path."""
    payload = {
        "success": False,
        "keyword": keyword,
        "count": 0,
        "results": [],
        "error": error,
    }
    payload.update(extra)
    payload["timestamp"] = datetime.now().isoformat()
    return payload


def _decode_json_string(raw: str) -> str:
    """Decode JSON escapes (e.g. ``\\uXXXX``) in a string-literal body.

    Non-ASCII characters already present in ``raw`` are kept as they are.
    If ``raw`` is not a valid JSON string body (for example it ends with a
    dangling backslash), it is returned unchanged instead of raising.
    """
    try:
        # strict=False tolerates literal control characters inside the text.
        decoded = json.loads(f'"{raw}"', strict=False)
    except ValueError:
        return raw
    return decoded if isinstance(decoded, str) else raw


def _extract_notes_from_html(html: str, limit: int = _XHS_MAX_RESULTS) -> list:
    """Regex fallback: pull up to ``limit`` unique notes out of raw HTML."""
    notes = []
    seen_links = set()
    for match in _XHS_NOTE_PATTERN.finditer(html):
        note_id = match.group(1)
        title = _decode_json_string(match.group(2)).strip()
        link = f"https://www.xiaohongshu.com/explore/{note_id}"
        if note_id and title and link not in seen_links:
            seen_links.add(link)
            notes.append({"title": title, "link": link})
            if len(notes) >= limit:
                break
    return notes


async def _inject_cookies(browser, cookie_row, cookie_type: str) -> None:
    """Set the stored cookies on the browser and reload the home page."""
    print("\n🍪 注入 Cookie...")
    cookie_value = _extract_cookie_value(cookie_row)
    if not cookie_value:
        print("⚠️ 未找到 Cookie 值")
        return
    domain, base_url = _cookie_domain_for_type(cookie_type, _XHS_HOME_URL)
    cookies = _normalize_cookies(cookie_value, domain, base_url)
    if not cookies:
        print("⚠️ Cookie 解析失败")
        return
    await browser._cdp_set_cookies(cookies)
    print(f"✅ 成功注入 {len(cookies)} 个 Cookie")
    # Reload so the freshly set cookies take effect. The original called an
    # undefined `navigate_to_url` here, which raised NameError on every run
    # that had cookies to inject.
    nav_result = await browser_navigate_to_url(_XHS_HOME_URL)
    if nav_result.error:
        raise RuntimeError(nav_result.error)
    await browser_wait(2)


async def _extract_search_results(keyword: str):
    """Run the in-page extractor, falling back to HTML scraping when empty.

    Returns whatever the extractor's JSON decodes to (normally a dict), or a
    failure payload when the output is empty, unparsable, or the page looks
    like a login/captcha wall.
    """
    extract_js = _XHS_EXTRACT_JS_TEMPLATE.replace(
        "__KEYWORD__", json.dumps(keyword, ensure_ascii=False)
    )
    result = await browser_evaluate(extract_js)
    if result.error:
        raise RuntimeError(result.error)

    output = result.output
    if isinstance(output, str) and output.startswith("Result: "):
        output = output[len("Result: "):]
    if not output:
        return _failure_result(keyword, _XHS_BLOCKED_ERROR)

    try:
        data = json.loads(output)
    except (TypeError, ValueError):
        data = _failure_result(
            keyword, "JSON 解析失败", raw_output=str(output)[:2000]
        )

    if not (isinstance(data, dict) and data.get("count", 0) == 0):
        return data

    # Nothing extracted: inspect the raw HTML to tell "blocked" from "the
    # selectors missed", and try the regex fallback in the latter case.
    html_result = await browser_get_page_html()
    if html_result.error:
        raise RuntimeError(html_result.error)
    html = html_result.metadata.get("html", "")
    if not html:
        return data
    if any(marker in html for marker in _XHS_BLOCKED_MARKERS):
        return _failure_result(keyword, _XHS_BLOCKED_ERROR)

    notes = _extract_notes_from_html(html)
    if notes:
        data = {
            "success": True,
            "keyword": keyword,
            "count": len(notes),
            "results": notes,
            "timestamp": datetime.now().isoformat(),
            "source": "html_fallback",
        }
    return data


async def _run_search_attempt(
    keyword: str, search_url: str, cookie_type: str, cookie_row, profile_id
):
    """Perform one full browser session: start, log in, search, extract."""
    browser, tools = await init_browser_session(
        headless=False,
        use_cloud=True,
        cloud_profile_id=profile_id,
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        disable_security=False,
    )
    if browser is None or tools is None:
        raise RuntimeError("浏览器初始化失败")
    print("✅ 云浏览器启动成功")

    # Visit the home page first so cookies can be set against the site.
    print("\n🏠 访问小红书首页...")
    nav_result = await browser_navigate_to_url(_XHS_HOME_URL)
    if nav_result.error:
        raise RuntimeError(nav_result.error)
    await browser_wait(3)

    if cookie_row:
        await _inject_cookies(browser, cookie_row, cookie_type)

    print(f"\n🔗 访问搜索页面: {keyword}")
    nav_result = await browser_navigate_to_url(search_url)
    if nav_result.error:
        raise RuntimeError(nav_result.error)
    await browser_wait(8)

    # Scroll a few times so more results are loaded into the page.
    print("\n📜 滚动页面...")
    for _ in range(3):
        await browser_scroll_page(down=True, pages=2.0)
        await browser_wait(2)

    print("\n🔍 提取数据...")
    html_result = await browser_get_page_html()
    if html_result.error:
        raise RuntimeError(html_result.error)
    html = html_result.metadata.get("html", "")
    # Keep a copy of the page on disk for debugging the extractors.
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / "xhs.html"
    output_path.write_text(html or "", encoding="utf-8")
    print(f"✅ 已保存页面 HTML: {output_path}")

    return await _extract_search_results(keyword)


async def example_xhs_fitness_search(
    cookie_type: str = "xhs", *, keyword: str = "健身"
) -> dict:
    """Search Xiaohongshu in a cloud browser and return the extracted notes.

    The cloud profile id and cookie row are fetched via ``_fetch_profile_id``
    and ``_fetch_cookie_row`` for ``cookie_type``; when no profile id is
    found, the ``XHS_PROFILE_ID`` environment variable is used instead. The
    whole browser session is retried up to three times.

    Args:
        cookie_type: Cookie type used to look up the stored configuration.
        keyword: Search keyword (keyword-only; defaults to "健身").

    Returns:
        A dict with ``success``, ``keyword``, ``count``, ``results`` and
        ``timestamp``; failures also carry ``error``. After the final attempt
        the last result (successful or not) is returned rather than raised.

    Raises:
        RuntimeError: If ``BROWSER_USE_API_KEY`` is not set.
    """
    print("\n" + "=" * 60)
    print(f"示例: 小红书云浏览器搜索 - {keyword}")
    print("=" * 60)

    api_key = os.getenv("BROWSER_USE_API_KEY")
    if not api_key:
        raise RuntimeError("未找到 BROWSER_USE_API_KEY")

    search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
    last_data: dict = _failure_result(keyword, "未知错误")

    print(f"\n🔍 从数据库获取配置 (type={cookie_type})...")
    profile_id = _fetch_profile_id(cookie_type)
    cookie_row = _fetch_cookie_row(cookie_type)

    if profile_id:
        print(f"✅ 获取到 cloud_profile_id: {profile_id}")
    else:
        print("⚠️ 未找到 cloud_profile_id,将使用环境变量或默认值")
        profile_id = os.getenv("XHS_PROFILE_ID")

    if cookie_row:
        print("✅ 获取到 Cookie 配置")
    else:
        print("⚠️ 未找到 Cookie 配置")

    for attempt in range(_XHS_MAX_ATTEMPTS):
        try:
            if attempt > 0:
                # Best effort: make sure the previous session is really gone
                # before starting a new one.
                try:
                    await kill_browser_session()
                except Exception as kill_error:
                    print(f"⚠️ 清理旧会话失败(忽略): {kill_error}")
                await asyncio.sleep(2)

            print(f"\n🌐 启动云浏览器 (尝试 {attempt + 1}/{_XHS_MAX_ATTEMPTS})...")
            data = await _run_search_attempt(
                keyword, search_url, cookie_type, cookie_row, profile_id
            )
            if isinstance(data, dict):
                last_data = data

            if last_data.get("count", 0) > 0:
                print(f"\n✅ 成功获取 {last_data['count']} 条数据")
                print(f"数据来源: {last_data.get('source', 'javascript')}")
                print("\n前 5 条结果:")
                for index, item in enumerate(last_data["results"][:5], 1):
                    print(f"{index}. {item['title'][:50]}...")
                # The `finally` below releases the session on this return
                # path too, so no separate cleanup call is needed here.
                return last_data

            if last_data.get("error") == _XHS_BLOCKED_ERROR:
                print("\n⚠️ 检测到登录或验证码拦截")
                print("💡 建议:在数据库中配置有效的 Cookie")
        except Exception as e:
            # Retry boundary: any failure in the attempt becomes a failure
            # payload and the loop moves on to the next attempt.
            err_text = str(e)
            print(f"⚠️ 尝试 {attempt + 1}/{_XHS_MAX_ATTEMPTS} 失败: {err_text}")
            last_data = _failure_result(keyword, err_text)
        finally:
            try:
                await cleanup_browser_session()
            except Exception as cleanup_error:
                print(f"⚠️ 清理会话失败(忽略): {cleanup_error}")

        # Kept outside the try/finally so a successful return never waits.
        if attempt < _XHS_MAX_ATTEMPTS - 1:
            print("等待 5 秒后重试...")
            await asyncio.sleep(5)

    return last_data
async def main():
    """Run the search for the cookie type given on the command line.

    The first command-line argument, when present, selects the cookie type;
    otherwise "xhs" is used. The resulting dict is printed as indented JSON.
    """
    cli_args = sys.argv[1:]
    cookie_type = cli_args[0] if cli_args else "xhs"

    data = await example_xhs_fitness_search(cookie_type)

    banner = "=" * 60
    print("\n" + banner)
    print("📊 最终结果")
    print(banner)
    print(json.dumps(data, ensure_ascii=False, indent=2))
# Script entry point: only when executed directly (not imported), run the
# async main() to completion with asyncio.run().
if __name__ == "__main__":
    asyncio.run(main())
|