#!/usr/bin/env python3
"""
Xiaohongshu note detail tool.
Fetches the detail of a note (full body text, video, etc.) by note ID.
"""
import requests
import json
import os
import argparse
import sys
import time
import re
from datetime import datetime
from typing import Dict, Any, Optional
from pathlib import Path


class XiaohongshuDetail:
    """Wrapper around the Xiaohongshu note-detail API."""

    BASE_URL = "http://47.84.182.56:8001"
    TOOL_NAME = "get_xhs_detail_by_note_id"
    PLATFORM = "xiaohongshu"

    def __init__(self, results_dir: Optional[str] = None, use_cache: bool = True):
        """
        Initialize the API client.

        Args:
            results_dir: Output directory for results; defaults to the
                data/detail folder under the project root.
            use_cache: Whether to use cached results. Defaults to True.
        """
        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
        self.use_cache = use_cache
        # Set the output directory for results
        if results_dir:
            self.results_base_dir = results_dir
        else:
            # Default to the data/detail folder under the project root
            script_dir = os.path.dirname(os.path.abspath(__file__))
            project_root = os.path.dirname(os.path.dirname(script_dir))
            self.results_base_dir = os.path.join(project_root, "data", "detail")
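            # Note: project_root is resolved two directory levels above this
            # file, so the default assumes the script lives in a second-level
            # subdirectory of the project (e.g. <root>/tools/xiaohongshu/,
            # a hypothetical layout).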

    def _sanitize_note_id(self, note_id: str) -> str:
        """
        Sanitize a note ID so it can be used as a directory name.

        Args:
            note_id: Raw note ID.

        Returns:
            The sanitized note ID.
        """
        # Replace characters that are not allowed in directory names
        sanitized = re.sub(r'[<>:"/\\|?*]', '_', note_id)
        sanitized = sanitized.strip().strip('.')
        if not sanitized:
            sanitized = "unnamed"
        if len(sanitized) > 200:
            sanitized = sanitized[:200]
        return sanitized
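        # For illustration (hypothetical input): 'abc/123?*' becomes
        # 'abc_123__', and an ID made up only of dots or whitespace
        # becomes 'unnamed'.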

    def _get_latest_cache(self, note_id: str) -> Optional[tuple[str, str]]:
        """
        Find the most recent pair of cache files (raw and clean) for a note.

        Args:
            note_id: Note ID.

        Returns:
            A (raw_filepath, clean_filepath) tuple, or None if no cache exists.
        """
        safe_note_id = self._sanitize_note_id(note_id)
        detail_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
        raw_dir = os.path.join(detail_dir, "raw")
        clean_dir = os.path.join(detail_dir, "clean")
        if not os.path.exists(raw_dir) or not os.path.exists(clean_dir):
            return None
        # Collect all JSON files in the raw directory, newest first
        raw_path = Path(raw_dir)
        raw_files = sorted(raw_path.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True)
        if not raw_files:
            return None
        # Take the newest raw file and derive the matching clean file path
        latest_raw = raw_files[0]
        latest_clean = Path(clean_dir) / latest_raw.name
        if latest_clean.exists():
            return (str(latest_raw), str(latest_clean))
        return None
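        # raw/ and clean/ files are written in pairs with the same timestamped
        # name (see save_result), so matching by filename is sufficient here.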

    def _load_cached_result(self, filepath: str) -> Optional[Dict[str, Any]]:
        """
        Load cached data from disk.

        Args:
            filepath: Path of the cache file.

        Returns:
            The data dict, or None if the file cannot be read.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Support both the new and the old cache format
            if "api_response" in data:
                return data["api_response"]
            else:
                return data
        except Exception:
            return None
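    # Cache format note: newer cache files wrap the response as
    # {"note_id": ..., "timestamp": ..., "api_response": {...}} (the shape
    # written by save_result), while older files stored the API response
    # directly; _load_cached_result above accepts both.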

    def get_detail(
        self,
        note_id: str,
        timeout: int = 30,
        max_retries: int = 5,
        retry_delay: int = 2,
        force: bool = False
    ) -> tuple[Dict[str, Any], bool]:
        """
        Fetch the detail of a Xiaohongshu note, with automatic retries and caching.

        Args:
            note_id: Note ID.
            timeout: Request timeout in seconds. Defaults to 30.
            max_retries: Maximum number of attempts. Defaults to 5.
            retry_delay: Base retry delay in seconds; the wait doubles on each
                retry. Defaults to 2.
            force: Re-request from the API and ignore the cache. Defaults to False.

        Returns:
            A (raw_result, from_cache) tuple.

        Raises:
            requests.exceptions.RequestException: Raised after all retries fail.
        """
        # Check the cache (if enabled and not forcing a refresh)
        if self.use_cache and not force:
            cached_files = self._get_latest_cache(note_id)
            if cached_files:
                raw_file, clean_file = cached_files
                cached_result = self._load_cached_result(raw_file)
                if cached_result:
                    print(f"✓ Using cached data: {raw_file}")
                    return cached_result, True  # flag the result as cached
        payload = {"note_id": note_id}
        last_exception = None
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Exponential backoff before each retry
                    wait_time = retry_delay * (2 ** (attempt - 1))
                    print(f"Waiting {wait_time}s before attempt {attempt + 1}...")
                    time.sleep(wait_time)
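                    # With the defaults (retry_delay=2, max_retries=5), the
                    # waits before attempts 2-5 are 2s, 4s, 8s, and 16s.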
                print(f"Fetching note detail: {note_id} (attempt {attempt + 1}/{max_retries})")
                response = requests.post(
                    self.api_url,
                    json=payload,
                    timeout=timeout,
                    headers={"Content-Type": "application/json"}
                )
                response.raise_for_status()
                raw_result = response.json()
                # The result field may arrive double-encoded as a JSON string;
                # parse it into an object when possible
                if 'result' in raw_result and isinstance(raw_result['result'], str):
                    try:
                        raw_result['result'] = json.loads(raw_result['result'])
                    except json.JSONDecodeError:
                        pass
                # Check whether the API reported success
                if not raw_result.get('success'):
                    error_msg = raw_result.get('message', 'unknown error')
                    print(f"✗ API reported failure: {error_msg}")
                    last_exception = Exception(f"API reported failure: {error_msg}")
                    continue  # keep retrying
                print("✓ Fetch succeeded!")
                return raw_result, False  # flag the result as fresh
            except requests.exceptions.Timeout as e:
                last_exception = e
                print(f"✗ Request timed out: {e}")
            except requests.exceptions.ConnectionError as e:
                last_exception = e
                print(f"✗ Connection error: {e}")
            except requests.exceptions.HTTPError as e:
                last_exception = e
                # A requests.Response is falsy for 4xx/5xx statuses, so test
                # against None explicitly rather than relying on truthiness
                status_code = e.response.status_code if e.response is not None else "unknown"
                print(f"✗ HTTP error {status_code}: {e}")
                # Do not retry on client errors (4xx)
                if e.response is not None and 400 <= e.response.status_code < 500:
                    print("Client error; not retrying")
                    raise
            except requests.exceptions.RequestException as e:
                last_exception = e
                print(f"✗ Request failed: {e}")
        # All retries failed
        print(f"✗ Reached the maximum number of attempts ({max_retries}); request failed")
        raise last_exception

    def _extract_clean_data(self, raw_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract and clean the data into a flat structure (matching the existing format).

        Args:
            raw_result: Raw API response.

        Returns:
            The cleaned note detail.
        """
        if not raw_result.get("success"):
            return {}
        result = raw_result.get("result", [])
        if not result or not isinstance(result, list) or len(result) == 0:
            return {}
        data = result[0].get("data", {})
        # Extract image URLs, de-duplicating while preserving order
        images = []
        seen = set()
        for img in data.get("images", []):
            url = None
            if isinstance(img, dict) and "cdn_url" in img:
                url = img["cdn_url"]
            elif isinstance(img, str):
                url = img
            # De-duplicate while preserving order
            if url and url not in seen:
                images.append(url)
                seen.add(url)
        # Convert the publish timestamp into a time string
        publish_timestamp = data.get("publish_timestamp")
        publish_time = None
        if publish_timestamp:
            try:
                # The timestamp is in milliseconds; convert to seconds
                dt = datetime.fromtimestamp(publish_timestamp / 1000)
                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except (TypeError, ValueError, OSError, OverflowError):
                publish_time = None
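        # For illustration (hypothetical value): publish_timestamp=1700000000000
        # corresponds to 2023-11-14 22:13:20 UTC; fromtimestamp renders it in
        # the local timezone.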
        # Read the video field
        video = data.get("video") or None
        # Derive content_type from the video field
        if video:
            content_type = "video"
        else:
            content_type = "normal"
        # Build the cleaned data (flat structure, matching the existing format).
        # Missing fields are uniformly represented as None/null.
        clean_data = {
            "channel_content_id": data.get("channel_content_id") or None,
            "link": data.get("content_link") or None,
            "comment_count": data.get("comment_count"),
            "images": images if images else [],
            "like_count": data.get("like_count"),
            "body_text": data.get("body_text") or None,
            "title": data.get("title") or None,
            "collect_count": data.get("collect_count"),
            "channel_account_id": data.get("channel_account_id") or None,
            "channel_account_name": data.get("channel_account_name") or None,
            "content_type": content_type,
            "video": video,
            "publish_timestamp": publish_timestamp,
            "publish_time": publish_time
        }
        return clean_data

    def save_result(
        self,
        note_id: str,
        raw_result: Dict[str, Any]
    ) -> tuple[str, str]:
        """
        Save the raw and the cleaned data to separate directories.

        Directory layout:
            data/detail/xiaohongshu_detail/
            └── {note_id}/
                ├── raw/    # raw data (full API response)
                │   └── {timestamp}.json
                └── clean/  # cleaned data (flat structure)
                    └── {timestamp}.json

        Args:
            note_id: Note ID.
            raw_result: Raw data (with the result field already parsed).

        Returns:
            A (raw_filepath, clean_filepath) tuple.
        """
        # Sanitize the note ID for use as a directory name
        safe_note_id = self._sanitize_note_id(note_id)
        # Create the directory structure
        base_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
        raw_dir = os.path.join(base_dir, "raw")
        clean_dir = os.path.join(base_dir, "clean")
        os.makedirs(raw_dir, exist_ok=True)
        os.makedirs(clean_dir, exist_ok=True)
        # Generate the file name from a timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}.json"
        raw_filepath = os.path.join(raw_dir, filename)
        clean_filepath = os.path.join(clean_dir, filename)
        # Attach metadata to the raw data
        raw_data_with_meta = {
            "note_id": note_id,
            "timestamp": timestamp,
            "api_response": raw_result
        }
        # Save the raw result (with metadata)
        with open(raw_filepath, 'w', encoding='utf-8') as f:
            json.dump(raw_data_with_meta, f, ensure_ascii=False, indent=2)
        # Extract the cleaned data and save it as-is (flat structure)
        clean_data = self._extract_clean_data(raw_result)
        with open(clean_filepath, 'w', encoding='utf-8') as f:
            json.dump(clean_data, f, ensure_ascii=False, indent=2)
        return raw_filepath, clean_filepath
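        # For illustration (timestamp hypothetical): fetching note
        # "6915588b00000000040143b5" at 2024-01-01 12:00:00 writes
        #   .../xiaohongshu_detail/6915588b00000000040143b5/raw/20240101_120000.json
        #   .../xiaohongshu_detail/6915588b00000000040143b5/clean/20240101_120000.json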


def get_xiaohongshu_detail(
    note_id: str,
    force: bool = False
) -> Dict[str, Any]:
    """
    Fetch the detail of a Xiaohongshu note.

    Args:
        note_id: Note ID.
        force: Force a refresh (ignore the cache).

    Returns:
        The note detail (clean format, flat structure).

    Examples:
        >>> # Basic usage
        >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5")
        >>> print(detail['title'])
        >>> print(detail['body_text'])
        >>> # Force a refresh
        >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5", force=True)
    """
    # Create a client with the default configuration
    client = XiaohongshuDetail(use_cache=True)
    # Fetch the detail (retries, timeouts, etc. are handled internally)
    raw_result, from_cache = client.get_detail(note_id=note_id, force=force)
    # Only freshly fetched data needs to be saved
    if not from_cache:
        raw_filepath, clean_filepath = client.save_result(note_id=note_id, raw_result=raw_result)
        # Read back and return the clean data
        with open(clean_filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    else:
        # For cached data, extract and return the clean data directly
        clean_data = client._extract_clean_data(raw_result)
        return clean_data


def main():
    """Example CLI usage."""
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Xiaohongshu note detail tool')
    parser.add_argument(
        '--results-dir',
        type=str,
        default='data/detail',
        help='Output directory for results (default: data/detail)'
    )
    parser.add_argument(
        '--note-id',
        type=str,
        required=True,
        help='Note ID (required)'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='Force a fresh API request, ignoring the cache'
    )
    parser.add_argument(
        '--no-cache',
        action='store_true',
        help='Disable caching'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=30,
        help='Request timeout in seconds (default: 30)'
    )
    parser.add_argument(
        '--max-retries',
        type=int,
        default=5,
        help='Maximum number of attempts (default: 5)'
    )
    parser.add_argument(
        '--retry-delay',
        type=int,
        default=2,
        help='Base retry delay in seconds (default: 2)'
    )
    args = parser.parse_args()
    # Create the API client
    use_cache = not args.no_cache
    client = XiaohongshuDetail(results_dir=args.results_dir, use_cache=use_cache)
    # Fetch and save
    try:
        raw_result, from_cache = client.get_detail(
            args.note_id,
            timeout=args.timeout,
            max_retries=args.max_retries,
            retry_delay=args.retry_delay,
            force=args.force
        )
        # Only fresh data is saved
        if not from_cache:
            raw_filepath, clean_filepath = client.save_result(args.note_id, raw_result)
            print(f"Raw data saved to: {raw_filepath}")
            print(f"Clean data saved to: {clean_filepath}")
        else:
            print("Used cached data, no new files saved")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)


if __name__ == "__main__":
    main()
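
# Example invocation (the note ID is the one from the docstring example above):
#   python xiaohongshu_detail.py --note-id 6915588b00000000040143b5 --force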