#!/usr/bin/env python3 """ 小红书搜索推荐词接口 获取小红书平台搜索框中的推荐词 """ import requests import json import os import argparse import time import ast import hashlib from datetime import datetime from typing import Dict, Any, Optional class XiaohongshuSearchRecommendations: """小红书搜索推荐词API封装类""" BASE_URL = "http://47.84.182.56:8001" TOOL_NAME = "Xiaohongshu_Search_Recommendations" PLATFORM = "xiaohongshu" # 平台名称 def __init__(self, results_dir: str = None, enable_cache: bool = True, cache_ttl: int = 86400): """ 初始化API客户端 Args: results_dir: 结果输出目录,默认为脚本所在目录下的 results 文件夹 enable_cache: 是否启用缓存(从已保存的文件中读取),默认为 True cache_ttl: 缓存有效期(秒),默认为 86400 秒(24小时) """ self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}" # 设置结果输出目录 if results_dir: self.results_base_dir = results_dir else: # 默认使用项目根目录的 data/search_recommendations 文件夹 script_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(os.path.dirname(script_dir)) self.results_base_dir = os.path.join(project_root, "data", "search_recommendations") # 缓存设置 self.enable_cache = enable_cache self.cache_ttl = cache_ttl self._memory_cache = {} # 内存缓存: {keyword: (data, timestamp)} def _get_from_cache(self, keyword: str) -> Optional[Dict[str, Any]]: """ 从缓存中获取数据(先查内存缓存,再查文件缓存) Args: keyword: 搜索关键词 Returns: 缓存的数据,如果没有有效缓存则返回 None """ if not self.enable_cache: return None current_time = time.time() # 1. 检查内存缓存 if keyword in self._memory_cache: data, timestamp = self._memory_cache[keyword] if current_time - timestamp < self.cache_ttl: # print(f"从内存缓存中获取关键词 '{keyword}' 的数据") return data else: # 内存缓存已过期,删除 del self._memory_cache[keyword] # 2. 检查文件缓存(从已保存的文件中读取最新的) result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword) if os.path.exists(result_dir): files = [f for f in os.listdir(result_dir) if f.endswith('.json')] if files: # 按文件名排序(时间戳),获取最新的文件 files.sort(reverse=True) latest_file = os.path.join(result_dir, files[0]) # 检查文件修改时间 file_mtime = os.path.getmtime(latest_file) if current_time - file_mtime < self.cache_ttl: try: with open(latest_file, 'r', encoding='utf-8') as f: data = json.load(f) # 更新内存缓存 self._memory_cache[keyword] = (data, file_mtime) # print(f"从文件缓存中获取关键词 '{keyword}' 的数据: {latest_file}") return data except Exception as e: print(f"读取缓存文件失败: {e}") return None def get_recommendations(self, keyword: str, timeout: int = 300, max_retries: int = 10, retry_delay: int = 2, use_cache: bool = True) -> Dict[str, Any]: """ 获取小红书搜索推荐词 Args: keyword: 搜索关键词,例如:'长沙'、'美妆'等 timeout: 请求超时时间(秒),默认300秒 max_retries: 最大重试次数,默认10次 retry_delay: 重试间隔时间(秒),默认2秒 use_cache: 是否使用缓存,默认为 True Returns: API响应的JSON数据 Raises: requests.exceptions.RequestException: 请求失败时抛出异常 """ # 尝试从缓存获取 if use_cache: cached_data = self._get_from_cache(keyword) if cached_data is not None: return cached_data # 缓存未命中,发起API请求 # print(f"缓存未命中,发起API请求获取关键词 '{keyword}' 的数据") payload = {"keyword": keyword} last_error = None for attempt in range(max_retries + 1): try: response = requests.post( self.api_url, json=payload, timeout=timeout, headers={"Content-Type": "application/json"} ) response.raise_for_status() res = response.json() # 使用 ast.literal_eval 解析 Python 字典字符串(不是标准 JSON) # print(res) import json result = json.loads(res['result']) # result = ast.literal_eval(res['result']) # 成功:code == 0 if result.get('code') == 0: data = result['data']['data'] # 保存到内存缓存 self._memory_cache[keyword] = (data, time.time()) # 自动保存到文件缓存 if self.enable_cache: self.save_result(keyword, data) return data # 失败:code != 0 last_error = f"code={result.get('code')}" except Exception as e: from traceback import print_exc print(f"发生异常: {e}") print_exc() last_error = str(e) # 统一处理重试逻辑 if attempt < max_retries: print(f"请求失败 ({last_error}), 第{attempt + 1}次重试,等待{retry_delay}秒...") time.sleep(retry_delay) else: print(f"达到最大重试次数({max_retries}),最后错误: {last_error}") return [] def clear_memory_cache(self, keyword: Optional[str] = None): """ 清除内存缓存 Args: keyword: 要清除的关键词,如果为 None 则清除所有内存缓存 """ if keyword: if keyword in self._memory_cache: del self._memory_cache[keyword] print(f"已清除关键词 '{keyword}' 的内存缓存") else: self._memory_cache.clear() print("已清除所有内存缓存") def clear_file_cache(self, keyword: Optional[str] = None, keep_latest: bool = True): """ 清除文件缓存 Args: keyword: 要清除的关键词,如果为 None 则清除所有文件缓存 keep_latest: 是否保留最新的文件,默认为 True """ if keyword: result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword) if os.path.exists(result_dir): files = [f for f in os.listdir(result_dir) if f.endswith('.json')] if files: files.sort(reverse=True) # 保留最新的文件 files_to_delete = files[1:] if keep_latest else files for f in files_to_delete: filepath = os.path.join(result_dir, f) os.remove(filepath) print(f"已删除缓存文件: {filepath}") else: platform_dir = os.path.join(self.results_base_dir, self.PLATFORM) if os.path.exists(platform_dir): for keyword_dir in os.listdir(platform_dir): keyword_path = os.path.join(platform_dir, keyword_dir) if os.path.isdir(keyword_path): files = [f for f in os.listdir(keyword_path) if f.endswith('.json')] if files: files.sort(reverse=True) files_to_delete = files[1:] if keep_latest else files for f in files_to_delete: filepath = os.path.join(keyword_path, f) os.remove(filepath) print(f"已删除缓存文件: {filepath}") def get_cache_info(self, keyword: Optional[str] = None) -> Dict[str, Any]: """ 获取缓存信息 Args: keyword: 要查询的关键词,如果为 None 则返回所有缓存信息 Returns: 缓存信息字典 """ info = { "memory_cache": {}, "file_cache": {} } current_time = time.time() # 内存缓存信息 if keyword: if keyword in self._memory_cache: data, timestamp = self._memory_cache[keyword] info["memory_cache"][keyword] = { "count": len(data) if isinstance(data, list) else 1, "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"), "age_seconds": int(current_time - timestamp), "is_expired": current_time - timestamp >= self.cache_ttl } else: for kw, (data, timestamp) in self._memory_cache.items(): info["memory_cache"][kw] = { "count": len(data) if isinstance(data, list) else 1, "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"), "age_seconds": int(current_time - timestamp), "is_expired": current_time - timestamp >= self.cache_ttl } # 文件缓存信息 platform_dir = os.path.join(self.results_base_dir, self.PLATFORM) if os.path.exists(platform_dir): keywords = [keyword] if keyword else os.listdir(platform_dir) for kw in keywords: keyword_path = os.path.join(platform_dir, kw) if os.path.isdir(keyword_path): files = [f for f in os.listdir(keyword_path) if f.endswith('.json')] if files: files.sort(reverse=True) latest_file = os.path.join(keyword_path, files[0]) file_mtime = os.path.getmtime(latest_file) info["file_cache"][kw] = { "file_count": len(files), "latest_file": files[0], "timestamp": datetime.fromtimestamp(file_mtime).strftime("%Y-%m-%d %H:%M:%S"), "age_seconds": int(current_time - file_mtime), "is_expired": current_time - file_mtime >= self.cache_ttl } return info def save_result(self, keyword: str, result: Dict[str, Any]) -> str: """ 保存结果到文件 目录结构: results/平台/关键词/时间戳.json Args: keyword: 搜索关键词 result: API返回的结果 Returns: 保存的文件路径 """ # 创建目录结构: results/平台/关键词/ result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword) os.makedirs(result_dir, exist_ok=True) # 文件名使用时间戳 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{timestamp}.json" filepath = os.path.join(result_dir, filename) # 保存结果 with open(filepath, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) return filepath def main(): """示例使用""" # 解析命令行参数 parser = argparse.ArgumentParser(description='小红书搜索推荐词接口') parser.add_argument( '--results-dir', type=str, default='data/search_recommendations', help='结果输出目录 (默认: data/search_recommendations)' ) parser.add_argument( '--keyword', type=str, required=True, help='搜索关键词 (必填)' ) args = parser.parse_args() # 创建API客户端实例 client = XiaohongshuSearchRecommendations(results_dir=args.results_dir) # 获取推荐词并保存 try: result = client.get_recommendations(args.keyword) filepath = client.save_result(args.keyword, result) print(f"Output: {filepath}") except Exception as e: print(f"Error: {e}", file=__import__('sys').stderr) if __name__ == "__main__": main()