| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- import sys
- import os
- import asyncio
- import json
- import re
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import quote
- from dotenv import load_dotenv
# Load variables from a local .env file into the process environment before
# anything reads them (BROWSER_USE_API_KEY is looked up via os.getenv below).
load_dotenv()

# The directory two levels above this file is put at the front of sys.path so
# that the `agent` package imported right after this resolves when the file is
# run directly as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
- from agent.tools.builtin.browser.baseClass import (
- init_browser_session,
- cleanup_browser_session,
- kill_browser_session,
- navigate_to_url,
- scroll_page,
- evaluate,
- wait,
- get_page_html,
- ensure_login_with_cookies,
- wait_for_user_action,
- )
# Landing page opened first so the session/cookies are established before searching.
_XHS_HOME_URL = "https://www.xiaohongshu.com"

# Sentinel error text. The flow compares against this exact string to decide
# whether to hand control to the user and whether to retry with a new session.
_XHS_BLOCKED_ERROR = "可能被登录或验证码拦截"

# Cap on notes collected by the HTML fallback (the in-page script uses the same cap).
_XHS_MAX_RESULTS = 20

# Page-text fragments treated as a sign of a login wall / captcha page.
_XHS_BLOCKED_PAGE_MARKERS = ("登录", "验证", "验证码", "请先登录", "异常访问")

# Exception-text fragments after which the browser session is force-killed
# before the regular cleanup runs.
_XHS_DEAD_SESSION_MARKERS = ("WebSocket", "browser not connected", "NoneType")

# Matches a noteId followed (lazily, across newlines) by the next title in the
# embedded page state. The title group skips over backslash-escaped characters
# so an escaped quote inside a title does not end the match early.
_XHS_NOTE_RE = re.compile(
    r'"noteId":"([^"]*)".*?"title":"((?:[^"\\]|\\.)*)"',
    re.S,
)

# In-page extraction script. ``__KEYWORD__`` is replaced with a JSON-encoded
# keyword before evaluation. It first collects note anchors from the DOM, then
# walks JSON embedded in <script> tags, de-duplicating by link.
_XHS_EXTRACT_JS_TEMPLATE = """
(function(){
    const maxCount = 20;
    const seen = new Set();
    const results = [];
    function pushItem(item){
        if (!item || !item.link || seen.has(item.link)) return;
        seen.add(item.link);
        results.push(item);
    }
    const anchors = document.querySelectorAll('a[href*="/explore/"]');
    anchors.forEach(a => {
        if (results.length >= maxCount) return;
        const link = a.href || '';
        const img = a.querySelector('img');
        const title = ((img && img.alt) || a.textContent || '').trim();
        const cover = (img && img.src) || '';
        if (link && title) {
            pushItem({ title, link, cover });
        }
    });
    const scriptNodes = document.querySelectorAll('script[type="application/json"], script#__NEXT_DATA__, script#__NUXT__');
    const walk = (node) => {
        if (!node || results.length >= maxCount) return;
        if (Array.isArray(node)) {
            for (const item of node) {
                walk(item);
                if (results.length >= maxCount) return;
            }
            return;
        }
        if (typeof node === 'object') {
            const title = (node.title || node.desc || node.name || node.noteTitle || '').toString().trim();
            const id = node.noteId || node.note_id || node.id || node.noteID;
            const cover = (node.cover && (node.cover.url || node.cover.urlDefault)) || node.coverUrl || node.image || '';
            let link = '';
            if (id) {
                link = `https://www.xiaohongshu.com/explore/${id}`;
            }
            if (title && link) {
                pushItem({ title, link, cover });
            }
            for (const key in node) {
                if (typeof node[key] === 'object') walk(node[key]);
            }
        }
    };
    scriptNodes.forEach(node => {
        if (results.length >= maxCount) return;
        const text = node.textContent || '';
        if (!text) return;
        try {
            const data = JSON.parse(text);
            walk(data);
        } catch (e) {}
    });
    return {
        success: true,
        keyword: __KEYWORD__,
        count: results.length,
        results: results,
        timestamp: new Date().toISOString(),
    };
})()
"""


