#!/usr/bin/env python3 """ 小红书笔记搜索工具 根据关键词搜索小红书笔记,支持多种筛选条件 """ import requests import json import os import argparse import time import logging from datetime import datetime from typing import Dict, Any logger = logging.getLogger(__name__) class XiaohongshuSearch: """小红书笔记搜索API封装类""" BASE_URL = "http://47.84.182.56:8001" TOOL_NAME = "xhs_note_search" PLATFORM = "xiaohongshu" def __init__(self, results_dir: str = None, cache_dir: str = "search_cache"): """ 初始化API客户端 Args: results_dir: 结果输出目录,默认为项目根目录下的 data/search 文件夹 cache_dir: 缓存目录,默认为 search_cache """ self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}" # 设置结果输出目录 if results_dir: self.results_base_dir = results_dir else: # 默认使用项目根目录的 data/search 文件夹 script_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(os.path.dirname(script_dir)) self.results_base_dir = os.path.join(project_root, "data", "search") # 设置缓存目录 self.cache_dir = cache_dir if cache_dir: os.makedirs(cache_dir, exist_ok=True) def _get_cache_key( self, keyword: str, content_type: str, sort_type: str, publish_time: str ) -> str: """ 生成缓存key Args: keyword: 搜索关键词 content_type: 内容类型 sort_type: 排序方式 publish_time: 发布时间 Returns: 缓存key字符串 """ return f"{keyword}_{content_type}_{sort_type}_{publish_time}" def _get_cache_path(self, cache_key: str) -> str: """ 获取缓存文件路径 Args: cache_key: 缓存key Returns: 缓存文件完整路径 """ # 清理文件名中的非法字符 safe_key = cache_key.replace('/', '_').replace('\\', '_').replace(' ', '_') return os.path.join(self.cache_dir, f"{safe_key}.json") def search( self, keyword: str, content_type: str = "不限", sort_type: str = "综合", publish_time: str = "不限", cursor: str = "", timeout: int = 30, max_retries: int = 5, retry_delay: int = 2, use_cache: bool = True ) -> Dict[str, Any]: """ 搜索小红书笔记(带重试机制和缓存) Args: keyword: 搜索关键词 content_type: 内容类型,可选值:不限、视频、图文,默认为'不限' sort_type: 排序方式,可选值:综合、最新、最多点赞、最多评论,默认为'综合' publish_time: 发布时间筛选,可选值:不限、一天内、一周内、半年内,默认为'不限' cursor: 翻页游标,第一页默认为空,下一页的游标在上一页的返回值中获取 timeout: 请求超时时间(秒),默认30秒 max_retries: 最大重试次数,默认3次 retry_delay: 重试间隔时间(秒),默认2秒 use_cache: 是否使用缓存,默认True Returns: API响应的JSON数据 Raises: requests.exceptions.RequestException: 所有重试都失败时抛出异常 """ # 检查缓存 if use_cache and self.cache_dir: cache_key = self._get_cache_key(keyword, content_type, sort_type, publish_time) cache_path = self._get_cache_path(cache_key) if os.path.exists(cache_path): try: with open(cache_path, 'r', encoding='utf-8') as f: cached_result = json.load(f) logger.info(f" ✓ 使用缓存: {keyword}") return cached_result except Exception as e: logger.warning(f" 读取缓存失败: {e},将重新搜索") # 缓存未命中或未启用,执行实际搜索 payload = { "keyword": keyword, "content_type": '不限', # 使用映射后的参数 "sort_type": sort_type, "publish_time": publish_time, "cursor": cursor } last_exception = None # 重试循环:最多尝试 max_retries 次 for attempt in range(1, max_retries + 1): try: if attempt > 1: print(f" 重试第 {attempt - 1}/{max_retries - 1} 次: {keyword}") response = requests.post( self.api_url, json=payload, timeout=timeout, headers={"Content-Type": "application/json"} ) response.raise_for_status() api_response = response.json() # 解析API返回的result字段(是JSON字符串) if not api_response.get("success"): raise Exception(f"API返回失败: {api_response}") result_str = api_response.get("result", "{}") result = json.loads(result_str) # 预处理返回数据:提取 image_list 中的 URL 字符串 self._preprocess_response(result) if attempt > 1: print(f" ✓ 重试成功") # 保存到缓存 if use_cache and self.cache_dir: try: cache_key = self._get_cache_key(keyword, content_type, sort_type, publish_time) cache_path = self._get_cache_path(cache_key) with open(cache_path, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) logger.info(f" ✓ 已缓存: {keyword}") except Exception as e: logger.warning(f" 保存缓存失败: {e}") return result except requests.exceptions.RequestException as e: last_exception = e if attempt < max_retries: # 还有重试机会,等待后继续 print(f" ✗ 请求失败 (第{attempt}次尝试): {e}") print(f" 等待 {retry_delay} 秒后重试...") time.sleep(retry_delay) else: # 已达最大重试次数,抛出异常 print(f" ✗ 请求失败 (已达最大重试次数 {max_retries}): {e}") # 所有重试都失败,抛出最后一次的异常 raise last_exception def _preprocess_response(self, result: Dict[str, Any]) -> None: """ 预处理搜索结果,将 image_list 中的字典格式转换为 URL 字符串列表 Args: result: API返回的原始结果字典(会直接修改) """ # 获取帖子列表 notes = result.get("data", {}).get("data", []) for note in notes: note_card = note.get("note_card", {}) image_list_raw = note_card.get("image_list", []) # 提取 URL 字符串 image_list = [] for img in image_list_raw: if isinstance(img, dict) and "image_url" in img: image_list.append(img["image_url"]) elif isinstance(img, str): # 如果已经是字符串,直接使用 image_list.append(img) # 更新为预处理后的列表 note_card["image_list"] = image_list def save_result(self, keyword: str, result: Dict[str, Any], page: int = 1) -> str: """ 保存结果到文件 目录结构: results/xiaohongshu_search/关键词/时间戳_page{页码}.json Args: keyword: 搜索关键词 result: API返回的结果 page: 页码 Returns: 保存的文件路径 """ # 创建目录结构: results/xiaohongshu_search/关键词/ result_dir = os.path.join(self.results_base_dir, "xiaohongshu_search", keyword) os.makedirs(result_dir, exist_ok=True) # 文件名使用时间戳和页码 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{timestamp}_page{page}.json" filepath = os.path.join(result_dir, filename) # 保存结果 with open(filepath, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) return filepath def main(): """示例使用""" # 解析命令行参数 parser = argparse.ArgumentParser(description='小红书笔记搜索工具') parser.add_argument( '--results-dir', type=str, default='data/search', help='结果输出目录 (默认: data/search)' ) parser.add_argument( '--keyword', type=str, required=True, help='搜索关键词 (必填)' ) parser.add_argument( '--content-type', type=str, default='不限', choices=['不限', '视频', '图文'], help='内容类型 (默认: 不限)' ) parser.add_argument( '--sort-type', type=str, default='综合', choices=['综合', '最新', '最多点赞', '最多评论'], help='排序方式 (默认: 综合)' ) parser.add_argument( '--publish-time', type=str, default='不限', choices=['不限', '一天内', '一周内', '半年内'], help='发布时间筛选 (默认: 不限)' ) parser.add_argument( '--cursor', type=str, default='', help='翻页游标 (默认为空,即第一页)' ) parser.add_argument( '--page', type=int, default=1, help='页码标识,用于保存文件名 (默认: 1)' ) args = parser.parse_args() # 创建API客户端实例 client = XiaohongshuSearch(results_dir=args.results_dir) # 执行搜索并保存 try: result = client.search( args.keyword, args.content_type, args.sort_type, args.publish_time, args.cursor ) filepath = client.save_result(args.keyword, result, args.page) print(f"Output: {filepath}") except Exception as e: print(f"Error: {e}", file=__import__('sys').stderr) if __name__ == "__main__": main()