xhs_fetch.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. """
  2. 按 URL 抓取小红书帖子内容。
  3. 走 explore 页 HTML 内嵌的 window.__INITIAL_STATE__ JSON,不需要 cookie;
  4. URL 必须带 xsec_token(分享/搜索/explore_feed 链接默认都带)。
  5. 输出字段集与 examples/process_pipeline/output/<id>/raw_cases/source.json 的
  6. post 子对象对齐:channel_content_id / title / content_type / body_text /
  7. like_count / publish_timestamp / images / videos / channel / link
  8. 用法:
  9. python xhs_fetch/xhs_fetch.py <url> [<url> ...] [--output <subdir>]
  10. python xhs_fetch/xhs_fetch.py --urls-file urls.txt
  11. python xhs_fetch/xhs_fetch.py <url> --stdout # 不写文件,打 JSON 数组到 stdout
  12. python xhs_fetch/xhs_fetch.py <video_url> --transcribe # 视频帖跑 Deepgram 转写并入 body_text
  13. 退码:0 全成功 / 1 全失败或参数错 / 2 部分失败 / 130 Ctrl+C
  14. 脚本通过探测 .git/pyproject.toml 自动定位项目根,可以放在仓库内任意位置。
  15. """
  16. import argparse
  17. import json
  18. import re
  19. import sys
  20. from pathlib import Path
  21. from typing import Any, Dict, List, Optional
  22. import httpx
  23. # Windows 控制台 UTF-8(中文输出必备)
  24. for _s in (sys.stdout, sys.stderr):
  25. try:
  26. _s.reconfigure(encoding="utf-8")
  27. except (AttributeError, OSError):
  28. pass
  29. def _find_project_root(start: Path) -> Path:
  30. """沿父目录上爬找 .git / pyproject.toml。"""
  31. p = start.resolve()
  32. for ancestor in [p, *p.parents]:
  33. if (ancestor / ".git").exists() or (ancestor / "pyproject.toml").exists():
  34. return ancestor
  35. return start.resolve().parent
# Project root, found by walking up from this file via _find_project_root.
PROJECT_ROOT = _find_project_root(Path(__file__))
# Directory containing this script; the default output directory lives in it.
SCRIPT_DIR = Path(__file__).resolve().parent
OUTPUTS_DIR = SCRIPT_DIR / "outputs"
# Put the project root at the front of sys.path. NOTE(review): presumably so
# the project-local `agent` package imported lazily in transcribe_post
# resolves no matter where the script is launched from — confirm.
sys.path.insert(0, str(PROJECT_ROOT))
try:
    # python-dotenv is optional: when installed, load <project root>/.env.
    from dotenv import load_dotenv
    load_dotenv(PROJECT_ROOT / ".env")