def _xhs_failure(keyword: str, error: str, **extra) -> dict:
    """Build a failure payload with the same keys as a successful result."""
    return {
        "success": False,
        "keyword": keyword,
        "count": 0,
        "results": [],
        "error": error,
        **extra,
        "timestamp": datetime.now().isoformat(),
    }


def _xhs_decode_json_string(raw: str) -> str:
    """Decode the body of a JSON string literal; return it unchanged if malformed.

    Going through the JSON decoder keeps literal non-ASCII text intact and
    joins ``\\uXXXX`` surrogate pairs, which a utf-8 -> ``unicode_escape``
    round trip does not.
    """
    try:
        # strict=False tolerates raw control characters inside the literal.
        return json.loads(f'"{raw}"', strict=False)
    except ValueError:
        return raw


def _xhs_notes_from_html(html: str, limit: int = _XHS_MAX_RESULTS) -> list:
    """Collect up to ``limit`` unique ``{"title", "link"}`` notes from raw page HTML."""
    notes = []
    seen_links = set()
    for match in _XHS_NOTE_RE.finditer(html):
        note_id = match.group(1)
        title = _xhs_decode_json_string(match.group(2)).strip()
        if not note_id or not title:
            continue
        link = f"https://www.xiaohongshu.com/explore/{note_id}"
        if link in seen_links:
            continue
        seen_links.add(link)
        notes.append({"title": title, "link": link})
        if len(notes) >= limit:
            break
    return notes


async def _xhs_ensure_login() -> None:
    """Apply stored cookies and, if none were usable, wait for a manual login."""
    login_result = await ensure_login_with_cookies(
        cookie_type="xhs",
        url=_XHS_HOME_URL,
    )
    # A "cookies not found" error is tolerated: the manual-login prompt below covers it.
    if login_result.error and "未找到 cookies" not in login_result.error:
        raise RuntimeError(login_result.error)

    login_payload = {}
    if isinstance(login_result.output, str) and login_result.output:
        try:
            parsed = json.loads(login_result.output)
        except ValueError:
            parsed = None
        # Only a JSON object is meaningful here; any other JSON value is ignored.
        if isinstance(parsed, dict):
            login_payload = parsed

    if login_payload.get("need_login") and login_payload.get("cookies_count", 0) == 0:
        await wait_for_user_action(
            message="未找到可用 cookies,请在云浏览器中完成小红书登录或验证码,完成后按 Enter 继续",
            timeout=300,
        )


async def _xhs_open_search(search_url: str) -> None:
    """Navigate to the search page, then scroll so more result cards get loaded."""
    nav_result = await navigate_to_url(search_url)
    if nav_result.error:
        raise RuntimeError(nav_result.error)
    await wait(8)
    for _ in range(3):
        await scroll_page(down=True, pages=2.0)
        await wait(2)


async def _xhs_extract(extract_js: str, keyword: str) -> dict:
    """Run the in-page script and fall back to scanning the page HTML when it finds nothing.

    Returns the decoded script output (expected to be a dict; any other JSON
    value is passed through untouched for the caller to reject), or a failure
    payload. Raises RuntimeError when the evaluate or HTML tool reports an error.
    """
    result = await evaluate(extract_js)
    if result.error:
        raise RuntimeError(result.error)

    output = result.output
    prefix = "Result: "
    if isinstance(output, str) and output.startswith(prefix):
        output = output[len(prefix):]
    if not output:
        return _xhs_failure(keyword, _XHS_BLOCKED_ERROR)

    try:
        data = json.loads(output)
    except (TypeError, ValueError):
        data = _xhs_failure(keyword, "JSON 解析失败", raw_output=str(output)[:2000])

    if not isinstance(data, dict) or data.get("count", 0) != 0:
        return data

    # Nothing extracted: inspect the raw HTML to tell "blocked" from "no data".
    html_result = await get_page_html()
    if html_result.error:
        raise RuntimeError(html_result.error)
    html = (html_result.metadata or {}).get("html", "")
    if not html:
        return data
    if any(marker in html for marker in _XHS_BLOCKED_PAGE_MARKERS):
        return _xhs_failure(keyword, _XHS_BLOCKED_ERROR)

    notes = _xhs_notes_from_html(html)
    if not notes:
        return data
    return {
        "success": True,
        "keyword": keyword,
        "count": len(notes),
        "results": notes,
        "timestamp": datetime.now().isoformat(),
        "source": "html_fallback",
    }


