#!/usr/bin/env python3 """ 小红书笔记详情工具 根据笔记ID获取笔记详情(包含完整正文、视频等) """ import requests import json import os import argparse import time import hashlib import re from datetime import datetime from typing import Dict, Any, Optional from pathlib import Path class XiaohongshuDetail: """小红书笔记详情API封装类""" BASE_URL = "http://47.84.182.56:8001" TOOL_NAME = "get_xhs_detail_by_note_id" PLATFORM = "xiaohongshu" def __init__(self, results_dir: str = None, use_cache: bool = True): """ 初始化API客户端 Args: results_dir: 结果输出目录,默认为项目根目录下的 data/detail 文件夹 use_cache: 是否启用缓存,默认为 True """ self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}" self.use_cache = use_cache # 设置结果输出目录 if results_dir: self.results_base_dir = results_dir else: # 默认使用项目根目录的 data/detail 文件夹 script_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(os.path.dirname(script_dir)) self.results_base_dir = os.path.join(project_root, "data", "detail") def _sanitize_note_id(self, note_id: str) -> str: """ 清理笔记ID,使其可以作为文件夹名称 Args: note_id: 原始笔记ID Returns: 清理后的笔记ID """ # 替换不能用作文件夹名称的字符 sanitized = re.sub(r'[<>:"/\\|?*]', '_', note_id) sanitized = sanitized.strip().strip('.') if not sanitized: sanitized = "unnamed" if len(sanitized) > 200: sanitized = sanitized[:200] return sanitized def _get_latest_cache(self, note_id: str) -> Optional[tuple[str, str]]: """ 获取最新的缓存文件(raw 和 clean) Args: note_id: 笔记ID Returns: (raw_filepath, clean_filepath) 元组 或 None """ safe_note_id = self._sanitize_note_id(note_id) detail_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id) raw_dir = os.path.join(detail_dir, "raw") clean_dir = os.path.join(detail_dir, "clean") if not os.path.exists(raw_dir) or not os.path.exists(clean_dir): return None # 获取 raw 目录下的所有 JSON 文件 raw_path = Path(raw_dir) raw_files = sorted(raw_path.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True) if not raw_files: return None # 获取最新的 raw 文件,并构造对应的 clean 文件路径 latest_raw = raw_files[0] latest_clean = Path(clean_dir) / latest_raw.name if latest_clean.exists(): return (str(latest_raw), str(latest_clean)) return None def _load_cached_result(self, filepath: str) -> Optional[Dict[str, Any]]: """ 加载缓存的数据 Args: filepath: 文件路径 Returns: 数据字典 或 None """ try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) # 兼容新旧格式 if "api_response" in data: return data["api_response"] else: return data except Exception: return None def get_detail( self, note_id: str, timeout: int = 30, max_retries: int = 5, retry_delay: int = 2, force: bool = False ) -> tuple[Dict[str, Any], bool]: """ 获取小红书笔记详情,带自动重试机制和缓存 Args: note_id: 笔记ID timeout: 请求超时时间(秒),默认30秒 max_retries: 最大重试次数,默认5次 retry_delay: 重试延迟(秒),默认2秒,每次重试会指数增长 force: 强制重新请求API,忽略缓存,默认为 False Returns: (原始数据, 是否来自缓存) 的元组 Raises: requests.exceptions.RequestException: 所有重试失败后抛出异常 """ # 检查缓存(如果启用且未强制刷新) if self.use_cache and not force: cached_files = self._get_latest_cache(note_id) if cached_files: raw_file, clean_file = cached_files cached_result = self._load_cached_result(raw_file) if cached_result: print(f"✓ 使用缓存数据: {raw_file}") return cached_result, True # 返回缓存标记 payload = {"note_id": note_id} last_exception = None for attempt in range(max_retries): try: if attempt > 0: wait_time = retry_delay * (2 ** (attempt - 1)) print(f"等待 {wait_time} 秒后进行第 {attempt + 1} 次重试...") time.sleep(wait_time) print(f"正在获取笔记详情: {note_id} (尝试 {attempt + 1}/{max_retries})") response = requests.post( self.api_url, json=payload, timeout=timeout, headers={"Content-Type": "application/json"} ) response.raise_for_status() raw_result = response.json() # 如果 result 字段是字符串,需要解析成 JSON 对象 if 'result' in raw_result and isinstance(raw_result['result'], str): try: raw_result['result'] = json.loads(raw_result['result']) except json.JSONDecodeError: pass # 检查 API 返回是否成功 if not raw_result.get('success'): error_msg = raw_result.get('message', '未知错误') print(f"✗ API 返回失败: {error_msg}") last_exception = Exception(f"API 返回失败: {error_msg}") continue # 继续重试 print(f"✓ 获取成功!") return raw_result, False # 返回新数据标记 except requests.exceptions.Timeout as e: last_exception = e print(f"✗ 请求超时: {e}") except requests.exceptions.ConnectionError as e: last_exception = e print(f"✗ 连接错误: {e}") except requests.exceptions.HTTPError as e: last_exception = e status_code = e.response.status_code if e.response else "未知" print(f"✗ HTTP错误 {status_code}: {e}") # 如果是客户端错误(4xx),不重试 if e.response and 400 <= e.response.status_code < 500: print(f"客户端错误,停止重试") raise except requests.exceptions.RequestException as e: last_exception = e print(f"✗ 请求失败: {e}") # 所有重试都失败 print(f"✗ 已达到最大重试次数 ({max_retries}),请求失败") raise last_exception def _extract_clean_data(self, raw_result: Dict[str, Any]) -> Dict[str, Any]: """ 提取并清理数据,生成扁平化的结构(参考现有格式) Args: raw_result: 原始 API 响应 Returns: 清理后的笔记详情 """ if not raw_result.get("success"): return {} result = raw_result.get("result", []) if not result or not isinstance(result, list) or len(result) == 0: return {} data = result[0].get("data", {}) # 提取图片 URL 并按顺序去重 images = [] seen = set() for img in data.get("images", []): url = None if isinstance(img, dict) and "cdn_url" in img: url = img["cdn_url"] elif isinstance(img, str): url = img # 按顺序去重 if url and url not in seen: images.append(url) seen.add(url) # 处理时间戳转换为时间字符串 publish_timestamp = data.get("publish_timestamp") publish_time = None if publish_timestamp: try: from datetime import datetime # 毫秒时间戳转换为秒 dt = datetime.fromtimestamp(publish_timestamp / 1000) publish_time = dt.strftime("%Y-%m-%d %H:%M:%S") except: publish_time = None # 获取 video 字段 video = data.get("video") or None # 根据 video 字段判断 content_type if video: content_type = "video" else: content_type = "normal" # 构建清理后的数据(扁平化结构,参考现有格式) # 不存在的字段统一用 None/null 表示 clean_data = { "channel_content_id": data.get("channel_content_id") or None, "link": data.get("content_link") or None, "comment_count": data.get("comment_count"), "images": images if images else [], "like_count": data.get("like_count"), "body_text": data.get("body_text") or None, "title": data.get("title") or None, "collect_count": data.get("collect_count"), "channel_account_id": data.get("channel_account_id") or None, "channel_account_name": data.get("channel_account_name") or None, "content_type": content_type, "video": video, "publish_timestamp": publish_timestamp, "publish_time": publish_time } return clean_data def save_result( self, note_id: str, raw_result: Dict[str, Any] ) -> tuple[str, str]: """ 保存原始数据和清理后数据到不同的目录 目录结构: data/detail/xiaohongshu_detail/ └── {note_id}/ ├── raw/ # 原始数据(完整 API 响应) │ └── {timestamp}.json └── clean/ # 清理后数据(扁平化结构) └── {timestamp}.json Args: note_id: 笔记ID raw_result: 原始数据(已解析 result 字段) Returns: (原始数据路径, 清理后数据路径) 的元组 """ # 清理笔记ID用于文件夹名称 safe_note_id = self._sanitize_note_id(note_id) # 创建目录结构 base_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id) raw_dir = os.path.join(base_dir, "raw") clean_dir = os.path.join(base_dir, "clean") os.makedirs(raw_dir, exist_ok=True) os.makedirs(clean_dir, exist_ok=True) # 生成文件名(使用时间戳) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{timestamp}.json" raw_filepath = os.path.join(raw_dir, filename) clean_filepath = os.path.join(clean_dir, filename) # 添加元数据到 raw 数据 raw_data_with_meta = { "note_id": note_id, "timestamp": timestamp, "api_response": raw_result } # 保存原始结果(包含元数据) with open(raw_filepath, 'w', encoding='utf-8') as f: json.dump(raw_data_with_meta, f, ensure_ascii=False, indent=2) # 提取并保存清理后的数据(扁平化结构,直接保存) clean_data = self._extract_clean_data(raw_result) with open(clean_filepath, 'w', encoding='utf-8') as f: json.dump(clean_data, f, ensure_ascii=False, indent=2) return raw_filepath, clean_filepath def get_xiaohongshu_detail( note_id: str, force: bool = False ) -> Dict[str, Any]: """ 获取小红书笔记详情 Args: note_id: 笔记ID force: 强制刷新(忽略缓存) Returns: 笔记详情数据(clean 格式,扁平化结构) Examples: >>> # 基本使用 >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5") >>> print(detail['title']) >>> print(detail['body_text']) >>> # 强制刷新 >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5", force=True) """ # 创建客户端(使用默认配置) client = XiaohongshuDetail(use_cache=True) # 获取详情(内部处理重试、超时等) raw_result, from_cache = client.get_detail(note_id=note_id, force=force) # 只有新请求的数据才需要保存 if not from_cache: raw_filepath, clean_filepath = client.save_result(note_id=note_id, raw_result=raw_result) # 读取并返回 clean 数据 with open(clean_filepath, 'r', encoding='utf-8') as f: return json.load(f) else: # 如果是缓存数据,直接提取 clean 数据返回 clean_data = client._extract_clean_data(raw_result) return clean_data def main(): """示例使用""" # 解析命令行参数 parser = argparse.ArgumentParser(description='小红书笔记详情工具') parser.add_argument( '--results-dir', type=str, default='data/detail', help='结果输出目录 (默认: data/detail)' ) parser.add_argument( '--note-id', type=str, required=True, help='笔记ID (必填)' ) parser.add_argument( '--force', action='store_true', help='强制重新请求API,忽略缓存' ) parser.add_argument( '--no-cache', action='store_true', help='禁用缓存功能' ) parser.add_argument( '--timeout', type=int, default=30, help='请求超时秒数 (默认: 30)' ) parser.add_argument( '--max-retries', type=int, default=5, help='最大重试次数 (默认: 5)' ) parser.add_argument( '--retry-delay', type=int, default=2, help='重试延迟秒数 (默认: 2)' ) args = parser.parse_args() # 创建API客户端实例 use_cache = not args.no_cache client = XiaohongshuDetail(results_dir=args.results_dir, use_cache=use_cache) # 执行获取并保存 try: raw_result, from_cache = client.get_detail( args.note_id, timeout=args.timeout, max_retries=args.max_retries, retry_delay=args.retry_delay, force=args.force ) # 只有新数据才保存 if not from_cache: raw_filepath, clean_filepath = client.save_result(args.note_id, raw_result) print(f"Raw data saved to: {raw_filepath}") print(f"Clean data saved to: {clean_filepath}") else: print(f"Used cached data, no new files saved") except Exception as e: print(f"Error: {e}", file=__import__('sys').stderr) if __name__ == "__main__": main()