# xiaohongshu_detail.py
  1. #!/usr/bin/env python3
  2. """
  3. 小红书笔记详情工具
  4. 根据笔记ID获取笔记详情(包含完整正文、视频等)
  5. """
  6. import requests
  7. import json
  8. import os
  9. import argparse
  10. import time
  11. import hashlib
  12. import re
  13. from datetime import datetime
  14. from typing import Dict, Any, Optional
  15. import sys
  16. from pathlib import Path
  17. # 添加项目根目录到路径并导入配置
  18. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  19. from lib.config import get_data_dir
  20. from pathlib import Path
  21. class XiaohongshuDetail:
  22. """小红书笔记详情API封装类"""
  23. BASE_URL = "http://47.84.182.56:8001"
  24. TOOL_NAME = "get_xhs_detail_by_note_id"
  25. PLATFORM = "xiaohongshu"
  26. def __init__(self, results_dir: str = None, use_cache: bool = True):
  27. """
  28. 初始化API客户端
  29. Args:
  30. results_dir: 结果输出目录,默认为项目根目录下的 data/detail 文件夹
  31. use_cache: 是否启用缓存,默认为 True
  32. """
  33. self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
  34. self.use_cache = use_cache
  35. # 设置结果输出目录
  36. if results_dir:
  37. self.results_base_dir = results_dir
  38. else:
  39. # 默认从配置读取
  40. self.results_base_dir = get_data_dir("detail")
  41. def _sanitize_note_id(self, note_id: str) -> str:
  42. """
  43. 清理笔记ID,使其可以作为文件夹名称
  44. Args:
  45. note_id: 原始笔记ID
  46. Returns:
  47. 清理后的笔记ID
  48. """
  49. # 替换不能用作文件夹名称的字符
  50. sanitized = re.sub(r'[<>:"/\\|?*]', '_', note_id)
  51. sanitized = sanitized.strip().strip('.')
  52. if not sanitized:
  53. sanitized = "unnamed"
  54. if len(sanitized) > 200:
  55. sanitized = sanitized[:200]
  56. return sanitized
  57. def _get_latest_cache(self, note_id: str) -> Optional[tuple[str, str]]:
  58. """
  59. 获取最新的缓存文件(raw 和 clean)
  60. Args:
  61. note_id: 笔记ID
  62. Returns:
  63. (raw_filepath, clean_filepath) 元组 或 None
  64. """
  65. safe_note_id = self._sanitize_note_id(note_id)
  66. detail_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
  67. raw_dir = os.path.join(detail_dir, "raw")
  68. clean_dir = os.path.join(detail_dir, "clean")
  69. if not os.path.exists(raw_dir) or not os.path.exists(clean_dir):
  70. return None
  71. # 获取 raw 目录下的所有 JSON 文件
  72. raw_path = Path(raw_dir)
  73. raw_files = sorted(raw_path.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True)
  74. if not raw_files:
  75. return None
  76. # 获取最新的 raw 文件,并构造对应的 clean 文件路径
  77. latest_raw = raw_files[0]
  78. latest_clean = Path(clean_dir) / latest_raw.name
  79. if latest_clean.exists():
  80. return (str(latest_raw), str(latest_clean))
  81. return None
  82. def _load_cached_result(self, filepath: str) -> Optional[Dict[str, Any]]:
  83. """
  84. 加载缓存的数据
  85. Args:
  86. filepath: 文件路径
  87. Returns:
  88. 数据字典 或 None
  89. """
  90. try:
  91. with open(filepath, 'r', encoding='utf-8') as f:
  92. data = json.load(f)
  93. # 兼容新旧格式
  94. if "api_response" in data:
  95. return data["api_response"]
  96. else:
  97. return data
  98. except Exception:
  99. return None
  100. def get_detail(
  101. self,
  102. note_id: str,
  103. timeout: int = 30,
  104. max_retries: int = 5,
  105. retry_delay: int = 2,
  106. force: bool = False
  107. ) -> tuple[Dict[str, Any], bool]:
  108. """
  109. 获取小红书笔记详情,带自动重试机制和缓存
  110. Args:
  111. note_id: 笔记ID
  112. timeout: 请求超时时间(秒),默认30秒
  113. max_retries: 最大重试次数,默认5次
  114. retry_delay: 重试延迟(秒),默认2秒,每次重试会指数增长
  115. force: 强制重新请求API,忽略缓存,默认为 False
  116. Returns:
  117. (原始数据, 是否来自缓存) 的元组
  118. Raises:
  119. requests.exceptions.RequestException: 所有重试失败后抛出异常
  120. """
  121. # 检查缓存(如果启用且未强制刷新)
  122. if self.use_cache and not force:
  123. cached_files = self._get_latest_cache(note_id)
  124. if cached_files:
  125. raw_file, clean_file = cached_files
  126. cached_result = self._load_cached_result(raw_file)
  127. if cached_result:
  128. print(f"✓ 使用缓存数据: {raw_file}")
  129. return cached_result, True # 返回缓存标记
  130. payload = {"note_id": note_id}
  131. last_exception = None
  132. for attempt in range(max_retries):
  133. try:
  134. if attempt > 0:
  135. wait_time = retry_delay * (2 ** (attempt - 1))
  136. print(f"等待 {wait_time} 秒后进行第 {attempt + 1} 次重试...")
  137. time.sleep(wait_time)
  138. print(f"正在获取笔记详情: {note_id} (尝试 {attempt + 1}/{max_retries})")
  139. response = requests.post(
  140. self.api_url,
  141. json=payload,
  142. timeout=timeout,
  143. headers={"Content-Type": "application/json"}
  144. )
  145. response.raise_for_status()
  146. raw_result = response.json()
  147. # 如果 result 字段是字符串,需要解析成 JSON 对象
  148. if 'result' in raw_result and isinstance(raw_result['result'], str):
  149. try:
  150. raw_result['result'] = json.loads(raw_result['result'])
  151. except json.JSONDecodeError:
  152. pass
  153. # 检查 API 返回是否成功
  154. if not raw_result.get('success'):
  155. error_msg = raw_result.get('message', '未知错误')
  156. print(f"✗ API 返回失败: {error_msg}")
  157. last_exception = Exception(f"API 返回失败: {error_msg}")
  158. continue # 继续重试
  159. print(f"✓ 获取成功!")
  160. return raw_result, False # 返回新数据标记
  161. except requests.exceptions.Timeout as e:
  162. last_exception = e
  163. print(f"✗ 请求超时: {e}")
  164. except requests.exceptions.ConnectionError as e:
  165. last_exception = e
  166. print(f"✗ 连接错误: {e}")
  167. except requests.exceptions.HTTPError as e:
  168. last_exception = e
  169. status_code = e.response.status_code if e.response else "未知"
  170. print(f"✗ HTTP错误 {status_code}: {e}")
  171. # 如果是客户端错误(4xx),不重试
  172. if e.response and 400 <= e.response.status_code < 500:
  173. print(f"客户端错误,停止重试")
  174. raise
  175. except requests.exceptions.RequestException as e:
  176. last_exception = e
  177. print(f"✗ 请求失败: {e}")
  178. # 所有重试都失败
  179. print(f"✗ 已达到最大重试次数 ({max_retries}),请求失败")
  180. raise last_exception
  181. def _extract_clean_data(self, raw_result: Dict[str, Any]) -> Dict[str, Any]:
  182. """
  183. 提取并清理数据,生成扁平化的结构(参考现有格式)
  184. Args:
  185. raw_result: 原始 API 响应
  186. Returns:
  187. 清理后的笔记详情
  188. """
  189. if not raw_result.get("success"):
  190. return {}
  191. result = raw_result.get("result", [])
  192. if not result or not isinstance(result, list) or len(result) == 0:
  193. return {}
  194. data = result[0].get("data", {})
  195. # 提取图片 URL 并按顺序去重
  196. images = []
  197. seen = set()
  198. for img in data.get("images", []):
  199. url = None
  200. if isinstance(img, dict) and "cdn_url" in img:
  201. url = img["cdn_url"]
  202. elif isinstance(img, str):
  203. url = img
  204. # 按顺序去重
  205. if url and url not in seen:
  206. images.append(url)
  207. seen.add(url)
  208. # 处理时间戳转换为时间字符串
  209. publish_timestamp = data.get("publish_timestamp")
  210. publish_time = None
  211. if publish_timestamp:
  212. try:
  213. from datetime import datetime
  214. # 毫秒时间戳转换为秒
  215. dt = datetime.fromtimestamp(publish_timestamp / 1000)
  216. publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
  217. except:
  218. publish_time = None
  219. # 获取 video 字段
  220. video = data.get("video") or None
  221. # 根据 video 字段判断 content_type
  222. if video:
  223. content_type = "video"
  224. else:
  225. content_type = "normal"
  226. # 构建清理后的数据(扁平化结构,参考现有格式)
  227. # 不存在的字段统一用 None/null 表示
  228. clean_data = {
  229. "channel_content_id": data.get("channel_content_id") or None,
  230. "link": data.get("content_link") or None,
  231. "comment_count": data.get("comment_count"),
  232. "images": images if images else [],
  233. "like_count": data.get("like_count"),
  234. "body_text": data.get("body_text") or None,
  235. "title": data.get("title") or None,
  236. "collect_count": data.get("collect_count"),
  237. "channel_account_id": data.get("channel_account_id") or None,
  238. "channel_account_name": data.get("channel_account_name") or None,
  239. "content_type": content_type,
  240. "video": video,
  241. "publish_timestamp": publish_timestamp,
  242. "publish_time": publish_time
  243. }
  244. return clean_data
  245. def save_result(
  246. self,
  247. note_id: str,
  248. raw_result: Dict[str, Any]
  249. ) -> tuple[str, str]:
  250. """
  251. 保存原始数据和清理后数据到不同的目录
  252. 目录结构:
  253. data/detail/xiaohongshu_detail/
  254. └── {note_id}/
  255. ├── raw/ # 原始数据(完整 API 响应)
  256. │ └── {timestamp}.json
  257. └── clean/ # 清理后数据(扁平化结构)
  258. └── {timestamp}.json
  259. Args:
  260. note_id: 笔记ID
  261. raw_result: 原始数据(已解析 result 字段)
  262. Returns:
  263. (原始数据路径, 清理后数据路径) 的元组
  264. """
  265. # 清理笔记ID用于文件夹名称
  266. safe_note_id = self._sanitize_note_id(note_id)
  267. # 创建目录结构
  268. base_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
  269. raw_dir = os.path.join(base_dir, "raw")
  270. clean_dir = os.path.join(base_dir, "clean")
  271. os.makedirs(raw_dir, exist_ok=True)
  272. os.makedirs(clean_dir, exist_ok=True)
  273. # 生成文件名(使用时间戳)
  274. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  275. filename = f"{timestamp}.json"
  276. raw_filepath = os.path.join(raw_dir, filename)
  277. clean_filepath = os.path.join(clean_dir, filename)
  278. # 添加元数据到 raw 数据
  279. raw_data_with_meta = {
  280. "note_id": note_id,
  281. "timestamp": timestamp,
  282. "api_response": raw_result
  283. }
  284. # 保存原始结果(包含元数据)
  285. with open(raw_filepath, 'w', encoding='utf-8') as f:
  286. json.dump(raw_data_with_meta, f, ensure_ascii=False, indent=2)
  287. # 提取并保存清理后的数据(扁平化结构,直接保存)
  288. clean_data = self._extract_clean_data(raw_result)
  289. with open(clean_filepath, 'w', encoding='utf-8') as f:
  290. json.dump(clean_data, f, ensure_ascii=False, indent=2)
  291. return raw_filepath, clean_filepath
  292. def get_xiaohongshu_detail(
  293. note_id: str,
  294. force: bool = False
  295. ) -> Dict[str, Any]:
  296. """
  297. 获取小红书笔记详情
  298. Args:
  299. note_id: 笔记ID
  300. force: 强制刷新(忽略缓存)
  301. Returns:
  302. 笔记详情数据(clean 格式,扁平化结构)
  303. Examples:
  304. >>> # 基本使用
  305. >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5")
  306. >>> print(detail['title'])
  307. >>> print(detail['body_text'])
  308. >>> # 强制刷新
  309. >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5", force=True)
  310. """
  311. # 创建客户端(使用默认配置)
  312. client = XiaohongshuDetail(use_cache=True)
  313. # 获取详情(内部处理重试、超时等)
  314. raw_result, from_cache = client.get_detail(note_id=note_id, force=force)
  315. # 只有新请求的数据才需要保存
  316. if not from_cache:
  317. raw_filepath, clean_filepath = client.save_result(note_id=note_id, raw_result=raw_result)
  318. # 读取并返回 clean 数据
  319. with open(clean_filepath, 'r', encoding='utf-8') as f:
  320. return json.load(f)
  321. else:
  322. # 如果是缓存数据,直接提取 clean 数据返回
  323. clean_data = client._extract_clean_data(raw_result)
  324. return clean_data
  325. def main():
  326. """示例使用"""
  327. # 解析命令行参数
  328. parser = argparse.ArgumentParser(description='小红书笔记详情工具')
  329. parser.add_argument(
  330. '--results-dir',
  331. type=str,
  332. default=None,
  333. help='结果输出目录 (默认: 从配置读取)'
  334. )
  335. parser.add_argument(
  336. '--note-id',
  337. type=str,
  338. required=True,
  339. help='笔记ID (必填)'
  340. )
  341. parser.add_argument(
  342. '--force',
  343. action='store_true',
  344. help='强制重新请求API,忽略缓存'
  345. )
  346. parser.add_argument(
  347. '--no-cache',
  348. action='store_true',
  349. help='禁用缓存功能'
  350. )
  351. parser.add_argument(
  352. '--timeout',
  353. type=int,
  354. default=30,
  355. help='请求超时秒数 (默认: 30)'
  356. )
  357. parser.add_argument(
  358. '--max-retries',
  359. type=int,
  360. default=5,
  361. help='最大重试次数 (默认: 5)'
  362. )
  363. parser.add_argument(
  364. '--retry-delay',
  365. type=int,
  366. default=2,
  367. help='重试延迟秒数 (默认: 2)'
  368. )
  369. args = parser.parse_args()
  370. # 创建API客户端实例
  371. use_cache = not args.no_cache
  372. client = XiaohongshuDetail(results_dir=args.results_dir, use_cache=use_cache)
  373. # 执行获取并保存
  374. try:
  375. raw_result, from_cache = client.get_detail(
  376. args.note_id,
  377. timeout=args.timeout,
  378. max_retries=args.max_retries,
  379. retry_delay=args.retry_delay,
  380. force=args.force
  381. )
  382. # 只有新数据才保存
  383. if not from_cache:
  384. raw_filepath, clean_filepath = client.save_result(args.note_id, raw_result)
  385. print(f"Raw data saved to: {raw_filepath}")
  386. print(f"Clean data saved to: {clean_filepath}")
  387. else:
  388. print(f"Used cached data, no new files saved")
  389. except Exception as e:
  390. print(f"Error: {e}", file=__import__('sys').stderr)
  391. if __name__ == "__main__":
  392. main()