| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- import asyncio
- import json
- import os
- import sys
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import quote
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from tools.baseClassTools import (
- init_browser_session,
- navigate_to_url,
- wait,
- get_page_html,
- evaluate,
- scroll_page,
- cleanup_browser_session,
- )
# Keyword searched when run_task() is called without arguments.
_DEFAULT_KEYWORD = "瑜伽美女"

# Prefix that may precede the text found in an evaluate() result's `.output`.
_RESULT_PREFIX = "Result: "

# Removes the scroll-lock class, deletes fixed-position overlays with a high
# z-index, and clicks buttons whose text/label looks like close/acknowledge.
_UNLOCK_JS = """
(function(){
  try {
    document.documentElement.classList.remove('reds-lock-scroll');
    document.body.classList.remove('reds-lock-scroll');
    const candidates = Array.from(document.querySelectorAll('[role="dialog"], .reds-modal, .reds-alert, [class*="modal"], [class*="mask"], [class*="dialog"]'));
    for (const el of candidates) {
      try {
        const style = window.getComputedStyle(el);
        const z = parseInt(style.zIndex || '0', 10);
        if (style.position === 'fixed' && z >= 999) {
          el.remove();
        }
      } catch {}
    }
    const closeButtons = Array.from(document.querySelectorAll('button, [role="button"]'));
    for (const btn of closeButtons) {
      const text = (btn.textContent || '').trim();
      const label = (btn.getAttribute('aria-label') || '').trim();
      if (text.includes('关闭') || text.includes('我知道了') || text.includes('同意') || label.includes('关闭')) {
        btn.click();
      }
    }
    return true;
  } catch (e) {
    return false;
  }
})()
"""

# Reports, as JSON text, the larger of the "/explore/" anchor count and the
# note-card count currently present in the DOM.
_COUNT_JS = """
(function(){
  const anchorCount = document.querySelectorAll('a[href*="/explore/"]').length;
  const cardCount = document.querySelectorAll('[data-testid="search-note-item"], .note-item, article, li[data-note-id]').length;
  return JSON.stringify({count: Math.max(anchorCount, cardCount)});
})()
"""

# Collects up to 20 results, first from JSON embedded in <script> tags and then,
# if fewer than 5 were found, from "/explore/" anchors in the DOM.
# __KEYWORD_JSON__ is replaced with a JSON string literal before evaluation.
# The payload is returned as JSON text, exactly like _COUNT_JS does.
_EXTRACT_JS_TEMPLATE = """
(function(keyword){
  try {
    const results = [];
    const jsonScripts = Array.from(document.querySelectorAll('script[type="application/json"], script#__NEXT_DATA__'));
    for (const s of jsonScripts) {
      try {
        const txt = s.textContent.trim();
        if (txt && txt.length > 0) {
          const data = JSON.parse(txt);
          const candidates = [];
          function collect(obj) {
            if (!obj || typeof obj !== 'object') return;
            for (const k of Object.keys(obj)) {
              const v = obj[k];
              if (v && typeof v === 'object') {
                if (Array.isArray(v)) {
                  candidates.push(v);
                }
                collect(v);
              }
            }
          }
          collect(data);
          for (const arr of candidates) {
            for (const item of arr) {
              try {
                const title = (item.title || item.noteTitle || item.name || '').toString().trim();
                const link = (item.link || item.url || item.noteUrl || item.jumpUrl || '').toString().trim();
                if ((title || link) && (link.includes('/explore/') || link.startsWith('http'))) {
                  results.push({
                    index: results.length + 1,
                    title,
                    link,
                    summary: (item.desc || item.content || item.noteDesc || '').toString().trim().substring(0, 200)
                  });
                  if (results.length >= 20) break;
                }
              } catch {}
            }
            if (results.length >= 20) break;
          }
        }
      } catch {}
      if (results.length >= 5) break;
    }
    if (results.length < 5) {
      const anchors = Array.from(document.querySelectorAll('a[href*="/explore/"]'));
      // Seed with links already collected so the DOM pass does not repeat them.
      const seen = new Set(results.map((r) => r.link));
      for (const a of anchors) {
        try {
          const href = a.href;
          if (!href || seen.has(href)) continue;
          seen.add(href);
          let title = (a.textContent || '').trim();
          if (!title) {
            const img = a.querySelector('img[alt]');
            if (img && img.alt) title = img.alt.trim();
          }
          if (!title) {
            const parentTitle = a.closest('[data-testid="search-note-item"], .note-item, article, li')?.querySelector('[data-testid="note-title"], .title, h3, p');
            if (parentTitle) title = (parentTitle.textContent || '').trim();
          }
          const descEl = a.closest('[data-testid="search-note-item"], .note-item, article, li')?.querySelector('[data-testid="note-desc"], .desc, .description, p');
          const desc = descEl ? (descEl.textContent || '').trim() : '';
          results.push({
            index: results.length + 1,
            title,
            link: href,
            summary: desc.substring(0, 200)
          });
          if (results.length >= 20) break;
        } catch {}
      }
    }
    return JSON.stringify({
      success: true,
      count: results.length,
      keyword: keyword,
      timestamp: new Date().toISOString(),
      results: results
    });
  } catch (e) {
    return JSON.stringify({
      success: false,
      error: e.message,
      stack: e.stack
    });
  }
})(__KEYWORD_JSON__)
"""