async def example_xhs_fitness_search() -> dict:
    """Search Xiaohongshu for the keyword "健身" in a cloud browser and return the notes found.

    Makes up to three attempts, each with its own browser session: open the
    home page, apply cookies (or wait for a manual login), open the search
    page, scroll, and extract notes. When the page looks blocked by a login
    wall or captcha, the user is asked to resolve it and extraction is retried
    once within the same session.

    Returns:
        A dict with ``success``, ``keyword``, ``count``, ``results`` and
        ``timestamp``; failures also carry ``error``. The result of the last
        attempt is returned when every attempt fails or stays blocked.

    Raises:
        RuntimeError: if the BROWSER_USE_API_KEY environment variable is unset.
    """
    print("\n" + "="*60)
    print("示例: 小红书云浏览器搜索 - 健身")
    print("="*60)

    if not os.getenv("BROWSER_USE_API_KEY"):
        raise RuntimeError("未找到 BROWSER_USE_API_KEY")

    keyword = "健身"
    search_url = f"https://www.xiaohongshu.com/search_result?keyword={quote(keyword)}&type=51"
    extract_js = _XHS_EXTRACT_JS_TEMPLATE.replace(
        "__KEYWORD__", json.dumps(keyword, ensure_ascii=False)
    )
    last_data: dict = _xhs_failure(keyword, "未知错误")

    for _attempt in range(3):
        try:
            browser, tools = await init_browser_session(
                headless=False,
                use_cloud=True,
            )
            if browser is None or tools is None:
                raise RuntimeError("浏览器初始化失败")

            nav_result = await navigate_to_url(_XHS_HOME_URL)
            if nav_result.error:
                raise RuntimeError(nav_result.error)
            await wait(3)

            await _xhs_ensure_login()
            await _xhs_open_search(search_url)
            data = await _xhs_extract(extract_js, keyword)

            if isinstance(data, dict) and data.get("error") == _XHS_BLOCKED_ERROR:
                await wait_for_user_action(
                    message="请在云浏览器中完成小红书登录或验证码,完成后按 Enter 继续",
                    timeout=300,
                )
                await _xhs_open_search(search_url)
                data = await _xhs_extract(extract_js, keyword)

            if isinstance(data, dict):
                last_data = data
            # Stop on any outcome other than "still blocked with nothing found";
            # only that case is worth another attempt with a new session.
            if last_data.get("count", 0) > 0 or last_data.get("error") != _XHS_BLOCKED_ERROR:
                return last_data
        except Exception as exc:
            err_text = str(exc)
            if any(marker in err_text for marker in _XHS_DEAD_SESSION_MARKERS):
                # Best-effort: a failed kill must not hide the original error.
                try:
                    await kill_browser_session()
                except Exception:
                    pass
            last_data = _xhs_failure(keyword, err_text)
        finally:
            await cleanup_browser_session()
        # Pause before the next attempt.
        await wait(5)

    return last_data
async def main():
    """Run the Xiaohongshu search example and print its result as indented JSON."""
    search_result = await example_xhs_fitness_search()
    # ensure_ascii=False keeps the Chinese text readable in the printed output.
    rendered = json.dumps(search_result, ensure_ascii=False, indent=2)
    print(rendered)
# Script entry point: only when executed directly (not when imported), run the
# async demo to completion with asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())
|