Bladeren bron

feat: 实现小红书搜索和详情模块

新增功能:
- 搜索模块:支持关键词搜索、参数过滤、自动翻页
- 详情模块:获取笔记完整正文、视频链接等详细信息
- 自动缓存机制:只有新请求成功才保存,使用缓存时不重复保存
- 自动重试机制:5次重试,指数退避策略(2s → 4s → 8s → 16s → 32s)
- Raw/Clean 双存储:原始数据和清洗后数据分别存储

数据优化:
- 统一 null 处理:不存在的字段用 null 表示(非空字符串或 0)
- 图片自动去重:按顺序去重图片 URL
- Content Type 自动判断:根据 video 字段判断("video" 或 "normal")
- 时间格式化:自动将时间戳转换为可读格式(YYYY-MM-DD HH:MM:SS)

文档:
- 完整的 API 文档和使用示例
- 详细的字段说明和注意事项
- 常见问题解答

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
yangxiaohui 2 weken geleden
bovenliggende
commit
5b5543beda
6 gewijzigde bestanden met toevoegingen van 1132 en 79 verwijderingen
  1. 412 0
      script/detail/README.md
  2. 10 0
      script/detail/__init__.py
  3. 471 0
      script/detail/xiaohongshu_detail.py
  4. 1 1
      script/search/API.md
  5. 162 29
      script/search/README.md
  6. 76 49
      script/search/xiaohongshu_search.py

+ 412 - 0
script/detail/README.md

