| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- """
- 按 URL 抓取小红书帖子内容。
- 走 explore 页 HTML 内嵌的 window.__INITIAL_STATE__ JSON,不需要 cookie;
- URL 必须带 xsec_token(分享/搜索/explore_feed 链接默认都带)。
- 输出字段集与 examples/process_pipeline/output/<id>/raw_cases/source.json 的
- post 子对象对齐:channel_content_id / title / content_type / body_text /
- like_count / publish_timestamp / images / videos / channel / link
- 用法:
- python xhs_fetch/xhs_fetch.py <url> [<url> ...] [--output <subdir>]
- python xhs_fetch/xhs_fetch.py --urls-file urls.txt
- python xhs_fetch/xhs_fetch.py <url> --stdout # 不写文件,打 JSON 数组到 stdout
- 退码:0 全成功 / 1 全失败或参数错 / 2 部分失败 / 130 Ctrl+C
- 脚本通过探测 .git/pyproject.toml 自动定位项目根,可以放在仓库内任意位置。
- """
- import argparse
- import json
- import re
- import sys
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- import httpx
# Switch the console streams to UTF-8 (required for Chinese output on Windows consoles).
for _s in (sys.stdout, sys.stderr):
    try:
        _s.reconfigure(encoding="utf-8")
    except (AttributeError, OSError):
        # The stream has no reconfigure() method or rejected the change;
        # keep whatever encoding it already has.
        pass
- def _find_project_root(start: Path) -> Path:
- """沿父目录上爬找 .git / pyproject.toml。"""
- p = start.resolve()
- for ancestor in [p, *p.parents]:
- if (ancestor / ".git").exists() or (ancestor / "pyproject.toml").exists():
- return ancestor
- return start.resolve().parent
# Project root, found by walking up from this file to a .git / pyproject.toml marker.
PROJECT_ROOT = _find_project_root(Path(__file__))
# Directory that contains this script; fetched posts go to outputs/ beneath it.
SCRIPT_DIR = Path(__file__).resolve().parent
OUTPUTS_DIR = SCRIPT_DIR / "outputs"
# Put the project root first on sys.path.
# NOTE(review): presumably so project-local imports resolve; nothing in this file needs it — confirm.
sys.path.insert(0, str(PROJECT_ROOT))
# python-dotenv is optional: load <project root>/.env when it is installed, otherwise skip.
try:
    from dotenv import load_dotenv
    load_dotenv(PROJECT_ROOT / ".env")
except ImportError:
    pass
# ── Fetching / parsing ──────────────────────────────
# Request headers passed to every page fetch: a desktop Chrome User-Agent
# and an Accept header asking for HTML.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml",
}
- # 注意:用 .*? + 后面 </script> 固定锚点。lazy 的 \{.+?\} 在嵌套 JSON 上会断在第一个 }。
- INITIAL_STATE_RE = re.compile(
- r"window\.__INITIAL_STATE__\s*=\s*(\{.*?\})\s*</script>", re.S
- )
- def _parse_initial_state(html: str) -> Dict[str, Any]:
- m = INITIAL_STATE_RE.search(html)
- if not m:
- raise RuntimeError("INITIAL_STATE not found(页面可能未渲染或被风控)")
- # SSR 直接把 JS undefined 当裸字面量塞了进来,json.loads 不接,先替换
- raw = re.sub(r":\s*undefined", ": null", m.group(1))
- return json.loads(raw)
- def _coerce_int(v: Any) -> int:
- s = str(v or "").strip()
- if s.isdigit():
- return int(s)
- try:
- return int(float(s))
- except (ValueError, TypeError):
- return 0
def parse_post(html: str) -> Dict[str, Any]:
    """Parse an explore-page HTML document into one post dict (source.json-compatible).

    The post is read from the first entry of ``note.noteDetailMap`` in the
    page's initial state. The map key becomes ``channel_content_id``.

    Args:
        html: Full HTML text of the explore page.

    Returns:
        A dict with the keys ``channel_content_id``, ``title``, ``content_type``,
        ``body_text``, ``like_count``, ``publish_timestamp``, ``images``,
        ``videos``, ``channel`` and ``link``. ``videos`` is always an empty
        list; this function does not extract video URLs.

    Raises:
        RuntimeError: If the initial state is missing, ``noteDetailMap`` is
            empty, or the first entry carries no note payload.
    """
    data = _parse_initial_state(html)
    nd_map = ((data.get("note") or {}).get("noteDetailMap")) or {}
    if not nd_map:
        raise RuntimeError("noteDetailMap empty")
    nid, val = next(iter(nd_map.items()))
    note = (val.get("note") if isinstance(val, dict) else None) or {}
    if not note:
        # An entry with no note payload would otherwise produce a blank post
        # that gets reported (and written to disk) as a success.
        raise RuntimeError("note detail empty")

    images: List[str] = []
    for img in note.get("imageList") or []:
        if not isinstance(img, dict):
            # Tolerate null / malformed entries instead of failing the whole post.
            continue
        url = img.get("urlDefault") or img.get("url")
        if url:
            images.append(url)

    interact = note.get("interactInfo") or {}
    return {
        "channel_content_id": nid,
        "title": note.get("title") or "",
        "content_type": note.get("type") or "note",
        "body_text": note.get("desc") or "",
        "like_count": _coerce_int(interact.get("likedCount")),
        "publish_timestamp": note.get("time") or "",
        "images": images,
        "videos": [],
        "channel": "xhs",
        "link": f"https://www.xiaohongshu.com/explore/{nid}",
    }
