xiaohongshu_search_recommendations.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. #!/usr/bin/env python3
  2. """
  3. 小红书搜索推荐词接口
  4. 获取小红书平台搜索框中的推荐词
  5. """
  6. import requests
  7. import json
  8. import os
  9. import argparse
  10. import time
  11. import ast
  12. import hashlib
  13. from datetime import datetime
  14. from typing import Dict, Any, Optional
  15. import sys
  16. from pathlib import Path
  17. # 添加项目根目录到路径并导入配置
  18. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  19. from lib.config import get_data_dir
  20. class XiaohongshuSearchRecommendations:
  21. """小红书搜索推荐词API封装类"""
  22. BASE_URL = "http://47.84.182.56:8001"
  23. TOOL_NAME = "Xiaohongshu_Search_Recommendations"
  24. PLATFORM = "xiaohongshu" # 平台名称
  25. def __init__(self, results_dir: str = None, enable_cache: bool = True, cache_ttl: int = 86400):
  26. """
  27. 初始化API客户端
  28. Args:
  29. results_dir: 结果输出目录,默认为脚本所在目录下的 results 文件夹
  30. enable_cache: 是否启用缓存(从已保存的文件中读取),默认为 True
  31. cache_ttl: 缓存有效期(秒),默认为 86400 秒(24小时)
  32. """
  33. self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
  34. # 设置结果输出目录
  35. if results_dir:
  36. self.results_base_dir = results_dir
  37. else:
  38. # 默认从配置读取
  39. self.results_base_dir = get_data_dir("search_recommendations")
  40. # 缓存设置
  41. self.enable_cache = enable_cache
  42. self.cache_ttl = cache_ttl
  43. self._memory_cache = {} # 内存缓存: {keyword: (data, timestamp)}
  44. def _get_from_cache(self, keyword: str) -> Optional[Dict[str, Any]]:
  45. """
  46. 从缓存中获取数据(先查内存缓存,再查文件缓存)
  47. Args:
  48. keyword: 搜索关键词
  49. Returns:
  50. 缓存的数据,如果没有有效缓存则返回 None
  51. """
  52. if not self.enable_cache:
  53. return None
  54. current_time = time.time()
  55. # 1. 检查内存缓存
  56. if keyword in self._memory_cache:
  57. data, timestamp = self._memory_cache[keyword]
  58. if current_time - timestamp < self.cache_ttl:
  59. # print(f"从内存缓存中获取关键词 '{keyword}' 的数据")
  60. return data
  61. else:
  62. # 内存缓存已过期,删除
  63. del self._memory_cache[keyword]
  64. # 2. 检查文件缓存(从已保存的文件中读取最新的)
  65. result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
  66. if os.path.exists(result_dir):
  67. files = [f for f in os.listdir(result_dir) if f.endswith('.json')]
  68. if files:
  69. # 按文件名排序(时间戳),获取最新的文件
  70. files.sort(reverse=True)
  71. latest_file = os.path.join(result_dir, files[0])
  72. # 检查文件修改时间
  73. file_mtime = os.path.getmtime(latest_file)
  74. if current_time - file_mtime < self.cache_ttl:
  75. try:
  76. with open(latest_file, 'r', encoding='utf-8') as f:
  77. data = json.load(f)
  78. # 更新内存缓存
  79. self._memory_cache[keyword] = (data, file_mtime)
  80. # print(f"从文件缓存中获取关键词 '{keyword}' 的数据: {latest_file}")
  81. return data
  82. except Exception as e:
  83. print(f"读取缓存文件失败: {e}")
  84. return None
  85. def get_recommendations(self, keyword: str, timeout: int = 300, max_retries: int = 10, retry_delay: int = 2, use_cache: bool = True) -> Dict[str, Any]:
  86. """
  87. 获取小红书搜索推荐词
  88. Args:
  89. keyword: 搜索关键词,例如:'长沙'、'美妆'等
  90. timeout: 请求超时时间(秒),默认300秒
  91. max_retries: 最大重试次数,默认10次
  92. retry_delay: 重试间隔时间(秒),默认2秒
  93. use_cache: 是否使用缓存,默认为 True
  94. Returns:
  95. API响应的JSON数据
  96. Raises:
  97. requests.exceptions.RequestException: 请求失败时抛出异常
  98. """
  99. # 尝试从缓存获取
  100. if use_cache:
  101. cached_data = self._get_from_cache(keyword)
  102. if cached_data is not None:
  103. return cached_data
  104. # 缓存未命中,发起API请求
  105. # print(f"缓存未命中,发起API请求获取关键词 '{keyword}' 的数据")
  106. payload = {"keyword": keyword}
  107. last_error = None
  108. for attempt in range(max_retries + 1):
  109. try:
  110. response = requests.post(
  111. self.api_url,
  112. json=payload,
  113. timeout=timeout,
  114. headers={"Content-Type": "application/json"}
  115. )
  116. response.raise_for_status()
  117. res = response.json()
  118. # 使用 ast.literal_eval 解析 Python 字典字符串(不是标准 JSON)
  119. # print(res)
  120. import json
  121. result = json.loads(res['result'])
  122. # result = ast.literal_eval(res['result'])
  123. # 成功:code == 0
  124. if result.get('code') == 0:
  125. data = result['data']['data']
  126. # 保存到内存缓存
  127. self._memory_cache[keyword] = (data, time.time())
  128. # 自动保存到文件缓存
  129. if self.enable_cache:
  130. self.save_result(keyword, data)
  131. return data
  132. # 失败:code != 0
  133. last_error = f"code={result.get('code')}"
  134. except Exception as e:
  135. from traceback import print_exc
  136. print(f"发生异常: {e}")
  137. print_exc()
  138. last_error = str(e)
  139. # 统一处理重试逻辑
  140. if attempt < max_retries:
  141. print(f"请求失败 ({last_error}), 第{attempt + 1}次重试,等待{retry_delay}秒...")
  142. time.sleep(retry_delay)
  143. else:
  144. print(f"达到最大重试次数({max_retries}),最后错误: {last_error}")
  145. return []
  146. def clear_memory_cache(self, keyword: Optional[str] = None):
  147. """
  148. 清除内存缓存
  149. Args:
  150. keyword: 要清除的关键词,如果为 None 则清除所有内存缓存
  151. """
  152. if keyword:
  153. if keyword in self._memory_cache:
  154. del self._memory_cache[keyword]
  155. print(f"已清除关键词 '{keyword}' 的内存缓存")
  156. else:
  157. self._memory_cache.clear()
  158. print("已清除所有内存缓存")
  159. def clear_file_cache(self, keyword: Optional[str] = None, keep_latest: bool = True):
  160. """
  161. 清除文件缓存
  162. Args:
  163. keyword: 要清除的关键词,如果为 None 则清除所有文件缓存
  164. keep_latest: 是否保留最新的文件,默认为 True
  165. """
  166. if keyword:
  167. result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
  168. if os.path.exists(result_dir):
  169. files = [f for f in os.listdir(result_dir) if f.endswith('.json')]
  170. if files:
  171. files.sort(reverse=True)
  172. # 保留最新的文件
  173. files_to_delete = files[1:] if keep_latest else files
  174. for f in files_to_delete:
  175. filepath = os.path.join(result_dir, f)
  176. os.remove(filepath)
  177. print(f"已删除缓存文件: {filepath}")
  178. else:
  179. platform_dir = os.path.join(self.results_base_dir, self.PLATFORM)
  180. if os.path.exists(platform_dir):
  181. for keyword_dir in os.listdir(platform_dir):
  182. keyword_path = os.path.join(platform_dir, keyword_dir)
  183. if os.path.isdir(keyword_path):
  184. files = [f for f in os.listdir(keyword_path) if f.endswith('.json')]
  185. if files:
  186. files.sort(reverse=True)
  187. files_to_delete = files[1:] if keep_latest else files
  188. for f in files_to_delete:
  189. filepath = os.path.join(keyword_path, f)
  190. os.remove(filepath)
  191. print(f"已删除缓存文件: {filepath}")
  192. def get_cache_info(self, keyword: Optional[str] = None) -> Dict[str, Any]:
  193. """
  194. 获取缓存信息
  195. Args:
  196. keyword: 要查询的关键词,如果为 None 则返回所有缓存信息
  197. Returns:
  198. 缓存信息字典
  199. """
  200. info = {
  201. "memory_cache": {},
  202. "file_cache": {}
  203. }
  204. current_time = time.time()
  205. # 内存缓存信息
  206. if keyword:
  207. if keyword in self._memory_cache:
  208. data, timestamp = self._memory_cache[keyword]
  209. info["memory_cache"][keyword] = {
  210. "count": len(data) if isinstance(data, list) else 1,
  211. "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"),
  212. "age_seconds": int(current_time - timestamp),
  213. "is_expired": current_time - timestamp >= self.cache_ttl
  214. }
  215. else:
  216. for kw, (data, timestamp) in self._memory_cache.items():
  217. info["memory_cache"][kw] = {
  218. "count": len(data) if isinstance(data, list) else 1,
  219. "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"),
  220. "age_seconds": int(current_time - timestamp),
  221. "is_expired": current_time - timestamp >= self.cache_ttl
  222. }
  223. # 文件缓存信息
  224. platform_dir = os.path.join(self.results_base_dir, self.PLATFORM)
  225. if os.path.exists(platform_dir):
  226. keywords = [keyword] if keyword else os.listdir(platform_dir)
  227. for kw in keywords:
  228. keyword_path = os.path.join(platform_dir, kw)
  229. if os.path.isdir(keyword_path):
  230. files = [f for f in os.listdir(keyword_path) if f.endswith('.json')]
  231. if files:
  232. files.sort(reverse=True)
  233. latest_file = os.path.join(keyword_path, files[0])
  234. file_mtime = os.path.getmtime(latest_file)
  235. info["file_cache"][kw] = {
  236. "file_count": len(files),
  237. "latest_file": files[0],
  238. "timestamp": datetime.fromtimestamp(file_mtime).strftime("%Y-%m-%d %H:%M:%S"),
  239. "age_seconds": int(current_time - file_mtime),
  240. "is_expired": current_time - file_mtime >= self.cache_ttl
  241. }
  242. return info
  243. def save_result(self, keyword: str, result: Dict[str, Any]) -> str:
  244. """
  245. 保存结果到文件
  246. 目录结构: results/平台/关键词/时间戳.json
  247. Args:
  248. keyword: 搜索关键词
  249. result: API返回的结果
  250. Returns:
  251. 保存的文件路径
  252. """
  253. # 创建目录结构: results/平台/关键词/
  254. result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
  255. os.makedirs(result_dir, exist_ok=True)
  256. # 文件名使用时间戳
  257. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  258. filename = f"{timestamp}.json"
  259. filepath = os.path.join(result_dir, filename)
  260. # 保存结果
  261. with open(filepath, 'w', encoding='utf-8') as f:
  262. json.dump(result, f, ensure_ascii=False, indent=2)
  263. return filepath
  264. def main():
  265. """示例使用"""
  266. # 解析命令行参数
  267. parser = argparse.ArgumentParser(description='小红书搜索推荐词接口')
  268. parser.add_argument(
  269. '--results-dir',
  270. type=str,
  271. default=None,
  272. help='结果输出目录 (默认: 从配置读取)'
  273. )
  274. parser.add_argument(
  275. '--keyword',
  276. type=str,
  277. required=True,
  278. help='搜索关键词 (必填)'
  279. )
  280. args = parser.parse_args()
  281. # 创建API客户端实例
  282. client = XiaohongshuSearchRecommendations(results_dir=args.results_dir)
  283. # 获取推荐词并保存
  284. try:
  285. result = client.get_recommendations(args.keyword)
  286. filepath = client.save_result(args.keyword, result)
  287. print(f"Output: {filepath}")
  288. except Exception as e:
  289. print(f"Error: {e}", file=__import__('sys').stderr)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()