#!/usr/bin/env python3
"""
xiaohongshu_detail.py - Xiaohongshu (小红书) note detail tool.

Fetches the detailed information of a Xiaohongshu note given its note ID.
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests
  13. class XiaohongshuDetail:
  14. """小红书笔记详情API封装类"""
  15. BASE_URL = "http://47.84.182.56:8001"
  16. TOOL_NAME = "get_xhs_detail_by_note_id"
  17. PLATFORM = "xiaohongshu"
  18. def __init__(self, results_dir: str = None):
  19. """
  20. 初始化API客户端
  21. Args:
  22. results_dir: 结果输出目录,默认为项目根目录下的 data/detail 文件夹
  23. """
  24. self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
  25. # 设置结果输出目录
  26. if results_dir:
  27. self.results_base_dir = results_dir
  28. else:
  29. # 默认使用项目根目录的 data/detail 文件夹
  30. script_dir = os.path.dirname(os.path.abspath(__file__))
  31. project_root = os.path.dirname(os.path.dirname(script_dir))
  32. self.results_base_dir = os.path.join(project_root, "data", "detail")
  33. def get_detail(
  34. self,
  35. note_id: str,
  36. timeout: int = 30,
  37. max_retries: int = 3,
  38. retry_delay: int = 2
  39. ) -> Dict[str, Any]:
  40. """
  41. 获取小红书笔记详情(带重试机制)
  42. Args:
  43. note_id: 笔记ID
  44. timeout: 请求超时时间(秒),默认30秒
  45. max_retries: 最大重试次数,默认3次
  46. retry_delay: 重试间隔时间(秒),默认2秒
  47. Returns:
  48. API响应的JSON数据
  49. Raises:
  50. requests.exceptions.RequestException: 所有重试都失败时抛出异常
  51. """
  52. payload = {
  53. "note_id": note_id
  54. }
  55. last_exception = None
  56. # 重试循环:最多尝试 max_retries 次
  57. for attempt in range(1, max_retries + 1):
  58. try:
  59. if attempt > 1:
  60. print(f" 重试第 {attempt - 1}/{max_retries - 1} 次: {note_id}")
  61. response = requests.post(
  62. self.api_url,
  63. json=payload,
  64. timeout=timeout,
  65. headers={"Content-Type": "application/json"}
  66. )
  67. response.raise_for_status()
  68. result = response.json()
  69. if attempt > 1:
  70. print(f" ✓ 重试成功")
  71. return result
  72. except requests.exceptions.RequestException as e:
  73. last_exception = e
  74. if attempt < max_retries:
  75. # 还有重试机会,等待后继续
  76. print(f" ✗ 请求失败 (第{attempt}次尝试): {e}")
  77. print(f" 等待 {retry_delay} 秒后重试...")
  78. time.sleep(retry_delay)
  79. else:
  80. # 已达最大重试次数,抛出异常
  81. print(f" ✗ 请求失败 (已达最大重试次数 {max_retries}): {e}")
  82. # 所有重试都失败,抛出最后一次的异常
  83. raise last_exception
  84. def get_details_batch(
  85. self,
  86. note_ids: List[str],
  87. timeout: int = 30,
  88. max_retries: int = 3,
  89. retry_delay: int = 2,
  90. batch_delay: int = 1
  91. ) -> List[Dict[str, Any]]:
  92. """
  93. 批量获取多个笔记的详情
  94. Args:
  95. note_ids: 笔记ID列表
  96. timeout: 请求超时时间(秒),默认30秒
  97. max_retries: 最大重试次数,默认3次
  98. retry_delay: 重试间隔时间(秒),默认2秒
  99. batch_delay: 批量请求间隔时间(秒),默认1秒
  100. Returns:
  101. 包含所有笔记详情的列表
  102. """
  103. results = []
  104. total = len(note_ids)
  105. for idx, note_id in enumerate(note_ids, 1):
  106. print(f"正在获取笔记详情 ({idx}/{total}): {note_id}")
  107. try:
  108. result = self.get_detail(note_id, timeout, max_retries, retry_delay)
  109. results.append({
  110. "note_id": note_id,
  111. "success": True,
  112. "data": result
  113. })
  114. print(f" ✓ 成功获取")
  115. except Exception as e:
  116. print(f" ✗ 获取失败: {e}")
  117. results.append({
  118. "note_id": note_id,
  119. "success": False,
  120. "error": str(e)
  121. })
  122. # 避免请求过快,添加延迟(最后一个不需要延迟)
  123. if idx < total:
  124. time.sleep(batch_delay)
  125. return results
  126. def save_result(self, note_id: str, result: Dict[str, Any]) -> str:
  127. """
  128. 保存单个笔记详情到文件
  129. 目录结构: results/xiaohongshu_detail/note_id/时间戳.json
  130. Args:
  131. note_id: 笔记ID
  132. result: API返回的结果
  133. Returns:
  134. 保存的文件路径
  135. """
  136. # 创建目录结构: results/xiaohongshu_detail/note_id/
  137. result_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", note_id)
  138. os.makedirs(result_dir, exist_ok=True)
  139. # 文件名使用时间戳
  140. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  141. filename = f"{timestamp}.json"
  142. filepath = os.path.join(result_dir, filename)
  143. # 保存结果
  144. with open(filepath, 'w', encoding='utf-8') as f:
  145. json.dump(result, f, ensure_ascii=False, indent=2)
  146. return filepath
  147. def save_batch_results(self, results: List[Dict[str, Any]], batch_name: str = None) -> str:
  148. """
  149. 保存批量获取的结果到单个文件
  150. 目录结构: results/xiaohongshu_detail/batch/时间戳_批次名.json
  151. Args:
  152. results: 批量获取的结果列表
  153. batch_name: 批次名称(可选)
  154. Returns:
  155. 保存的文件路径
  156. """
  157. # 创建目录结构
  158. result_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", "batch")
  159. os.makedirs(result_dir, exist_ok=True)
  160. # 文件名使用时间戳和批次名
  161. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  162. if batch_name:
  163. filename = f"{timestamp}_{batch_name}.json"
  164. else:
  165. filename = f"{timestamp}.json"
  166. filepath = os.path.join(result_dir, filename)
  167. # 保存结果
  168. with open(filepath, 'w', encoding='utf-8') as f:
  169. json.dump(results, f, ensure_ascii=False, indent=2)
  170. return filepath
  171. def main():
  172. """示例使用"""
  173. # 解析命令行参数
  174. parser = argparse.ArgumentParser(description='小红书笔记详情获取工具')
  175. parser.add_argument(
  176. '--results-dir',
  177. type=str,
  178. default='data/detail',
  179. help='结果输出目录 (默认: data/detail)'
  180. )
  181. # 创建互斥参数组:单个笔记ID 或 批量笔记ID
  182. group = parser.add_mutually_exclusive_group(required=True)
  183. group.add_argument(
  184. '--note-id',
  185. type=str,
  186. help='单个笔记ID'
  187. )
  188. group.add_argument(
  189. '--note-ids',
  190. type=str,
  191. help='多个笔记ID,用逗号分隔,例如: id1,id2,id3'
  192. )
  193. group.add_argument(
  194. '--note-ids-file',
  195. type=str,
  196. help='包含笔记ID的文件路径,每行一个ID'
  197. )
  198. parser.add_argument(
  199. '--batch-name',
  200. type=str,
  201. help='批量获取时的批次名称(可选)'
  202. )
  203. args = parser.parse_args()
  204. # 创建API客户端实例
  205. client = XiaohongshuDetail(results_dir=args.results_dir)
  206. try:
  207. # 单个笔记ID
  208. if args.note_id:
  209. result = client.get_detail(args.note_id)
  210. filepath = client.save_result(args.note_id, result)
  211. print(f"Output: {filepath}")
  212. # 多个笔记ID(命令行逗号分隔)
  213. elif args.note_ids:
  214. note_ids = [nid.strip() for nid in args.note_ids.split(',') if nid.strip()]
  215. results = client.get_details_batch(note_ids)
  216. filepath = client.save_batch_results(results, args.batch_name)
  217. print(f"\n批量获取完成")
  218. print(f"成功: {sum(1 for r in results if r['success'])}/{len(results)}")
  219. print(f"Output: {filepath}")
  220. # 从文件读取笔记ID
  221. elif args.note_ids_file:
  222. with open(args.note_ids_file, 'r', encoding='utf-8') as f:
  223. note_ids = [line.strip() for line in f if line.strip()]
  224. results = client.get_details_batch(note_ids)
  225. filepath = client.save_batch_results(results, args.batch_name)
  226. print(f"\n批量获取完成")
  227. print(f"成功: {sum(1 for r in results if r['success'])}/{len(results)}")
  228. print(f"Output: {filepath}")
  229. except Exception as e:
  230. print(f"Error: {e}", file=__import__('sys').stderr)
  231. raise
  232. if __name__ == "__main__":
  233. main()