#!/usr/bin/env python3
"""
Xiaohongshu note detail fetcher.

Fetches detailed information for a Xiaohongshu note by its note ID.
"""

import argparse
import json
import os
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests


class XiaohongshuDetail:
    """Wrapper around the Xiaohongshu note-detail API."""

    BASE_URL = "http://47.84.182.56:8001"
    TOOL_NAME = "get_xhs_detail_by_note_id"
    PLATFORM = "xiaohongshu"

    def __init__(self, results_dir: Optional[str] = None):
        """
        Initialize the API client.

        Args:
            results_dir: Output directory for results. Defaults to the
                data/detail folder under the project root.
        """
        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"

        # Set the results output directory
        if results_dir:
            self.results_base_dir = results_dir
        else:
            # Default to the data/detail folder under the project root
            script_dir = os.path.dirname(os.path.abspath(__file__))
            project_root = os.path.dirname(os.path.dirname(script_dir))
            self.results_base_dir = os.path.join(project_root, "data", "detail")

    def get_detail(
        self,
        note_id: str,
        timeout: int = 30,
        max_retries: int = 3,
        retry_delay: int = 2
    ) -> Dict[str, Any]:
        """
        Fetch the detail of a single note, with retries.

        Args:
            note_id: Note ID.
            timeout: Request timeout in seconds (default 30).
            max_retries: Maximum number of attempts (default 3).
            retry_delay: Delay between retries in seconds (default 2).

        Returns:
            The JSON payload returned by the API.

        Raises:
            requests.exceptions.RequestException: Raised when all attempts fail.
        """
        payload = {"note_id": note_id}
        last_exception = None

        # Retry loop: try at most max_retries times
        for attempt in range(1, max_retries + 1):
            try:
                if attempt > 1:
                    print(f"  Retry {attempt - 1}/{max_retries - 1}: {note_id}")

                response = requests.post(
                    self.api_url,
                    json=payload,
                    timeout=timeout,
                    headers={"Content-Type": "application/json"}
                )
                response.raise_for_status()
                result = response.json()

                if attempt > 1:
                    print("  ✓ Retry succeeded")
                return result

            except requests.exceptions.RequestException as e:
                last_exception = e
                if attempt < max_retries:
                    # Attempts remain: wait, then try again
                    print(f"  ✗ Request failed (attempt {attempt}): {e}")
                    print(f"  Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                else:
                    # Maximum number of attempts reached
                    print(f"  ✗ Request failed (max retries {max_retries} reached): {e}")

        # All attempts failed: re-raise the last exception
        raise last_exception

    def get_details_batch(
        self,
        note_ids: List[str],
        timeout: int = 30,
        max_retries: int = 3,
        retry_delay: int = 2,
        batch_delay: int = 1
    ) -> List[Dict[str, Any]]:
        """
        Fetch the details of multiple notes.

        Args:
            note_ids: List of note IDs.
            timeout: Request timeout in seconds (default 30).
            max_retries: Maximum number of attempts per note (default 3).
            retry_delay: Delay between retries in seconds (default 2).
            batch_delay: Delay between successive notes in seconds (default 1).

        Returns:
            A list with one entry per note, recording success or failure.
        """
        results = []
        total = len(note_ids)

        for idx, note_id in enumerate(note_ids, 1):
            print(f"Fetching note detail ({idx}/{total}): {note_id}")
            try:
                result = self.get_detail(note_id, timeout, max_retries, retry_delay)
                results.append({
                    "note_id": note_id,
                    "success": True,
                    "data": result
                })
                print("  ✓ Fetched successfully")
            except Exception as e:
                print(f"  ✗ Fetch failed: {e}")
                results.append({
                    "note_id": note_id,
                    "success": False,
                    "error": str(e)
                })

            # Throttle requests; no delay needed after the last note
            if idx < total:
                time.sleep(batch_delay)

        return results

    def save_result(self, note_id: str, result: Dict[str, Any]) -> str:
        """
        Save a single note detail to a file.

        Directory layout: {results_base_dir}/xiaohongshu_detail/{note_id}/{timestamp}.json

        Args:
            note_id: Note ID.
            result: The result returned by the API.

        Returns:
            Path of the saved file.
        """
        # Create the directory: {results_base_dir}/xiaohongshu_detail/{note_id}/
        result_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", note_id)
        os.makedirs(result_dir, exist_ok=True)

        # File name is a timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}.json"
        filepath = os.path.join(result_dir, filename)

        # Write the result
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

        return filepath

    def save_batch_results(self, results: List[Dict[str, Any]], batch_name: Optional[str] = None) -> str:
        """
        Save a batch of results to a single file.

        Directory layout: {results_base_dir}/xiaohongshu_detail/batch/{timestamp}[_{batch_name}].json

        Args:
            results: The result list returned by get_details_batch.
            batch_name: Optional batch name.

        Returns:
            Path of the saved file.
        """
        # Create the directory
        result_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", "batch")
        os.makedirs(result_dir, exist_ok=True)

        # File name combines the timestamp and the optional batch name
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if batch_name:
            filename = f"{timestamp}_{batch_name}.json"
        else:
            filename = f"{timestamp}.json"
        filepath = os.path.join(result_dir, filename)

        # Write the results
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        return filepath


def main():
    """Command-line entry point."""
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Xiaohongshu note detail fetcher')
    parser.add_argument(
        '--results-dir',
        type=str,
        default='data/detail',
        help='Output directory for results (default: data/detail)'
    )

    # Mutually exclusive group: a single note ID or a batch of note IDs
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--note-id',
        type=str,
        help='A single note ID'
    )
    group.add_argument(
        '--note-ids',
        type=str,
        help='Multiple note IDs separated by commas, e.g. id1,id2,id3'
    )
    group.add_argument(
        '--note-ids-file',
        type=str,
        help='Path to a file containing note IDs, one per line'
    )
    parser.add_argument(
        '--batch-name',
        type=str,
        help='Batch name used when fetching multiple notes (optional)'
    )
    args = parser.parse_args()

    # Create the API client
    client = XiaohongshuDetail(results_dir=args.results_dir)

    try:
        # Single note ID
        if args.note_id:
            result = client.get_detail(args.note_id)
            filepath = client.save_result(args.note_id, result)
            print(f"Output: {filepath}")

        # Multiple note IDs (comma-separated on the command line)
        elif args.note_ids:
            note_ids = [nid.strip() for nid in args.note_ids.split(',') if nid.strip()]
            results = client.get_details_batch(note_ids)
            filepath = client.save_batch_results(results, args.batch_name)
            print("\nBatch fetch finished")
            print(f"Succeeded: {sum(1 for r in results if r['success'])}/{len(results)}")
            print(f"Output: {filepath}")

        # Read note IDs from a file
        elif args.note_ids_file:
            with open(args.note_ids_file, 'r', encoding='utf-8') as f:
                note_ids = [line.strip() for line in f if line.strip()]
            results = client.get_details_batch(note_ids)
            filepath = client.save_batch_results(results, args.batch_name)
            print("\nBatch fetch finished")
            print(f"Succeeded: {sum(1 for r in results if r['success'])}/{len(results)}")
            print(f"Output: {filepath}")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        raise


if __name__ == "__main__":
    main()
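
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): the script filename "xhs_detail.py", the
# note ID value, and the file "ids.txt" below are hypothetical placeholders,
# not values defined by this file. Only the flags come from the parser above.
#
#   Fetch a single note and save it under <results-dir>/xiaohongshu_detail/<note_id>/:
#     python xhs_detail.py --note-id 64f0c2a1000000001e03abcd
#
#   Fetch every note listed in ids.txt (one ID per line) and save the batch
#   under <results-dir>/xiaohongshu_detail/batch/ with a custom batch name:
#     python xhs_detail.py --note-ids-file ids.txt --batch-name demo
# ---------------------------------------------------------------------------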