xhs_fetch.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. """
  2. 按 URL 抓取小红书帖子内容。
  3. 走 explore 页 HTML 内嵌的 window.__INITIAL_STATE__ JSON,不需要 cookie;
  4. URL 必须带 xsec_token(分享/搜索/explore_feed 链接默认都带)。
  5. 输出字段集与 examples/process_pipeline/output/<id>/raw_cases/source.json 的
  6. post 子对象对齐:channel_content_id / title / content_type / body_text /
  7. like_count / publish_timestamp / images / videos / channel / link
  8. 用法:
  9. python xhs_fetch/xhs_fetch.py <url> [<url> ...] [--output <subdir>]
  10. python xhs_fetch/xhs_fetch.py --urls-file urls.txt
  11. python xhs_fetch/xhs_fetch.py <url> --stdout # 不写文件,打 JSON 数组到 stdout
  12. 退码:0 全成功 / 1 全失败或参数错 / 2 部分失败 / 130 Ctrl+C
  13. 脚本通过探测 .git/pyproject.toml 自动定位项目根,可以放在仓库内任意位置。
  14. """
  15. import argparse
  16. import json
  17. import re
  18. import sys
  19. from pathlib import Path
  20. from typing import Any, Dict, List, Optional
  21. import httpx
  # Switch the console streams to UTF-8 so Chinese text prints correctly on
  # Windows consoles.
  for _s in (sys.stdout, sys.stderr):
      try:
          _s.reconfigure(encoding="utf-8")
      except (AttributeError, OSError):
          # The stream has no reconfigure() or refuses the change; keep the
          # stream's existing encoding.
          pass
  28. def _find_project_root(start: Path) -> Path:
  29. """沿父目录上爬找 .git / pyproject.toml。"""
  30. p = start.resolve()
  31. for ancestor in [p, *p.parents]:
  32. if (ancestor / ".git").exists() or (ancestor / "pyproject.toml").exists():
  33. return ancestor
  34. return start.resolve().parent
  # Directory holding this script; fetched posts are written beneath outputs/.
  SCRIPT_DIR = Path(__file__).resolve().parent
  OUTPUTS_DIR = SCRIPT_DIR / "outputs"

  # Project root, detected via the .git / pyproject.toml markers.  It is put at
  # the front of sys.path (presumably so repo-local packages are importable).
  PROJECT_ROOT = _find_project_root(Path(__file__))
  sys.path.insert(0, str(PROJECT_ROOT))

  # python-dotenv is optional: when installed, load <project root>/.env.
  try:
      from dotenv import load_dotenv

      load_dotenv(PROJECT_ROOT / ".env")
  except ImportError:
      pass
  # ── Fetching / parsing ──────────────────────────────
  # Request headers sent with every page fetch: a desktop Chrome User-Agent
  # string and an HTML Accept header.
  HEADERS = {
      "User-Agent": " ".join(
          (
              "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
              "AppleWebKit/537.36 (KHTML, like Gecko)",
              "Chrome/131.0 Safari/537.36",
          )
      ),
      "Accept": "text/html,application/xhtml+xml",
  }
  52. # 注意:用 .*? + 后面 </script> 固定锚点。lazy 的 \{.+?\} 在嵌套 JSON 上会断在第一个 }。
  53. INITIAL_STATE_RE = re.compile(
  54. r"window\.__INITIAL_STATE__\s*=\s*(\{.*?\})\s*</script>", re.S
  55. )
  56. def _parse_initial_state(html: str) -> Dict[str, Any]:
  57. m = INITIAL_STATE_RE.search(html)
  58. if not m:
  59. raise RuntimeError("INITIAL_STATE not found(页面可能未渲染或被风控)")
  60. # SSR 直接把 JS undefined 当裸字面量塞了进来,json.loads 不接,先替换
  61. raw = re.sub(r":\s*undefined", ": null", m.group(1))
  62. return json.loads(raw)
  63. def _coerce_int(v: Any) -> int:
  64. s = str(v or "").strip()
  65. if s.isdigit():
  66. return int(s)
  67. try:
  68. return int(float(s))
  69. except (ValueError, TypeError):
  70. return 0
  def parse_post(html: str) -> Dict[str, Any]:
      """Build one post dict (source.json-compatible) from explore-page HTML.

      Only the first entry of ``note.noteDetailMap`` in the embedded state is
      used; its key becomes the post id.  ``videos`` is always an empty list.

      Raises:
          RuntimeError: If the embedded state is missing or ``noteDetailMap``
              is empty.
      """
      state = _parse_initial_state(html)
      detail_map = (state.get("note") or {}).get("noteDetailMap") or {}
      if not detail_map:
          raise RuntimeError("noteDetailMap empty")

      note_id, entry = next(iter(detail_map.items()))
      note = (entry or {}).get("note") or {}

      # Prefer urlDefault, fall back to url, and drop images that have neither.
      image_urls = []
      for image in note.get("imageList") or []:
          image_url = image.get("urlDefault") or image.get("url")
          if image_url:
              image_urls.append(image_url)

      interact_info = note.get("interactInfo") or {}
      post: Dict[str, Any] = {}
      post["channel_content_id"] = note_id
      post["title"] = note.get("title") or ""
      post["content_type"] = note.get("type") or "note"
      post["body_text"] = note.get("desc") or ""
      post["like_count"] = _coerce_int(interact_info.get("likedCount"))
      post["publish_timestamp"] = note.get("time") or ""
      post["images"] = image_urls
      post["videos"] = []
      post["channel"] = "xhs"
      post["link"] = f"https://www.xiaohongshu.com/explore/{note_id}"
      return post
  def fetch_one(client: httpx.Client, url: str) -> Dict[str, Any]:
      """Download *url* with *client* and parse the page into a post dict.

      The request uses the module-level ``HEADERS``, follows redirects and has
      a 30 second timeout.  ``raise_for_status()`` is called on the response
      before parsing; errors from ``parse_post`` propagate unchanged.
      """
      response = client.get(
          url,
          headers=HEADERS,
          follow_redirects=True,
          timeout=30.0,
      )
      response.raise_for_status()
      page_html = response.text
      return parse_post(page_html)
  # ── Output sandbox ──────────────────────────────────
  def resolve_output_subdir(rel_path: Optional[str]) -> Path:
      """Map the ``--output`` value to a directory inside ``OUTPUTS_DIR``.

      Args:
          rel_path: Relative sub-directory; empty or ``None`` selects
              ``OUTPUTS_DIR`` itself.

      Returns:
          ``OUTPUTS_DIR`` when *rel_path* is empty, otherwise the resolved
          target directory.

      Raises:
          SystemExit: If *rel_path* is absolute, or resolves to a location
              outside ``OUTPUTS_DIR`` (for example through ``..``).
      """
      if not rel_path:
          return OUTPUTS_DIR

      requested = Path(rel_path)
      if requested.is_absolute():
          raise SystemExit(f"ERROR: --output 必须是相对路径: {rel_path!r}")

      target = (OUTPUTS_DIR / requested).resolve()
      sandbox_root = OUTPUTS_DIR.resolve()
      try:
          # relative_to() raises ValueError when target is not under the root.
          target.relative_to(sandbox_root)
      except ValueError:
          raise SystemExit(f"ERROR: --output 越界到 {target}(不允许 '..')")
      else:
          return target
  def safe_filename(post: Dict[str, Any]) -> str:
      """Build the output file name ``xhs_<id>_<title>.json`` for *post*.

      The title falls back to the post id, then to ``"untitled"``.  Runs of
      characters that are neither word characters nor CJK ideographs are
      collapsed to ``_``; the title part is capped at 40 characters and the id
      part at 12.  The id receives the same sanitising as the title, so a
      value taken from the remote page cannot inject path separators.  A
      missing id becomes ``"unknown"`` and a title that sanitises to nothing
      becomes ``"untitled"``.

      Args:
          post: Post dict; reads ``title`` and ``channel_content_id``.

      Returns:
          A file name (no directory component) ending in ``.json``.
      """
      unsafe_chars = r"[^\w一-龥]+"
      raw_id = str(post.get("channel_content_id") or "")
      title = str(post.get("title") or raw_id or "untitled")
      safe_title = re.sub(unsafe_chars, "_", title)[:40].strip("_") or "untitled"
      safe_id = re.sub(unsafe_chars, "_", raw_id[:12]) or "unknown"
      return f"xhs_{safe_id}_{safe_title}.json"
  # ── Input collection ────────────────────────────────
  def load_urls(args) -> List[str]:
      """Collect the URLs to fetch from ``--urls-file`` and positional arguments.

      File entries come first, then positional URLs.  In the file, blank lines
      and lines starting with ``#`` are skipped and surrounding whitespace is
      stripped.  The file is decoded as ``utf-8-sig`` so a leading byte-order
      mark (written by some Windows editors) does not end up glued to the
      first URL.

      Args:
          args: Parsed CLI namespace; reads ``urls_file`` and ``urls``.

      Returns:
          The non-empty list of URLs.

      Raises:
          SystemExit: If the URL file cannot be read, or no URL was supplied.
      """
      urls: List[str] = []
      if args.urls_file:
          try:
              text = Path(args.urls_file).read_text(
                  encoding="utf-8-sig", errors="replace"
              )
          except OSError as e:
              # Report an unreadable file as a usage error, not a traceback.
              raise SystemExit(
                  f"ERROR: 无法读取 --urls-file {args.urls_file!r}: {e}"
              ) from e
          for line in text.splitlines():
              entry = line.strip()
              if entry and not entry.startswith("#"):
                  urls.append(entry)
      urls.extend(args.urls or [])
      if not urls:
          raise SystemExit(
              "ERROR: 请通过位置参数或 --urls-file 提供至少一个 URL"
          )
      return urls
  # ── CLI ─────────────────────────────────────────────
  def build_parser() -> argparse.ArgumentParser:
      """Create the command-line parser.

      The module docstring is used verbatim as the description.  Arguments:
      positional ``urls`` (zero or more), ``--urls-file``, ``--output`` and the
      ``--stdout`` flag.
      """
      parser = argparse.ArgumentParser(
          formatter_class=argparse.RawDescriptionHelpFormatter,
          description=__doc__,
      )
      # (flags, keyword options) pairs, registered in display order.
      argument_specs = (
          (
              ("urls",),
              {
                  "nargs": "*",
                  "help": "小红书 explore URL(一个或多个,需带 xsec_token)",
              },
          ),
          (
              ("--urls-file",),
              {"help": "URL 列表文件路径(每行一个 URL,# 开头为注释)"},
          ),
          (
              ("--output",),
              {"help": "相对 outputs/ 的子目录路径,用于本次输出(默认直接写到 outputs/)"},
          ),
          (
              ("--stdout",),
              {
                  "action": "store_true",
                  "help": "不写文件,把抓到的 post 数组打到 stdout(JSON)",
              },
          ),
      )
      for flags, options in argument_specs:
          parser.add_argument(*flags, **options)
      return parser
  def _report_failure(index: int, total: int, url: str, exc: BaseException) -> None:
      """Log one failed URL to stderr, plus any server body attached to *exc*."""
      print(
          f"[err {index}/{total}] {type(exc).__name__}: {exc} url={url[:80]}",
          file=sys.stderr,
      )
      for attr in ("response", "body"):
          payload = getattr(exc, attr, None)
          if payload is None:
              continue
          try:
              text = payload.text if hasattr(payload, "text") else str(payload)
              print(f" server body: {text[:400]}", file=sys.stderr)
          except Exception:
              # Best-effort diagnostics only; a body that cannot be read must
              # not mask the original error.
              pass


  def _save_post(out_dir: Path, post: Dict[str, Any]) -> Path:
      """Write *post* as pretty-printed UTF-8 JSON into *out_dir*; return the path."""
      path = out_dir / safe_filename(post)
      path.write_text(
          json.dumps(post, ensure_ascii=False, indent=2),
          encoding="utf-8",
      )
      return path


  def _display_path(path: Path) -> Path:
      """Return *path* relative to ``SCRIPT_DIR`` when possible, else unchanged."""
      try:
          return path.relative_to(SCRIPT_DIR)
      except ValueError:
          return path


  def run(args) -> int:
      """Fetch every requested URL and emit the posts; return the exit code.

      With ``--stdout`` the posts are collected and printed as one JSON array
      on stdout.  Otherwise each post is written to its own file under the
      resolved output directory.  Progress and errors go to stderr.  A URL
      counts as failed when fetching/parsing raises or when its file cannot be
      written; in both cases the batch continues with the next URL.

      Args:
          args: Parsed CLI namespace (``urls``, ``urls_file``, ``output``,
              ``stdout``).

      Returns:
          ``0`` if every URL succeeded, ``2`` if only some failed, ``1`` if
          all failed.

      Raises:
          SystemExit: Propagated from ``load_urls`` / ``resolve_output_subdir``
              on invalid arguments.
      """
      urls = load_urls(args)
      total = len(urls)

      out_dir: Optional[Path] = None
      if not args.stdout:
          out_dir = resolve_output_subdir(args.output)
          out_dir.mkdir(parents=True, exist_ok=True)
          print(f"[info] outputs -> {out_dir}", file=sys.stderr)
      print(f"[info] urls={total}", file=sys.stderr)

      posts: List[Dict[str, Any]] = []
      failures = 0
      with httpx.Client() as client:
          for i, url in enumerate(urls, 1):
              try:
                  post = fetch_one(client, url)
              except Exception as e:
                  failures += 1
                  _report_failure(i, total, url, e)
                  continue
              print(
                  f"[info {i}/{total}] OK id={post['channel_content_id']} "
                  f"title={post['title'][:30]!r} body={len(post['body_text'])} "
                  f"imgs={len(post['images'])}",
                  file=sys.stderr,
              )
              if out_dir is None:
                  # --stdout mode: collect and print once after the loop.
                  posts.append(post)
                  continue
              try:
                  path = _save_post(out_dir, post)
              except Exception as e:
                  # A failed save is a per-URL failure, not a reason to abort
                  # the remaining URLs.
                  failures += 1
                  _report_failure(i, total, url, e)
                  continue
              print(f" -> {_display_path(path)}", file=sys.stderr)

      if args.stdout:
          json.dump(posts, sys.stdout, ensure_ascii=False, indent=2)
          sys.stdout.write("\n")

      ok = total - failures
      print(f"[info] done: ok={ok} fail={failures}", file=sys.stderr)
      if failures == 0:
          return 0
      if failures < total:
          return 2
      return 1
  def main():
      """CLI entry point: parse arguments, run the batch, exit with its code.

      Exits with 130 on Ctrl+C and with 1 (after printing a traceback to
      stderr) on any other unexpected error.  ``SystemExit`` raised while
      parsing or running is re-raised unchanged.
      """
      try:
          exit_code = run(build_parser().parse_args())
      except KeyboardInterrupt:
          print("\n[info] interrupted by user (Ctrl+C)", file=sys.stderr)
          exit_code = 130
      except SystemExit:
          raise
      except BaseException as exc:
          import traceback

          print(
              f"\n!!! UNEXPECTED ERROR: {type(exc).__name__}: {exc}",
              file=sys.stderr,
          )
          traceback.print_exc(file=sys.stderr)
          exit_code = 1
      sys.exit(exit_code)


  if __name__ == "__main__":
      main()