except ImportError:
    # dotenv not installed; continue without loading a .env file.
    pass
  45. # ── 抓取 / 解析 ─────────────────────────────────────
  46. HEADERS = {
  47. "User-Agent": (
  48. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
  49. "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0 Safari/537.36"
  50. ),
  51. "Accept": "text/html,application/xhtml+xml",
  52. }
  53. # 注意:用 .*? + 后面 </script> 固定锚点。lazy 的 \{.+?\} 在嵌套 JSON 上会断在第一个 }。
  54. INITIAL_STATE_RE = re.compile(
  55. r"window\.__INITIAL_STATE__\s*=\s*(\{.*?\})\s*</script>", re.S
  56. )
  57. def _parse_initial_state(html: str) -> Dict[str, Any]:
  58. m = INITIAL_STATE_RE.search(html)
  59. if not m:
  60. raise RuntimeError("INITIAL_STATE not found(页面可能未渲染或被风控)")
  61. # SSR 直接把 JS undefined 当裸字面量塞了进来,json.loads 不接,先替换
  62. raw = re.sub(r":\s*undefined", ": null", m.group(1))
  63. return json.loads(raw)
  64. def _coerce_int(v: Any) -> int:
  65. s = str(v or "").strip()
  66. if s.isdigit():
  67. return int(s)
  68. try:
  69. return int(float(s))
  70. except (ValueError, TypeError):
  71. return 0
  72. def extract_video_urls(note: Dict[str, Any]) -> List[str]:
  73. """从 note.video.media.stream 收集可播放的 mp4 地址。
  74. stream 按编码分桶(h264/h265/av1),每桶是个数组,元素里 masterUrl 是首选
  75. 地址、backupUrls 是备份。优先 h264(兼容性最好),同一物理视频不同编码会重复,
  76. 所以按 masterUrl 去重。图片笔记没有 video 子树,返回空列表。
  77. """
  78. stream = (((note.get("video") or {}).get("media") or {}).get("stream")) or {}
  79. urls: List[str] = []
  80. seen = set()
  81. for codec in ("h264", "h265", "av1"):
  82. for item in (stream.get(codec) or []):
  83. candidates = [item.get("masterUrl") or ""] + list(item.get("backupUrls") or [])
  84. for u in candidates:
  85. if u and u not in seen:
  86. seen.add(u)
  87. urls.append(u)
  88. break # 每个清晰度只取一个可用地址
  89. return urls
  90. _TRANSCRIPT_MARKER = "[视频字幕]"
  91. def merge_transcript_into_body(post: Dict[str, Any], transcript: str) -> None:
  92. """把视频转写拼进 post['body_text'](in-place),带 [视频字幕] 标记。
  93. 跟 script/extract_sources.py 的 _merge_transcript_into_body 同语义:
  94. 幂等——body_text 已含标记就跳过;视频帖 body 通常只有话题词或为空,
  95. 这一步把真正的视频内容暴露到前端读取的字段下。
  96. """
  97. transcript = (transcript or "").strip()
  98. if not transcript:
  99. return
  100. body = (post.get("body_text") or "").strip()
  101. if body and _TRANSCRIPT_MARKER in body:
  102. return
  103. post["body_text"] = (
  104. f"{body}\n\n{_TRANSCRIPT_MARKER}\n{transcript}" if body
  105. else f"{_TRANSCRIPT_MARKER}\n{transcript}"
  106. )
  107. def transcribe_post(post: Dict[str, Any]) -> Optional[str]:
  108. """对一条 xhs 视频 post 跑 Deepgram 转写,复用项目的 transcription 模块。
  109. 无视频源直接返回 None。需要 DEEPGRAM_KEY(.env 已加载)+ yt-dlp + ffmpeg。
  110. 任意环节失败返回 None(静默兜底),不影响抓取主流程。
  111. """
  112. import asyncio
  113. from agent.tools.builtin.content.transcription import (
  114. extract_video_url,
  115. transcribe_video_from_post,
  116. )
  117. if not extract_video_url("xhs", post):
  118. return None
  119. return asyncio.run(transcribe_video_from_post("xhs", post))
  120. def parse_post(html: str) -> Dict[str, Any]:
  121. """从 explore 页 HTML 解析出一个 post 字典(source.json 兼容格式)。"""
  122. data = _parse_initial_state(html)
  123. nd_map = ((data.get("note") or {}).get("noteDetailMap")) or {}
  124. if not nd_map:
  125. raise RuntimeError("noteDetailMap empty")
  126. nid, val = next(iter(nd_map.items()))
  127. note = (val or {}).get("note") or {}
  128. images = [
  129. (img.get("urlDefault") or img.get("url") or "")
  130. for img in (note.get("imageList") or [])
  131. if (img.get("urlDefault") or img.get("url"))
  132. ]
  133. interact = note.get("interactInfo") or {}
  134. return {
  135. "channel_content_id": nid,
  136. "title": note.get("title") or "",
  137. "content_type": note.get("type") or "note",
  138. "body_text": note.get("desc") or "",
  139. "like_count": _coerce_int(interact.get("likedCount")),
  140. "publish_timestamp": note.get("time") or "",
  141. "images": images,
  142. "videos": extract_video_urls(note),
  143. "channel": "xhs",
  144. "link": f"https://www.xiaohongshu.com/explore/{nid}",
  145. }
  146. def fetch_one(client: httpx.Client, url: str) -> Dict[str, Any]:
  147. r = client.get(url, headers=HEADERS, follow_redirects=True, timeout=30.0)
  148. r.raise_for_status()
  149. return parse_post(r.text)
  150. # ── 输出沙盒 ────────────────────────────────────────
  151. def resolve_output_subdir(rel_path: Optional[str]) -> Path:
  152. """把 --output 解析到 OUTPUTS_DIR 之下,禁止绝对路径与 '..' 越界。"""
  153. if not rel_path:
  154. return OUTPUTS_DIR
  155. p = Path(rel_path)
  156. if p.is_absolute():
  157. raise SystemExit(f"ERROR: --output 必须是相对路径: {rel_path!r}")
  158. target = (OUTPUTS_DIR / p).resolve()
  159. try:
  160. target.relative_to(OUTPUTS_DIR.resolve())
  161. except ValueError:
  162. raise SystemExit(f"ERROR: --output 越界到 {target}(不允许 '..')")
  163. return target
  164. def safe_filename(post: Dict[str, Any]) -> str:
  165. title = post.get("title") or post.get("channel_content_id") or "untitled"
  166. safe = re.sub(r"[^\w一-龥]+", "_", title)[:40].strip("_")
  167. return f"xhs_{post['channel_content_id'][:12]}_{safe}.json"
  168. # ── 输入收集 ────────────────────────────────────────
  169. def load_urls(args) -> List[str]:
  170. urls: List[str] = []
  171. if args.urls_file:
  172. text = Path(args.urls_file).read_text(encoding="utf-8", errors="replace")
  173. urls.extend(
  174. line.strip()
  175. for line in text.splitlines()
  176. if line.strip() and not line.strip().startswith("#")
  177. )
  178. urls.extend(args.urls or [])
  179. if not urls:
  180. raise SystemExit(
  181. "ERROR: 请通过位置参数或 --urls-file 提供至少一个 URL"
  182. )
  183. return urls
  184. # ── CLI ─────────────────────────────────────────────
  185. def build_parser() -> argparse.ArgumentParser:
  186. p = argparse.ArgumentParser(
  187. description=__doc__,
  188. formatter_class=argparse.RawDescriptionHelpFormatter,
  189. )
  190. p.add_argument(
  191. "urls", nargs="*",
  192. help="小红书 explore URL(一个或多个,需带 xsec_token)",
  193. )
  194. p.add_argument(
  195. "--urls-file",
  196. help="URL 列表文件路径(每行一个 URL,# 开头为注释)",
  197. )
  198. p.add_argument(
  199. "--output",
  200. help="相对 outputs/ 的子目录路径,用于本次输出(默认直接写到 outputs/)",
  201. )
  202. p.add_argument(
  203. "--stdout", action="store_true",
  204. help="不写文件,把抓到的 post 数组打到 stdout(JSON)",
  205. )
  206. p.add_argument(
  207. "--transcribe", action="store_true",
  208. help="对视频帖跑 Deepgram 转写,把字幕并入 body_text(需 DEEPGRAM_KEY+yt-dlp+ffmpeg,按量计费)",
  209. )
  210. return p
  211. def run(args) -> int:
  212. urls = load_urls(args)
  213. out_dir: Optional[Path] = None
  214. if not args.stdout:
  215. out_dir = resolve_output_subdir(args.output)
  216. out_dir.mkdir(parents=True, exist_ok=True)
  217. print(f"[info] outputs -> {out_dir}", file=sys.stderr)
  218. print(f"[info] urls={len(urls)}", file=sys.stderr)
  219. posts: List[Dict[str, Any]] = []
  220. failures = 0
  221. with httpx.Client() as client:
  222. for i, url in enumerate(urls, 1):
  223. short = url[:80]
  224. try:
  225. post = fetch_one(client, url)
  226. except Exception as e:
  227. failures += 1
  228. print(
  229. f"[err {i}/{len(urls)}] {type(e).__name__}: {e} url={short}",
  230. file=sys.stderr,
  231. )
  232. for attr in ("response", "body"):
  233. obj = getattr(e, attr, None)
  234. if obj is not None:
  235. try:
  236. text = obj.text if hasattr(obj, "text") else str(obj)
  237. print(f" server body: {text[:400]}", file=sys.stderr)
  238. except Exception:
  239. pass
  240. continue
  241. if args.transcribe and post["videos"]:
  242. try:
  243. transcript = transcribe_post(post)
  244. except Exception as e:
  245. transcript = None
  246. print(
  247. f" transcribe error: {type(e).__name__}: {e}",
  248. file=sys.stderr,
  249. )
  250. if transcript:
  251. merge_transcript_into_body(post, transcript)
  252. print(
  253. f" transcript merged ({len(transcript)} chars)",
  254. file=sys.stderr,
  255. )
  256. else:
  257. print(" transcript: <none>", file=sys.stderr)
  258. print(
  259. f"[info {i}/{len(urls)}] OK id={post['channel_content_id']} "
  260. f"title={post['title'][:30]!r} body={len(post['body_text'])} "
  261. f"imgs={len(post['images'])} vids={len(post['videos'])}",
  262. file=sys.stderr,
  263. )
  264. if args.stdout:
  265. posts.append(post)
  266. else:
  267. assert out_dir is not None
  268. path = out_dir / safe_filename(post)
  269. path.write_text(
  270. json.dumps(post, ensure_ascii=False, indent=2),
  271. encoding="utf-8",
  272. )
  273. print(f" -> {path.relative_to(SCRIPT_DIR)}", file=sys.stderr)
  274. if args.stdout:
  275. json.dump(posts, sys.stdout, ensure_ascii=False, indent=2)
  276. sys.stdout.write("\n")
  277. ok = len(urls) - failures
  278. print(f"[info] done: ok={ok} fail={failures}", file=sys.stderr)
  279. if failures == 0:
  280. return 0
  281. if failures < len(urls):
  282. return 2
  283. return 1
  284. def main():
  285. try:
  286. args = build_parser().parse_args()
  287. sys.exit(run(args))
  288. except KeyboardInterrupt:
  289. print("\n[info] interrupted by user (Ctrl+C)", file=sys.stderr)
  290. sys.exit(130)
  291. except SystemExit:
  292. raise
  293. except BaseException as e:
  294. import traceback
  295. print(f"\n!!! UNEXPECTED ERROR: {type(e).__name__}: {e}", file=sys.stderr)
  296. traceback.print_exc(file=sys.stderr)
  297. sys.exit(1)
  298. if __name__ == "__main__":
  299. main()