def fetch_one(client: httpx.Client, url: str) -> Dict[str, Any]:
    """Download one explore page and parse it into a post dict.

    Args:
        client: Open HTTP client used to issue the GET request.
        url: Page URL to fetch.

    Returns:
        The post dict produced by ``parse_post`` from the response body.

    Raises:
        httpx.HTTPStatusError: If the final response has an error status.
        RuntimeError: Propagated from ``parse_post`` when the page cannot be parsed.
    """
    response = client.get(
        url,
        headers=HEADERS,
        follow_redirects=True,
        timeout=30.0,
    )
    response.raise_for_status()
    page_html = response.text
    return parse_post(page_html)
# ── Output sandbox ──────────────────────────────────
def resolve_output_subdir(rel_path: Optional[str]) -> Path:
    """Resolve the ``--output`` value to a directory inside ``OUTPUTS_DIR``.

    Args:
        rel_path: Sub-directory relative to ``OUTPUTS_DIR``; ``None`` or an
            empty string selects ``OUTPUTS_DIR`` itself.

    Returns:
        ``OUTPUTS_DIR`` when no sub-directory was requested, otherwise the
        fully resolved target directory.

    Raises:
        SystemExit: If *rel_path* is absolute, or if it resolves to a location
            outside ``OUTPUTS_DIR`` (for example via ``..``).
    """
    if not rel_path:
        return OUTPUTS_DIR

    requested = Path(rel_path)
    if requested.is_absolute():
        raise SystemExit(f"ERROR: --output 必须是相对路径: {rel_path!r}")

    target = (OUTPUTS_DIR / requested).resolve()
    sandbox = OUTPUTS_DIR.resolve()
    # Inside the sandbox means: the sandbox itself or any descendant of it.
    stays_inside = target == sandbox or sandbox in target.parents
    if not stays_inside:
        raise SystemExit(f"ERROR: --output 越界到 {target}(不允许 '..')")
    return target
def safe_filename(post: Dict[str, Any]) -> str:
    """Build the output file name ``xhs_<id>_<label>.json`` for *post*.

    Both parts are reduced to word characters: every run of other characters
    becomes a single ``_``, the result is truncated, and leading/trailing
    ``_`` are stripped. The id is sanitized as well because it originates
    from the fetched page; without that, an id containing ``/`` or ``..``
    could place the file outside the output directory.

    Args:
        post: Post dict; reads ``channel_content_id`` and ``title``.

    Returns:
        A file name with at most 12 id characters and 40 label characters.
        The label is the title, falling back to the id and then to
        ``"untitled"`` when the sanitized candidate is empty. An id that
        sanitizes to nothing is written as ``"unknown"``.
    """

    def _slug(value: Any, limit: int) -> str:
        return re.sub(r"[^\w一-龥]+", "_", str(value or ""))[:limit].strip("_")

    raw_id = post.get("channel_content_id")
    note_id = _slug(raw_id, 12) or "unknown"

    label = ""
    for candidate in (post.get("title"), raw_id, "untitled"):
        label = _slug(candidate, 40)
        if label:
            break
    return f"xhs_{note_id}_{label}.json"
# ── Input collection ────────────────────────────────
def load_urls(args) -> List[str]:
    """Collect the URLs to fetch from ``--urls-file`` and the positional arguments.

    URLs from the file come first, followed by the positional ones. In the
    file, blank lines and lines starting with ``#`` are ignored and
    surrounding whitespace is stripped.

    Args:
        args: Parsed CLI namespace; reads ``urls_file`` and ``urls``.

    Returns:
        The collected URLs in order (duplicates are kept).

    Raises:
        SystemExit: If no URL was supplied by either source.
    """
    urls: List[str] = []
    if args.urls_file:
        # utf-8-sig drops a leading byte-order mark (written by some Windows
        # editors). With plain utf-8 the BOM stays attached to the first URL,
        # because str.strip() does not treat it as whitespace.
        text = Path(args.urls_file).read_text(encoding="utf-8-sig", errors="replace")
        for line in text.splitlines():
            entry = line.strip()
            if entry and not entry.startswith("#"):
                urls.append(entry)
    urls.extend(args.urls or [])
    if not urls:
        raise SystemExit(
            "ERROR: 请通过位置参数或 --urls-file 提供至少一个 URL"
        )
    return urls