def _normalize_output(raw):
    """Return *raw* as stripped text without a leading ``"Result: "`` prefix.

    ``None`` becomes an empty string and other non-string values are passed
    through ``str()`` so callers never fail on a missing output.
    """
    if raw is None:
        return ""
    text = raw if isinstance(raw, str) else str(raw)
    if text.startswith(_RESULT_PREFIX):
        text = text[len(_RESULT_PREFIX):]
    return text.strip()


def _decode_json(text):
    """Decode JSON *text*, unwrapping one extra layer of string encoding.

    Raises:
        ValueError: if *text* (or the inner string) is not valid JSON.
    """
    value = json.loads(text)
    if isinstance(value, str):
        # NOTE(review): how evaluate() renders a string returned from JS is not
        # visible in this file; if it JSON-encodes it, the payload arrives
        # double-encoded and needs a second decode.
        value = json.loads(value)
    return value


def _read_count(raw):
    """Return the integer ``count`` reported by ``_COUNT_JS``, or 0 if unreadable."""
    try:
        return int(_decode_json(_normalize_output(raw)).get("count", 0))
    except (ValueError, TypeError, AttributeError):
        # Malformed JSON, a non-object payload, or a non-numeric count.
        return 0


async def run_task(keyword: str = _DEFAULT_KEYWORD) -> None:
    """Search Xiaohongshu for *keyword* and save the results and page HTML.

    Opens a browser session, loads the search-result page, dismisses overlays,
    scrolls until at least 5 result candidates are present (at most 8 checks),
    then writes the extracted results to ``output/xhs.json`` and the page HTML
    (preceded by an HTML comment with title, URL, time and keyword) to
    ``output/xhs_page.html`` under the directory one level above this file's
    folder. The browser session is cleaned up even when a step fails.

    Args:
        keyword: Search term; defaults to the keyword this script was written
            for, so ``run_task()`` behaves as before.
    """
    project_root = Path(__file__).resolve().parents[1]
    output_dir = project_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    json_file = output_dir / "xhs.json"
    html_file = output_dir / "xhs_page.html"

    # json.dumps yields a quoted, escaped literal that is safe to splice into JS.
    extract_js = _EXTRACT_JS_TEMPLATE.replace(
        "__KEYWORD_JSON__", json.dumps(keyword, ensure_ascii=False)
    )

    try:
        await init_browser_session(headless=False, profile_name="xhs_profile")
        await navigate_to_url("https://www.xiaohongshu.com")
        await wait(seconds=3)

        # safe="" so characters such as "/" are percent-encoded in the query value.
        search_url = (
            "https://www.xiaohongshu.com/search_result"
            f"?keyword={quote(keyword, safe='')}&type=51"
        )
        await navigate_to_url(search_url)
        await wait(seconds=6)

        await evaluate(code=_UNLOCK_JS)
        for _ in range(8):
            count_result = await evaluate(code=_COUNT_JS)
            if _read_count(count_result.output) >= 5:
                break
            await scroll_page(down=True, pages=0.8)
            await wait(seconds=3)
            # Run the unlock script again after each scroll.
            await evaluate(code=_UNLOCK_JS)

        result = await evaluate(code=extract_js)
        output = _normalize_output(result.output)
        try:
            data = _decode_json(output)
        except ValueError:
            # Keep a truncated copy of the unparsable output for diagnosis.
            data = {
                "success": False,
                "error": "JSON解析失败",
                "raw_output": output[:1000],
                "keyword": keyword,
                "timestamp": datetime.now().isoformat(),
            }
        with open(json_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        html_result = await get_page_html()
        metadata = html_result.metadata or {}
        html_content = metadata.get("html", "") or ""
        page_url = metadata.get("url", "")
        page_title = metadata.get("title", "")
        meta_info = (
            "\n".join(
                [
                    "<!--",
                    f" 页面标题: {page_title}",
                    f" 页面URL: {page_url}",
                    f" 保存时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                    f" 搜索关键词: {keyword}",
                    "-->",
                    "",
                ]
            )
            + "\n"
        )
        with open(html_file, "w", encoding="utf-8") as f:
            f.write(meta_info)
            f.write(html_content)

        print(f"✅ 数据已保存到: {json_file}")
        print(f"✅ HTML 已保存到: {html_file}")
    finally:
        await cleanup_browser_session()
def main():
    """Synchronous entry point: run the async scraping task to completion."""
    task = run_task()
    asyncio.run(task)
# Start the task only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
|