@@ -0,0 +1,412 @@
+# 小红书详情模块
+
+## 快速开始
+
+### Python API(推荐)
+
+```python
+from script.detail import get_xiaohongshu_detail
+
+# 获取笔记详情
+detail = get_xiaohongshu_detail("68d62e4500000000130085fc")
+
+print(f"标题: {detail['title']}")
+print(f"正文: {detail['body_text']}")
+print(f"视频: {detail['video']}")
+print(f"类型: {detail['content_type']}")
+print(f"发布时间: {detail['publish_time']}")
+```
+
+### 命令行工具
+
+```bash
+# 获取详情
+python script/detail/xiaohongshu_detail.py --note-id "68d62e4500000000130085fc"
+
+# 强制刷新
+python script/detail/xiaohongshu_detail.py --note-id "68d62e4500000000130085fc" --force
+```
+
+---
+
+## API 文档
+
+### 函数签名
+
+```python
+detail = get_xiaohongshu_detail(
+    note_id: str,          # 必填:笔记ID
+    force=False            # 可选:强制刷新(忽略缓存)
+)
+```
+
+### 返回值
+
+```python
+{
+  "channel_content_id": "68d62e4500000000130085fc",
+  "link": "https://www.xiaohongshu.com/explore/68d62e4500000000130085fc",
+  "comment_count": null,
+  "images": [
+    "http://res.cybertogether.net/crawler/image/bf6a0e92ed7252ae8414121edf26f2d3.jpeg"
+  ],
+  "like_count": 14,
+  "body_text": "时隔两个月,终于有时间把穿越极圈的航拍视频剪辑出来了...",
+  "title": "穿越北极圈,终生难忘",
+  "collect_count": 6,
+  "channel_account_id": "664954500000000007006ac0",
+  "channel_account_name": "Colin SW",
+  "content_type": "video",  # 根据 video 字段自动判断
+  "video": "http://sns-video-hw.xhscdn.com/stream/1/110/258/...",
+  "publish_timestamp": 1758877418000,
+  "publish_time": "2025-09-26 17:03:38"
+}
+```
+
+### 字段说明
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| channel_content_id | string/null | 笔记ID |
+| link | string/null | 笔记链接 |
+| title | string/null | 标题 |
+| body_text | string/null | 完整正文内容 |
+| channel_account_name | string/null | 作者名称 |
+| channel_account_id | string/null | 作者ID |
+| like_count | number/null | 点赞数 |
+| comment_count | number/null | 评论数 |
+| collect_count | number/null | 收藏数 |
+| images | array | 图片URL列表(已去重) |
+| video | string/null | 视频链接 |
+| content_type | string | 内容类型("video" 或 "normal") |
+| publish_timestamp | number/null | 发布时间戳(毫秒) |
+| publish_time | string/null | 发布时间(格式:YYYY-MM-DD HH:MM:SS) |
+
+**注意**:
+- 不存在的字段统一用 `null` 表示,而非空字符串或 0
+- 图片已自动按顺序去重
+- `content_type` 自动判断:有 `video` 字段时为 "video",否则为 "normal"
+
+---
+
+## 使用示例
+
+### 1. 基本使用
+
+```python
+from script.detail import get_xiaohongshu_detail
+
+# 获取笔记详情
+detail = get_xiaohongshu_detail("68d62e4500000000130085fc")
+
+print(f"标题: {detail['title']}")
+print(f"正文: {detail['body_text']}")
+print(f"点赞: {detail['like_count']}")
+```
+
+### 2. 强制刷新
+
+```python
+# 忽略缓存,重新请求 API
+detail = get_xiaohongshu_detail("68d62e4500000000130085fc", force=True)
+```
+
+### 3. 判断内容类型
+
+```python
+detail = get_xiaohongshu_detail("68d62e4500000000130085fc")
+
+if detail['content_type'] == 'video':
+    print(f"视频链接: {detail['video']}")
+else:
+    print(f"图文笔记,图片数量: {len(detail['images'])}")
+```
+
+### 4. 搜索 + 详情(完整流程)
+
+```python
+from script.search import search_xiaohongshu
+from script.detail import get_xiaohongshu_detail
+
+# 1. 搜索笔记
+search_result = search_xiaohongshu("产品测试", publish_time="一周内")
+
+# 2. 获取前 5 条的完整详情
+for note in search_result['notes'][:5]:
+    note_id = note['channel_content_id']
+
+    # 获取详情
+    detail = get_xiaohongshu_detail(note_id)
+
+    print(f"\n标题: {detail['title']}")
+    print(f"摘要: {note['desc'][:50]}...")  # 搜索结果的摘要
+    print(f"完整正文: {detail['body_text'][:100]}...")  # 详情的完整正文
+    print(f"点赞: {detail['like_count']}")
+    print(f"类型: {detail['content_type']}")
+```
+
+### 5. 批量获取详情
+
+```python
+note_ids = [
+    "68d62e4500000000130085fc",
+    "68b69ea9000000001c035a4d",
+    "6808c0e8000000001c00a771"
+]
+
+for note_id in note_ids:
+    try:
+        detail = get_xiaohongshu_detail(note_id)
+        print(f"✓ {detail['title']}")
+    except Exception as e:
+        print(f"✗ {note_id}: {e}")
+```
+
+---
+
+## 命令行使用
+
+### 基本使用
+
+```bash
+python script/detail/xiaohongshu_detail.py --note-id "68d62e4500000000130085fc"
+```
+
+### 强制刷新
+
+```bash
+python script/detail/xiaohongshu_detail.py --note-id "68d62e4500000000130085fc" --force
+```
+
+### 禁用缓存
+
+```bash
+python script/detail/xiaohongshu_detail.py --note-id "68d62e4500000000130085fc" --no-cache
+```
+
+### 完整参数
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| --note-id | 必填 | 笔记ID |
+| --force | False | 强制刷新(忽略缓存) |
+| --no-cache | False | 禁用缓存功能 |
+| --results-dir | data/detail | 输出目录 |
+| --timeout | 30 | 超时时间(秒) |
+| --max-retries | 5 | 最大重试次数 |
+| --retry-delay | 2 | 重试延迟(秒) |
+
+---
+
+## 核心特性
+
+### 1. 自动缓存(默认开启)
+
+相同的笔记 ID 会自动使用缓存:
+
+```python
+# 第一次:请求 API
+detail1 = get_xiaohongshu_detail("68d62e4500000000130085fc")
+
+# 第二次:使用缓存(瞬间返回)
+detail2 = get_xiaohongshu_detail("68d62e4500000000130085fc")
+
+# 强制刷新
+detail3 = get_xiaohongshu_detail("68d62e4500000000130085fc", force=True)
+```
+
+### 2. 自动重试(失败重试 5 次)
+
+- 超时错误:自动重试
+- 连接错误:自动重试
+- 5xx 服务器错误:自动重试
+- 4xx 客户端错误:不重试
+- API 返回失败(success=false):自动重试
+
+指数退避策略:2秒 → 4秒 → 8秒 → 16秒 → 32秒
+
+### 3. 自动保存(后台完成)
+
+详情结果自动保存到 `data/detail/xiaohongshu_detail/`
+
+目录结构:
+```
+data/detail/xiaohongshu_detail/
+└── {note_id}/
+    ├── raw/                           # 原始数据(完整 API 响应)
+    │   └── {timestamp}.json
+    └── clean/                         # 清洗数据(扁平化结构)
+        └── {timestamp}.json
+```
+
+文件名示例:
+- `20251113_144230.json`
+
+**注意**: 只有新请求的数据才会保存,使用缓存时不会重复保存文件。
+
+### 4. 图片自动去重
+
+图片 URL 会自动按顺序去重:
+
+```python
+# 原始数据可能有重复
+# images: ["url1", "url1", "url2"]
+
+# 返回的数据已去重
+# images: ["url1", "url2"]
+```
+
+### 5. Content Type 自动判断
+
+根据 `video` 字段自动判断内容类型:
+
+```python
+# 有视频
+detail['video'] = "http://..."
+detail['content_type'] = "video"
+
+# 无视频
+detail['video'] = null
+detail['content_type'] = "normal"
+```
+
+### 6. 时间自动转换
+
+自动将时间戳转换为可读格式:
+
+```python
+detail['publish_timestamp'] = 1758877418000  # 毫秒时间戳
+detail['publish_time'] = "2025-09-26 17:03:38"  # 格式化时间
+```
+
+---
+
+## 数据格式
+
+### Clean 数据(推荐使用)
+
+```json
+{
+  "channel_content_id": "68d62e4500000000130085fc",
+  "link": "https://www.xiaohongshu.com/explore/68d62e4500000000130085fc",
+  "comment_count": null,
+  "images": [
+    "http://res.cybertogether.net/crawler/image/bf6a0e92ed7252ae8414121edf26f2d3.jpeg"
+  ],
+  "like_count": 14,
+  "body_text": "完整正文内容...",
+  "title": "穿越北极圈,终生难忘",
+  "collect_count": 6,
+  "channel_account_id": "664954500000000007006ac0",
+  "channel_account_name": "Colin SW",
+  "content_type": "video",
+  "video": "http://sns-video-hw.xhscdn.com/...",
+  "publish_timestamp": 1758877418000,
+  "publish_time": "2025-09-26 17:03:38"
+}
+```
+
+### Raw 数据
+
+完整的 API 响应,包含所有元数据和嵌套结构:
+
+```json
+{
+  "note_id": "68d62e4500000000130085fc",
+  "timestamp": "20251113_144230",
+  "api_response": {
+    "success": true,
+    "result": [...],
+    "tool_name": "get_xhs_detail_by_note_id",
+    "call_type": "api"
+  }
+}
+```
+
+---
+
+## 常见问题
+
+### Q: 缓存如何清理?
+
+A:
+- 方式1:手动删除 `data/detail/xiaohongshu_detail/{note_id}/` 目录
+- 方式2:使用 `force=True` 参数强制刷新
+
+### Q: 如何判断是否使用了缓存?
+
+A: 看控制台输出:
+- 使用缓存:`✓ 使用缓存数据: ...`
+- 请求 API:`正在获取笔记详情: ... (尝试 1/5)`
+
+### Q: video 字段为什么有时是 null?
+
+A:
+- 搜索接口不返回 video 字段
+- 详情接口会返回 video 字段(如果笔记有视频的话)
+- 图文笔记没有视频,video 字段为 null
+
+### Q: comment_count 为什么是 null?
+
+A: API 返回的数据中,某些字段可能不存在或为 null,我们保持原样返回,不会强制转换为 0。
+
+### Q: content_type 如何判断?
+
+A:
+- 自动判断:有 `video` 字段(非 null)时为 "video"
+- 否则为 "normal"(图文笔记)
+
+---
+
+## 技术细节
+
+### 内部默认配置
+
+- **超时时间**:30 秒
+- **最大重试**:5 次
+- **重试延迟**:2 秒(指数增长)
+- **缓存开关**:默认开启
+- **输出目录**:`data/detail`
+
+### 缓存机制
+
+- 基于笔记 ID 生成缓存目录
+- 按文件修改时间排序,返回最新文件
+- 只有新请求成功后才保存缓存
+
+### API 成功验证
+
+只有当 API 返回 `success: true` 时才视为成功并保存缓存,否则会继续重试。
+
+---
+
+## 与搜索模块配合使用
+
+详情模块通常与搜索模块配合使用:
+
+```python
+from script.search import search_xiaohongshu
+from script.detail import get_xiaohongshu_detail
+
+# 1. 搜索笔记(获取摘要)
+search_result = search_xiaohongshu("产品测试")
+
+# 2. 对感兴趣的笔记获取详情(获取完整正文和视频)
+for note in search_result['notes'][:5]:
+    note_id = note['channel_content_id']
+    detail = get_xiaohongshu_detail(note_id)
+
+    # 搜索结果的摘要
+    print(f"摘要: {note['desc']}")
+
+    # 详情的完整正文
+    print(f"完整正文: {detail['body_text']}")
+
+    # 视频(如果有)
+    if detail['video']:
+        print(f"视频: {detail['video']}")
+```
+
+**关键区别**:
+- 搜索接口:返回摘要(`desc`),不返回 `body_text` 和 `video`
+- 详情接口:返回完整正文(`body_text`)和视频链接(`video`)