# ── CLI ─────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script.

    The module docstring is used verbatim as the description (raw formatting
    keeps its line breaks). Arguments defined: positional ``urls`` (zero or
    more), ``--urls-file``, ``--output`` and the ``--stdout`` flag.

    Returns:
        The configured ``ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    # (flags, keyword options) for each argument, registered in this order.
    argument_specs = (
        (
            ("urls",),
            {"nargs": "*", "help": "小红书 explore URL(一个或多个,需带 xsec_token)"},
        ),
        (
            ("--urls-file",),
            {"help": "URL 列表文件路径(每行一个 URL,# 开头为注释)"},
        ),
        (
            ("--output",),
            {"help": "相对 outputs/ 的子目录路径,用于本次输出(默认直接写到 outputs/)"},
        ),
        (
            ("--stdout",),
            {"action": "store_true", "help": "不写文件,把抓到的 post 数组打到 stdout(JSON)"},
        ),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    return parser
def _report_failure(index: int, total: int, url: str, exc: BaseException) -> None:
    """Log one failed URL to stderr, plus up to 400 characters of any attached server body."""
    print(
        f"[err {index}/{total}] {type(exc).__name__}: {exc} url={url[:80]}",
        file=sys.stderr,
    )
    for attr in ("response", "body"):
        payload = getattr(exc, attr, None)
        if payload is None:
            continue
        try:
            text = payload.text if hasattr(payload, "text") else str(payload)
            print(f" server body: {text[:400]}", file=sys.stderr)
        except Exception:
            # Diagnostics are best effort; a body that cannot be read or
            # rendered must not hide the error already reported above.
            pass


def run(args) -> int:
    """Fetch every requested URL and emit the parsed posts.

    Without ``--stdout`` each post is written as its own JSON file in the
    resolved output directory. With ``--stdout`` nothing is written to disk
    and all posts are printed as one JSON array on stdout. Progress and errors
    go to stderr. A URL that cannot be fetched or parsed, or whose file cannot
    be written, counts as a failure and processing continues with the next URL.

    Args:
        args: Parsed CLI namespace; reads ``urls``, ``urls_file``, ``output``
            and ``stdout``.

    Returns:
        Exit code: 0 when every URL succeeded, 2 when some failed, 1 when all
        failed.

    Raises:
        SystemExit: From ``load_urls`` / ``resolve_output_subdir`` on invalid
            arguments.
    """
    urls = load_urls(args)
    total = len(urls)

    out_dir: Optional[Path] = None
    if not args.stdout:
        out_dir = resolve_output_subdir(args.output)
        out_dir.mkdir(parents=True, exist_ok=True)
        print(f"[info] outputs -> {out_dir}", file=sys.stderr)
    print(f"[info] urls={total}", file=sys.stderr)

    posts: List[Dict[str, Any]] = []
    failures = 0
    with httpx.Client() as client:
        for i, url in enumerate(urls, 1):
            try:
                post = fetch_one(client, url)
            except Exception as e:
                failures += 1
                _report_failure(i, total, url, e)
                continue
            print(
                f"[info {i}/{total}] OK id={post['channel_content_id']} "
                f"title={post['title'][:30]!r} body={len(post['body_text'])} "
                f"imgs={len(post['images'])}",
                file=sys.stderr,
            )
            if out_dir is None:
                # --stdout mode: collect now, dump once after the loop.
                posts.append(post)
                continue
            path = out_dir / safe_filename(post)
            try:
                path.write_text(
                    json.dumps(post, ensure_ascii=False, indent=2),
                    encoding="utf-8",
                )
            except OSError as e:
                # One unwritable file must not abort the rest of the batch.
                failures += 1
                _report_failure(i, total, url, e)
                continue
            try:
                shown = path.relative_to(SCRIPT_DIR)
            except ValueError:
                # Display only: fall back to the full path when the output
                # directory resolves outside SCRIPT_DIR (e.g. via a symlink).
                shown = path
            print(f" -> {shown}", file=sys.stderr)

    if args.stdout:
        json.dump(posts, sys.stdout, ensure_ascii=False, indent=2)
        sys.stdout.write("\n")

    ok = total - failures
    print(f"[info] done: ok={ok} fail={failures}", file=sys.stderr)
    if failures == 0:
        return 0
    if failures < total:
        return 2
    return 1
def main():
    """Script entry point: parse the command line, run, and exit with the result code.

    Exit status: the value returned by ``run``; 130 when interrupted with
    Ctrl+C; 1 after any other unexpected error, whose traceback is printed to
    stderr. A ``SystemExit`` raised along the way is passed through unchanged.
    """
    try:
        exit_code = run(build_parser().parse_args())
        sys.exit(exit_code)
    except SystemExit:
        # Deliberate exits (argument errors, the sys.exit above) keep their code.
        raise
    except KeyboardInterrupt:
        print("\n[info] interrupted by user (Ctrl+C)", file=sys.stderr)
        sys.exit(130)
    except BaseException as e:
        import traceback

        print(f"\n!!! UNEXPECTED ERROR: {type(e).__name__}: {e}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
|