#!/usr/bin/env python3
"""
小红书笔记搜索工具
根据关键词搜索小红书笔记,支持多种筛选条件
"""
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime
from typing import Any, Dict, Optional

import requests

logger = logging.getLogger(__name__)
  15. class XiaohongshuSearch:
  16. """小红书笔记搜索API封装类"""
  17. BASE_URL = "http://47.84.182.56:8001"
  18. TOOL_NAME = "xhs_note_search"
  19. PLATFORM = "xiaohongshu"
  20. def __init__(self, results_dir: str = None, cache_dir: str = "search_cache"):
  21. """
  22. 初始化API客户端
  23. Args:
  24. results_dir: 结果输出目录,默认为项目根目录下的 data/search 文件夹
  25. cache_dir: 缓存目录,默认为 search_cache
  26. """
  27. self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
  28. # 设置结果输出目录
  29. if results_dir:
  30. self.results_base_dir = results_dir
  31. else:
  32. # 默认使用项目根目录的 data/search 文件夹
  33. script_dir = os.path.dirname(os.path.abspath(__file__))
  34. project_root = os.path.dirname(os.path.dirname(script_dir))
  35. self.results_base_dir = os.path.join(project_root, "data", "search")
  36. # 设置缓存目录
  37. self.cache_dir = cache_dir
  38. if cache_dir:
  39. os.makedirs(cache_dir, exist_ok=True)
  40. def _get_cache_key(
  41. self,
  42. keyword: str,
  43. content_type: str,
  44. sort_type: str,
  45. publish_time: str
  46. ) -> str:
  47. """
  48. 生成缓存key
  49. Args:
  50. keyword: 搜索关键词
  51. content_type: 内容类型
  52. sort_type: 排序方式
  53. publish_time: 发布时间
  54. Returns:
  55. 缓存key字符串
  56. """
  57. return f"{keyword}_{content_type}_{sort_type}_{publish_time}"
  58. def _get_cache_path(self, cache_key: str) -> str:
  59. """
  60. 获取缓存文件路径
  61. Args:
  62. cache_key: 缓存key
  63. Returns:
  64. 缓存文件完整路径
  65. """
  66. # 清理文件名中的非法字符
  67. safe_key = cache_key.replace('/', '_').replace('\\', '_').replace(' ', '_')
  68. return os.path.join(self.cache_dir, f"{safe_key}.json")
  69. def search(
  70. self,
  71. keyword: str,
  72. content_type: str = "不限",
  73. sort_type: str = "综合",
  74. publish_time: str = "不限",
  75. cursor: str = "",
  76. timeout: int = 30,
  77. max_retries: int = 5,
  78. retry_delay: int = 2,
  79. use_cache: bool = True
  80. ) -> Dict[str, Any]:
  81. """
  82. 搜索小红书笔记(带重试机制和缓存)
  83. Args:
  84. keyword: 搜索关键词
  85. content_type: 内容类型,可选值:不限、视频、图文,默认为'不限'
  86. sort_type: 排序方式,可选值:综合、最新、最多点赞、最多评论,默认为'综合'
  87. publish_time: 发布时间筛选,可选值:不限、一天内、一周内、半年内,默认为'不限'
  88. cursor: 翻页游标,第一页默认为空,下一页的游标在上一页的返回值中获取
  89. timeout: 请求超时时间(秒),默认30秒
  90. max_retries: 最大重试次数,默认3次
  91. retry_delay: 重试间隔时间(秒),默认2秒
  92. use_cache: 是否使用缓存,默认True
  93. Returns:
  94. API响应的JSON数据
  95. Raises:
  96. requests.exceptions.RequestException: 所有重试都失败时抛出异常
  97. """
  98. # 检查缓存
  99. if use_cache and self.cache_dir:
  100. cache_key = self._get_cache_key(keyword, content_type, sort_type, publish_time)
  101. cache_path = self._get_cache_path(cache_key)
  102. if os.path.exists(cache_path):
  103. try:
  104. with open(cache_path, 'r', encoding='utf-8') as f:
  105. cached_result = json.load(f)
  106. logger.info(f" ✓ 使用缓存: {keyword}")
  107. return cached_result
  108. except Exception as e:
  109. logger.warning(f" 读取缓存失败: {e},将重新搜索")
  110. # 缓存未命中或未启用,执行实际搜索
  111. payload = {
  112. "keyword": keyword,
  113. "content_type": '不限', # 使用映射后的参数
  114. "sort_type": sort_type,
  115. "publish_time": publish_time,
  116. "cursor": cursor
  117. }
  118. last_exception = None
  119. # 重试循环:最多尝试 max_retries 次
  120. for attempt in range(1, max_retries + 1):
  121. try:
  122. if attempt > 1:
  123. print(f" 重试第 {attempt - 1}/{max_retries - 1} 次: {keyword}")
  124. response = requests.post(
  125. self.api_url,
  126. json=payload,
  127. timeout=timeout,
  128. headers={"Content-Type": "application/json"}
  129. )
  130. response.raise_for_status()
  131. api_response = response.json()
  132. # 解析API返回的result字段(是JSON字符串)
  133. if not api_response.get("success"):
  134. raise Exception(f"API返回失败: {api_response}")
  135. result_str = api_response.get("result", "{}")
  136. result = json.loads(result_str)
  137. # 预处理返回数据:提取 image_list 中的 URL 字符串
  138. self._preprocess_response(result)
  139. if attempt > 1:
  140. print(f" ✓ 重试成功")
  141. # 保存到缓存
  142. if use_cache and self.cache_dir:
  143. try:
  144. cache_key = self._get_cache_key(keyword, content_type, sort_type, publish_time)
  145. cache_path = self._get_cache_path(cache_key)
  146. with open(cache_path, 'w', encoding='utf-8') as f:
  147. json.dump(result, f, ensure_ascii=False, indent=2)
  148. logger.info(f" ✓ 已缓存: {keyword}")
  149. except Exception as e:
  150. logger.warning(f" 保存缓存失败: {e}")
  151. return result
  152. except requests.exceptions.RequestException as e:
  153. last_exception = e
  154. if attempt < max_retries:
  155. # 还有重试机会,等待后继续
  156. print(f" ✗ 请求失败 (第{attempt}次尝试): {e}")
  157. print(f" 等待 {retry_delay} 秒后重试...")
  158. time.sleep(retry_delay)
  159. else:
  160. # 已达最大重试次数,抛出异常
  161. print(f" ✗ 请求失败 (已达最大重试次数 {max_retries}): {e}")
  162. # 所有重试都失败,抛出最后一次的异常
  163. raise last_exception
  164. def _preprocess_response(self, result: Dict[str, Any]) -> None:
  165. """
  166. 预处理搜索结果,将 image_list 中的字典格式转换为 URL 字符串列表
  167. Args:
  168. result: API返回的原始结果字典(会直接修改)
  169. """
  170. # 获取帖子列表
  171. notes = result.get("data", {}).get("data", [])
  172. for note in notes:
  173. note_card = note.get("note_card", {})
  174. image_list_raw = note_card.get("image_list", [])
  175. # 提取 URL 字符串
  176. image_list = []
  177. for img in image_list_raw:
  178. if isinstance(img, dict) and "image_url" in img:
  179. image_list.append(img["image_url"])
  180. elif isinstance(img, str):
  181. # 如果已经是字符串,直接使用
  182. image_list.append(img)
  183. # 更新为预处理后的列表
  184. note_card["image_list"] = image_list
  185. def save_result(self, keyword: str, result: Dict[str, Any], page: int = 1) -> str:
  186. """
  187. 保存结果到文件
  188. 目录结构: results/xiaohongshu_search/关键词/时间戳_page{页码}.json
  189. Args:
  190. keyword: 搜索关键词
  191. result: API返回的结果
  192. page: 页码
  193. Returns:
  194. 保存的文件路径
  195. """
  196. # 创建目录结构: results/xiaohongshu_search/关键词/
  197. result_dir = os.path.join(self.results_base_dir, "xiaohongshu_search", keyword)
  198. os.makedirs(result_dir, exist_ok=True)
  199. # 文件名使用时间戳和页码
  200. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  201. filename = f"{timestamp}_page{page}.json"
  202. filepath = os.path.join(result_dir, filename)
  203. # 保存结果
  204. with open(filepath, 'w', encoding='utf-8') as f:
  205. json.dump(result, f, ensure_ascii=False, indent=2)
  206. return filepath
  207. def main():
  208. """示例使用"""
  209. # 解析命令行参数
  210. parser = argparse.ArgumentParser(description='小红书笔记搜索工具')
  211. parser.add_argument(
  212. '--results-dir',
  213. type=str,
  214. default='data/search',
  215. help='结果输出目录 (默认: data/search)'
  216. )
  217. parser.add_argument(
  218. '--keyword',
  219. type=str,
  220. required=True,
  221. help='搜索关键词 (必填)'
  222. )
  223. parser.add_argument(
  224. '--content-type',
  225. type=str,
  226. default='不限',
  227. choices=['不限', '视频', '图文'],
  228. help='内容类型 (默认: 不限)'
  229. )
  230. parser.add_argument(
  231. '--sort-type',
  232. type=str,
  233. default='综合',
  234. choices=['综合', '最新', '最多点赞', '最多评论'],
  235. help='排序方式 (默认: 综合)'
  236. )
  237. parser.add_argument(
  238. '--publish-time',
  239. type=str,
  240. default='不限',
  241. choices=['不限', '一天内', '一周内', '半年内'],
  242. help='发布时间筛选 (默认: 不限)'
  243. )
  244. parser.add_argument(
  245. '--cursor',
  246. type=str,
  247. default='',
  248. help='翻页游标 (默认为空,即第一页)'
  249. )
  250. parser.add_argument(
  251. '--page',
  252. type=int,
  253. default=1,
  254. help='页码标识,用于保存文件名 (默认: 1)'
  255. )
  256. args = parser.parse_args()
  257. # 创建API客户端实例
  258. client = XiaohongshuSearch(results_dir=args.results_dir)
  259. # 执行搜索并保存
  260. try:
  261. result = client.search(
  262. args.keyword,
  263. args.content_type,
  264. args.sort_type,
  265. args.publish_time,
  266. args.cursor
  267. )
  268. filepath = client.save_result(args.keyword, result, args.page)
  269. print(f"Output: {filepath}")
  270. except Exception as e:
  271. print(f"Error: {e}", file=__import__('sys').stderr)
  272. if __name__ == "__main__":
  273. main()