+ 10 - 0
script/detail/__init__.py

@@ -0,0 +1,10 @@
+"""
+小红书详情模块
+
+提供小红书笔记详情获取功能,支持缓存
+"""
+
+from .xiaohongshu_detail import get_xiaohongshu_detail
+
+__all__ = ['get_xiaohongshu_detail']
+__version__ = '1.0.0'

+ 471 - 0
script/detail/xiaohongshu_detail.py

@@ -0,0 +1,471 @@
+#!/usr/bin/env python3
+"""
+小红书笔记详情工具
+根据笔记ID获取笔记详情(包含完整正文、视频等)
+"""
+
+import requests
+import json
+import os
+import argparse
+import time
+import hashlib
+import re
+from datetime import datetime
+from typing import Dict, Any, Optional
+from pathlib import Path
+
+
+class XiaohongshuDetail:
+    """小红书笔记详情API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "get_xhs_detail_by_note_id"
+    PLATFORM = "xiaohongshu"
+
+    def __init__(self, results_dir: str = None, use_cache: bool = True):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/detail 文件夹
+            use_cache: 是否启用缓存,默认为 True
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+        self.use_cache = use_cache
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/detail 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "detail")
+
+    def _sanitize_note_id(self, note_id: str) -> str:
+        """
+        清理笔记ID,使其可以作为文件夹名称
+
+        Args:
+            note_id: 原始笔记ID
+
+        Returns:
+            清理后的笔记ID
+        """
+        # 替换不能用作文件夹名称的字符
+        sanitized = re.sub(r'[<>:"/\\|?*]', '_', note_id)
+        sanitized = sanitized.strip().strip('.')
+
+        if not sanitized:
+            sanitized = "unnamed"
+
+        if len(sanitized) > 200:
+            sanitized = sanitized[:200]
+
+        return sanitized
+
+    def _get_latest_cache(self, note_id: str) -> Optional[tuple[str, str]]:
+        """
+        获取最新的缓存文件(raw 和 clean)
+
+        Args:
+            note_id: 笔记ID
+
+        Returns:
+            (raw_filepath, clean_filepath) 元组 或 None
+        """
+        safe_note_id = self._sanitize_note_id(note_id)
+        detail_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
+        raw_dir = os.path.join(detail_dir, "raw")
+        clean_dir = os.path.join(detail_dir, "clean")
+
+        if not os.path.exists(raw_dir) or not os.path.exists(clean_dir):
+            return None
+
+        # 获取 raw 目录下的所有 JSON 文件
+        raw_path = Path(raw_dir)
+        raw_files = sorted(raw_path.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True)
+
+        if not raw_files:
+            return None
+
+        # 获取最新的 raw 文件,并构造对应的 clean 文件路径
+        latest_raw = raw_files[0]
+        latest_clean = Path(clean_dir) / latest_raw.name
+
+        if latest_clean.exists():
+            return (str(latest_raw), str(latest_clean))
+
+        return None
+
+    def _load_cached_result(self, filepath: str) -> Optional[Dict[str, Any]]:
+        """
+        加载缓存的数据
+
+        Args:
+            filepath: 文件路径
+
+        Returns:
+            数据字典 或 None
+        """
+        try:
+            with open(filepath, 'r', encoding='utf-8') as f:
+                data = json.load(f)
+                # 兼容新旧格式
+                if "api_response" in data:
+                    return data["api_response"]
+                else:
+                    return data
+        except Exception:
+            return None
+
+    def get_detail(
+        self,
+        note_id: str,
+        timeout: int = 30,
+        max_retries: int = 5,
+        retry_delay: int = 2,
+        force: bool = False
+    ) -> tuple[Dict[str, Any], bool]:
+        """
+        获取小红书笔记详情,带自动重试机制和缓存
+
+        Args:
+            note_id: 笔记ID
+            timeout: 请求超时时间(秒),默认30秒
+            max_retries: 最大重试次数,默认5次
+            retry_delay: 重试延迟(秒),默认2秒,每次重试会指数增长
+            force: 强制重新请求API,忽略缓存,默认为 False
+
+        Returns:
+            (原始数据, 是否来自缓存) 的元组
+
+        Raises:
+            requests.exceptions.RequestException: 所有重试失败后抛出异常
+        """
+        # 检查缓存(如果启用且未强制刷新)
+        if self.use_cache and not force:
+            cached_files = self._get_latest_cache(note_id)
+            if cached_files:
+                raw_file, clean_file = cached_files
+                cached_result = self._load_cached_result(raw_file)
+                if cached_result:
+                    print(f"✓ 使用缓存数据: {raw_file}")
+                    return cached_result, True  # 返回缓存标记
+
+        payload = {"note_id": note_id}
+        last_exception = None
+
+        for attempt in range(max_retries):
+            try:
+                if attempt > 0:
+                    wait_time = retry_delay * (2 ** (attempt - 1))
+                    print(f"等待 {wait_time} 秒后进行第 {attempt + 1} 次重试...")
+                    time.sleep(wait_time)
+
+                print(f"正在获取笔记详情: {note_id} (尝试 {attempt + 1}/{max_retries})")
+
+                response = requests.post(
+                    self.api_url,
+                    json=payload,
+                    timeout=timeout,
+                    headers={"Content-Type": "application/json"}
+                )
+                response.raise_for_status()
+                raw_result = response.json()
+
+                # 如果 result 字段是字符串,需要解析成 JSON 对象
+                if 'result' in raw_result and isinstance(raw_result['result'], str):
+                    try:
+                        raw_result['result'] = json.loads(raw_result['result'])
+                    except json.JSONDecodeError:
+                        pass
+
+                # 检查 API 返回是否成功
+                if not raw_result.get('success'):
+                    error_msg = raw_result.get('message', '未知错误')
+                    print(f"✗ API 返回失败: {error_msg}")
+                    last_exception = Exception(f"API 返回失败: {error_msg}")
+                    continue  # 继续重试
+
+                print(f"✓ 获取成功!")
+                return raw_result, False  # 返回新数据标记
+
+            except requests.exceptions.Timeout as e:
+                last_exception = e
+                print(f"✗ 请求超时: {e}")
+
+            except requests.exceptions.ConnectionError as e:
+                last_exception = e
+                print(f"✗ 连接错误: {e}")
+
+            except requests.exceptions.HTTPError as e:
+                last_exception = e
+                status_code = e.response.status_code if e.response else "未知"
+                print(f"✗ HTTP错误 {status_code}: {e}")
+
+                # 如果是客户端错误(4xx),不重试
+                if e.response and 400 <= e.response.status_code < 500:
+                    print(f"客户端错误,停止重试")
+                    raise
+
+            except requests.exceptions.RequestException as e:
+                last_exception = e
+                print(f"✗ 请求失败: {e}")
+
+        # 所有重试都失败
+        print(f"✗ 已达到最大重试次数 ({max_retries}),请求失败")
+        raise last_exception
+
+    def _extract_clean_data(self, raw_result: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        提取并清理数据,生成扁平化的结构(参考现有格式)
+
+        Args:
+            raw_result: 原始 API 响应
+
+        Returns:
+            清理后的笔记详情
+        """
+        if not raw_result.get("success"):
+            return {}
+
+        result = raw_result.get("result", [])
+        if not result or not isinstance(result, list) or len(result) == 0:
+            return {}
+
+        data = result[0].get("data", {})
+
+        # 提取图片 URL 并按顺序去重
+        images = []
+        seen = set()
+        for img in data.get("images", []):
+            url = None
+            if isinstance(img, dict) and "cdn_url" in img:
+                url = img["cdn_url"]
+            elif isinstance(img, str):
+                url = img
+
+            # 按顺序去重
+            if url and url not in seen:
+                images.append(url)
+                seen.add(url)
+
+        # 处理时间戳转换为时间字符串
+        publish_timestamp = data.get("publish_timestamp")
+        publish_time = None
+        if publish_timestamp:
+            try:
+                from datetime import datetime
+                # 毫秒时间戳转换为秒
+                dt = datetime.fromtimestamp(publish_timestamp / 1000)
+                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
+            except:
+                publish_time = None
+
+        # 获取 video 字段
+        video = data.get("video") or None
+
+        # 根据 video 字段判断 content_type
+        if video:
+            content_type = "video"
+        else:
+            content_type = "normal"
+
+        # 构建清理后的数据(扁平化结构,参考现有格式)
+        # 不存在的字段统一用 None/null 表示
+        clean_data = {
+            "channel_content_id": data.get("channel_content_id") or None,
+            "link": data.get("content_link") or None,
+            "comment_count": data.get("comment_count"),
+            "images": images if images else [],
+            "like_count": data.get("like_count"),
+            "body_text": data.get("body_text") or None,
+            "title": data.get("title") or None,
+            "collect_count": data.get("collect_count"),
+            "channel_account_id": data.get("channel_account_id") or None,
+            "channel_account_name": data.get("channel_account_name") or None,
+            "content_type": content_type,
+            "video": video,
+            "publish_timestamp": publish_timestamp,
+            "publish_time": publish_time
+        }
+
+        return clean_data
+
+    def save_result(
+        self,
+        note_id: str,
+        raw_result: Dict[str, Any]
+    ) -> tuple[str, str]:
+        """
+        保存原始数据和清理后数据到不同的目录
+
+        目录结构:
+        data/detail/xiaohongshu_detail/
+        └── {note_id}/
+            ├── raw/                      # 原始数据(完整 API 响应)
+            │   └── {timestamp}.json
+            └── clean/                    # 清理后数据(扁平化结构)
+                └── {timestamp}.json
+
+        Args:
+            note_id: 笔记ID
+            raw_result: 原始数据(已解析 result 字段)
+
+        Returns:
+            (原始数据路径, 清理后数据路径) 的元组
+        """
+        # 清理笔记ID用于文件夹名称
+        safe_note_id = self._sanitize_note_id(note_id)
+
+        # 创建目录结构
+        base_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
+        raw_dir = os.path.join(base_dir, "raw")
+        clean_dir = os.path.join(base_dir, "clean")
+
+        os.makedirs(raw_dir, exist_ok=True)
+        os.makedirs(clean_dir, exist_ok=True)
+
+        # 生成文件名(使用时间戳)
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}.json"
+
+        raw_filepath = os.path.join(raw_dir, filename)
+        clean_filepath = os.path.join(clean_dir, filename)
+
+        # 添加元数据到 raw 数据
+        raw_data_with_meta = {
+            "note_id": note_id,
+            "timestamp": timestamp,
+            "api_response": raw_result
+        }
+
+        # 保存原始结果(包含元数据)
+        with open(raw_filepath, 'w', encoding='utf-8') as f:
+            json.dump(raw_data_with_meta, f, ensure_ascii=False, indent=2)
+
+        # 提取并保存清理后的数据(扁平化结构,直接保存)
+        clean_data = self._extract_clean_data(raw_result)
+
+        with open(clean_filepath, 'w', encoding='utf-8') as f:
+            json.dump(clean_data, f, ensure_ascii=False, indent=2)
+
+        return raw_filepath, clean_filepath
+
+
+def get_xiaohongshu_detail(
+    note_id: str,
+    force: bool = False
+) -> Dict[str, Any]:
+    """
+    获取小红书笔记详情
+
+    Args:
+        note_id: 笔记ID
+        force: 强制刷新(忽略缓存)
+
+    Returns:
+        笔记详情数据(clean 格式,扁平化结构)
+
+    Examples:
+        >>> # 基本使用
+        >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5")
+        >>> print(detail['title'])
+        >>> print(detail['body_text'])
+
+        >>> # 强制刷新
+        >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5", force=True)
+    """
+    # 创建客户端(使用默认配置)
+    client = XiaohongshuDetail(use_cache=True)
+
+    # 获取详情(内部处理重试、超时等)
+    raw_result, from_cache = client.get_detail(note_id=note_id, force=force)
+
+    # 只有新请求的数据才需要保存
+    if not from_cache:
+        raw_filepath, clean_filepath = client.save_result(note_id=note_id, raw_result=raw_result)
+        # 读取并返回 clean 数据
+        with open(clean_filepath, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    else:
+        # 如果是缓存数据,直接提取 clean 数据返回
+        clean_data = client._extract_clean_data(raw_result)
+        return clean_data
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='小红书笔记详情工具')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/detail',
+        help='结果输出目录 (默认: data/detail)'
+    )
+    parser.add_argument(
+        '--note-id',
+        type=str,
+        required=True,
+        help='笔记ID (必填)'
+    )
+    parser.add_argument(
+        '--force',
+        action='store_true',
+        help='强制重新请求API,忽略缓存'
+    )
+    parser.add_argument(
+        '--no-cache',
+        action='store_true',
+        help='禁用缓存功能'
+    )
+    parser.add_argument(
+        '--timeout',
+        type=int,
+        default=30,
+        help='请求超时秒数 (默认: 30)'
+    )
+    parser.add_argument(
+        '--max-retries',
+        type=int,
+        default=5,
+        help='最大重试次数 (默认: 5)'
+    )
+    parser.add_argument(
+        '--retry-delay',
+        type=int,
+        default=2,
+        help='重试延迟秒数 (默认: 2)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    use_cache = not args.no_cache
+    client = XiaohongshuDetail(results_dir=args.results_dir, use_cache=use_cache)
+
+    # 执行获取并保存
+    try:
+        raw_result, from_cache = client.get_detail(
+            args.note_id,
+            timeout=args.timeout,
+            max_retries=args.max_retries,
+            retry_delay=args.retry_delay,
+            force=args.force
+        )
+
+        # 只有新数据才保存
+        if not from_cache:
+            raw_filepath, clean_filepath = client.save_result(args.note_id, raw_result)
+            print(f"Raw data saved to: {raw_filepath}")
+            print(f"Clean data saved to: {clean_filepath}")
+        else:
+            print(f"Used cached data, no new files saved")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 1 - 1
script/search/API.md

@@ -81,7 +81,7 @@ data = search_xiaohongshu("产品测试", force=True)
 
 ## 内部特性
 
-- ✅ 自动重试(最多3次)
+- ✅ 自动重试(最多5次)
 - ✅ 自动缓存(默认开启)
 - ✅ 自动保存(后台完成)
 - ✅ 超时保护(30秒)

+ 162 - 29
script/search/README.md

@@ -6,26 +6,36 @@
 
 ```python
 from script.search import search_xiaohongshu
+from script.detail import get_xiaohongshu_detail
 
-# 基本搜索
+# 搜索笔记
 data = search_xiaohongshu("产品测试")
 
-# 使用数据
+# 获取详情
 for note in data['notes']:
-    print(f"{note['title']} - {note['like_count']} 赞")
+    note_id = note['channel_content_id']
+    detail = get_xiaohongshu_detail(note_id)
+    print(f"{detail['title']}")
+    print(f"{detail['body_text']}")  # 完整正文
 ```
 
 ### 命令行工具
 
 ```bash
+# 搜索
 python script/search/xiaohongshu_search.py --keyword "产品测试"
+
+# 详情
+python script/detail/xiaohongshu_detail.py --note-id "6915588b00000000040143b5"
 ```
 
 ---
 
 ## API 文档
 
-### 函数签名
+### 1. 搜索接口
+
+#### 函数签名
 
 ```python
 data = search_xiaohongshu(
@@ -59,22 +69,95 @@ data = search_xiaohongshu(
 
 ### 笔记字段
 
-| 字段 | 说明 |
-|------|------|
-| channel_content_id | 笔记ID |
-| link | 笔记链接 |
-| title | 标题 |
-| desc | 摘要(搜索接口返回) |
-| body_text | 完整正文(需详情接口) |
-| channel_account_name | 作者名称 |
-| channel_account_id | 作者ID |
-| like_count | 点赞数 |
-| comment_count | 评论数 |
-| collect_count | 收藏数 |
-| shared_count | 分享数 |
-| images | 图片URL列表 |
-| video | 视频链接(需详情接口) |
-| content_type | 内容类型 |
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| channel_content_id | string/null | 笔记ID |
+| link | string/null | 笔记链接 |
+| title | string/null | 标题 |
+| desc | string/null | 摘要(搜索接口返回) |
+| body_text | null | 完整正文(搜索接口不返回,需调用详情接口) |
+| channel_account_name | string/null | 作者名称 |
+| channel_account_id | string/null | 作者ID |
+| like_count | number/null | 点赞数 |
+| comment_count | number/null | 评论数 |
+| collect_count | number/null | 收藏数 |
+| shared_count | number/null | 分享数 |
+| images | array | 图片URL列表(空数组表示无图片) |
+| video | null | 视频链接(搜索接口不返回,需调用详情接口) |
+| content_type | string/null | 内容类型(video/note) |
+
+**注意**: 不存在的字段统一用 `null` 表示,而非空字符串或 0。
+
+### 2. 详情接口
+
+#### 函数签名
+
+```python
+detail = get_xiaohongshu_detail(
+    note_id: str,          # 必填:笔记ID
+    force=False            # 可选:强制刷新
+)
+```
+
+#### 返回值
+
+```python
+{
+  "channel_content_id": "68d62e4500000000130085fc",
+  "link": "https://www.xiaohongshu.com/explore/...",
+  "comment_count": null,
+  "images": ["http://res.cybertogether.net/..."],
+  "like_count": 14,
+  "body_text": "完整正文内容...",
+  "title": "笔记标题",
+  "collect_count": 6,
+  "channel_account_id": "664954500000000007006ac0",
+  "channel_account_name": "作者名称",
+  "content_type": "video",  # 根据 video 字段自动判断:有视频="video",否则="normal"
+  "video": "http://sns-video-hw.xhscdn.com/...",
+  "publish_timestamp": 1758877418000,
+  "publish_time": "2025-09-26 17:03:38"
+}
+```
+
+#### 字段说明
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| channel_content_id | string/null | 笔记ID |
+| link | string/null | 笔记链接 |
+| title | string/null | 标题 |
+| body_text | string/null | 完整正文内容 |
+| channel_account_name | string/null | 作者名称 |
+| channel_account_id | string/null | 作者ID |
+| like_count | number/null | 点赞数 |
+| comment_count | number/null | 评论数 |
+| collect_count | number/null | 收藏数 |
+| images | array | 图片URL列表(已去重) |
+| video | string/null | 视频链接 |
+| content_type | string | 内容类型("video" 或 "normal",根据 video 字段自动判断) |
+| publish_timestamp | number/null | 发布时间戳(毫秒) |
+| publish_time | string/null | 发布时间(格式:YYYY-MM-DD HH:MM:SS) |
+
+**注意**:
+- 不存在的字段统一用 `null` 表示
+- 图片已自动按顺序去重
+- `content_type` 自动判断:有 `video` 为 "video",否则为 "normal"
+
+#### 使用示例
+
+```python
+from script.detail import get_xiaohongshu_detail
+
+# 获取笔记详情
+detail = get_xiaohongshu_detail("68d62e4500000000130085fc")
+
+print(f"标题: {detail['title']}")
+print(f"正文: {detail['body_text']}")
+print(f"视频: {detail['video']}")
+print(f"类型: {detail['content_type']}")
+print(f"发布时间: {detail['publish_time']}")
+```
 
 ---
 
@@ -154,17 +237,41 @@ def analyze_topic(keyword):
 analyze_topic("产品测试")
 ```
 
+### 7. 搜索 + 详情(完整正文)
+
+```python
+from script.search import search_xiaohongshu
+from script.detail import get_xiaohongshu_detail
+
+# 搜索笔记
+data = search_xiaohongshu("产品测试", publish_time="一周内")
+
+# 获取前3条的完整详情
+for note in data['notes'][:3]:
+    note_id = note['channel_content_id']
+
+    # 获取详情
+    detail = get_xiaohongshu_detail(note_id)
+
+    print(f"\n标题: {note['title']}")
+    print(f"摘要: {note['desc'][:50]}...")
+    print(f"完整正文: {detail['result']['body_text'][:100]}...")
+    print(f"点赞: {note['like_count']}")
+```
+
 ---
 
 ## 命令行使用
 
-### 基本搜索
+### 搜索接口
+
+#### 基本搜索
 
 ```bash
 python script/search/xiaohongshu_search.py --keyword "产品测试"
 ```
 
-### 带参数搜索
+#### 带参数搜索
 
 ```bash
 python script/search/xiaohongshu_search.py \
@@ -174,19 +281,19 @@ python script/search/xiaohongshu_search.py \
   --publish-time "一周内"
 ```
 
-### 强制刷新
+#### 强制刷新
 
 ```bash
 python script/search/xiaohongshu_search.py --keyword "产品测试" --force
 ```
 
-### 禁用缓存
+#### 禁用缓存
 
 ```bash
 python script/search/xiaohongshu_search.py --keyword "产品测试" --no-cache
 ```
 
-### 完整参数
+#### 完整参数
 
 | 参数 | 默认值 | 说明 |
 |------|--------|------|
@@ -200,7 +307,33 @@ python script/search/xiaohongshu_search.py --keyword "产品测试" --no-cache
 | --no-cache | False | 禁用缓存 |
 | --results-dir | data/search | 输出目录 |
 | --timeout | 30 | 超时时间(秒) |
-| --max-retries | 3 | 最大重试次数 |
+| --max-retries | 5 | 最大重试次数 |
+| --retry-delay | 2 | 重试延迟(秒) |
+
+### 详情接口
+
+#### 基本使用
+
+```bash
+python script/detail/xiaohongshu_detail.py --note-id "6915588b00000000040143b5"
+```
+
+#### 强制刷新
+
+```bash
+python script/detail/xiaohongshu_detail.py --note-id "6915588b00000000040143b5" --force
+```
+
+#### 完整参数
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| --note-id | 必填 | 笔记ID |
+| --force | False | 强制刷新 |
+| --no-cache | False | 禁用缓存 |
+| --results-dir | data/detail | 输出目录 |
+| --timeout | 30 | 超时时间(秒) |
+| --max-retries | 5 | 最大重试次数 |
 | --retry-delay | 2 | 重试延迟(秒) |
 
 ---
@@ -222,14 +355,14 @@ data2 = search_xiaohongshu("产品测试")  # 瞬间返回
 data3 = search_xiaohongshu("产品测试", force=True)
 ```
 
-### 2. 自动重试(失败重试 3 次)
+### 2. 自动重试(失败重试 5 次)
 
 - 超时错误:自动重试
 - 连接错误:自动重试
 - 5xx 服务器错误:自动重试
 - 4xx 客户端错误:不重试
 
-指数退避策略:2秒 → 4秒 → 8秒
+指数退避策略:2秒 → 4秒 → 8秒 → 16秒 → 32秒
 
 ### 3. 自动保存(后台完成)
 
@@ -379,7 +512,7 @@ page2 = search_xiaohongshu("产品测试", page=2)
 ### 内部默认配置
 
 - **超时时间**:30 秒
-- **最大重试**:3
+- **最大重试**:5
 - **重试延迟**:2 秒(指数增长)
 - **缓存开关**:默认开启
 - **输出目录**:`data/search`

+ 76 - 49
script/search/xiaohongshu_search.py

@@ -194,10 +194,10 @@ class XiaohongshuSearch:
         publish_time: str = "不限",
         cursor: str = "",
         timeout: int = 30,
-        max_retries: int = 3,
+        max_retries: int = 5,
         retry_delay: int = 2,
         force: bool = False
-    ) -> Dict[str, Any]:
+    ) -> tuple[Dict[str, Any], bool]:
         """
         搜索小红书笔记,带自动重试机制和缓存
 
@@ -208,12 +208,12 @@ class XiaohongshuSearch:
             publish_time: 发布时间筛选,可选值:不限、一天内、一周内、半年内,默认为'不限'
             cursor: 翻页游标,第一页默认为空,下一页的游标在上一页的返回值中获取
             timeout: 请求超时时间(秒),默认30秒
-            max_retries: 最大重试次数,默认3
+            max_retries: 最大重试次数,默认5
             retry_delay: 重试延迟(秒),默认2秒,每次重试会指数增长
             force: 强制重新请求API,忽略缓存,默认为 False
 
         Returns:
-            原始数据(已解析 result 字段)
+            (原始数据, 是否来自缓存) 的元组
 
         Raises:
             requests.exceptions.RequestException: 所有重试失败后抛出异常
@@ -229,7 +229,7 @@ class XiaohongshuSearch:
 
                 if cached_result:
                     print(f"✓ 使用缓存数据: {raw_filepath}")
-                    return cached_result
+                    return cached_result, True  # 返回缓存标记
 
         payload = {
             "keyword": keyword,
@@ -269,7 +269,7 @@ class XiaohongshuSearch:
 
                 # raw_result 就是 raw 数据(已解析 result,保留完整结构)
                 print(f"✓ 搜索成功!")
-                return raw_result
+                return raw_result, False  # 返回新数据标记
 
             except requests.exceptions.Timeout as e:
                 last_exception = e
@@ -329,21 +329,23 @@ class XiaohongshuSearch:
                 elif isinstance(img, str):
                     images.append(img)
 
+            # 不存在的字段统一用 None/null 表示
+            note_id = note.get("id")
             clean_note = {
-                "channel_content_id": note.get("id", ""),
-                "link": f"https://www.xiaohongshu.com/explore/{note.get('id', '')}",
-                "comment_count": interact_info.get("comment_count", 0),
-                "images": images,
-                "like_count": interact_info.get("liked_count", 0),
-                "desc": note_card.get("desc", ""),  # 摘要(搜索接口返回)
-                "body_text": "",  # 完整正文需要调用详情接口获取
-                "title": note_card.get("display_title", ""),
-                "collect_count": interact_info.get("collected_count", 0),
-                "channel_account_id": user.get("user_id", ""),
-                "channel_account_name": user.get("nick_name", ""),
-                "content_type": note_card.get("type", "note"),
-                "video": "",  # 搜索结果中没有视频字段
-                "shared_count": interact_info.get("shared_count", 0)
+                "channel_content_id": note_id or None,
+                "link": f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else None,
+                "comment_count": interact_info.get("comment_count"),
+                "images": images if images else [],
+                "like_count": interact_info.get("liked_count"),
+                "desc": note_card.get("desc") or None,  # 摘要(搜索接口返回)
+                "body_text": None,  # 完整正文需要调用详情接口获取
+                "title": note_card.get("display_title") or None,
+                "collect_count": interact_info.get("collected_count"),
+                "channel_account_id": user.get("user_id") or None,
+                "channel_account_name": user.get("nick_name") or None,
+                "content_type": note_card.get("type") or None,
+                "video": None,  # 搜索结果中没有视频字段
+                "shared_count": interact_info.get("shared_count")
             }
 
             clean_notes.append(clean_note)
@@ -523,8 +525,8 @@ def main():
     parser.add_argument(
         '--max-retries',
         type=int,
-        default=3,
-        help='最大重试次数 (默认: 3)'
+        default=5,
+        help='最大重试次数 (默认: 5)'
     )
     parser.add_argument(
         '--retry-delay',
@@ -556,7 +558,7 @@ def main():
 
     # 执行搜索并保存
     try:
-        raw_result = client.search(
+        raw_result, from_cache = client.search(
             args.keyword,
             args.content_type,
             args.sort_type,
@@ -567,17 +569,22 @@ def main():
             retry_delay=args.retry_delay,
             force=args.force
         )
-        raw_filepath, clean_filepath = client.save_result(
-            args.keyword,
-            raw_result,
-            args.page,
-            args.content_type,
-            args.sort_type,
-            args.publish_time,
-            args.cursor
-        )
-        print(f"Raw data saved to: {raw_filepath}")
-        print(f"Clean data saved to: {clean_filepath}")
+
+        # 只有新数据才保存
+        if not from_cache:
+            raw_filepath, clean_filepath = client.save_result(
+                args.keyword,
+                raw_result,
+                args.page,
+                args.content_type,
+                args.sort_type,
+                args.publish_time,
+                args.cursor
+            )
+            print(f"Raw data saved to: {raw_filepath}")
+            print(f"Clean data saved to: {clean_filepath}")
+        else:
+            print(f"Used cached data, no new files saved")
     except Exception as e:
         print(f"Error: {e}", file=__import__('sys').stderr)
 
@@ -645,7 +652,7 @@ def search_xiaohongshu(
         cursor = prev_page_result.get('next_cursor', '')
 
     # 搜索(内部处理重试、超时等)
-    raw_result = client.search(
+    raw_result, from_cache = client.search(
         keyword=keyword,
         content_type=content_type,
         sort_type=sort_type,
@@ -654,20 +661,40 @@ def search_xiaohongshu(
         force=force
     )
 
-    # 自动保存
-    _, clean_filepath = client.save_result(
-        keyword=keyword,
-        raw_result=raw_result,
-        page=page,
-        content_type=content_type,
-        sort_type=sort_type,
-        publish_time=publish_time,
-        cursor=cursor
-    )
-
-    # 读取并返回数据
-    with open(clean_filepath, 'r', encoding='utf-8') as f:
-        return json.load(f)
+    # 只有新请求的数据才需要保存
+    if not from_cache:
+        _, clean_filepath = client.save_result(
+            keyword=keyword,
+            raw_result=raw_result,
+            page=page,
+            content_type=content_type,
+            sort_type=sort_type,
+            publish_time=publish_time,
+            cursor=cursor
+        )
+        # 读取并返回数据
+        with open(clean_filepath, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    else:
+        # 如果是缓存数据,直接提取 clean 数据返回
+        clean_data = client._extract_clean_data(raw_result)
+        # 添加搜索参数到 clean 数据
+        timestamp = raw_result.get("search_params", {}).get("timestamp", "")
+        clean_data_with_meta = {
+            "search_params": {
+                "keyword": keyword,
+                "content_type": content_type,
+                "sort_type": sort_type,
+                "publish_time": publish_time,
+                "cursor": cursor,
+                "page": page,
+                "timestamp": timestamp
+            },
+            "has_more": clean_data["has_more"],
+            "next_cursor": clean_data["next_cursor"],
+            "notes": clean_data["notes"]
+        }
+        return clean_data_with_meta
 
 
 if __name__ == "__main__":