- #!/usr/bin/env python3
- """
- 小红书笔记详情工具
- 根据笔记ID获取笔记详情(包含完整正文、视频等)
- """
import argparse
import hashlib
import json
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

import requests

# Add the project root to sys.path so the local config module resolves.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from lib.config import get_data_dir
class XiaohongshuDetail:
    """Client for the Xiaohongshu note-detail tool API.

    Fetches the full detail of a note (body text, images, video,
    engagement counts) by note ID, with local file caching, retry with
    exponential backoff, and separate "raw" / "clean" persistence.
    """

    BASE_URL = "http://47.84.182.56:8001"
    TOOL_NAME = "get_xhs_detail_by_note_id"
    PLATFORM = "xiaohongshu"

    def __init__(self, results_dir: Optional[str] = None, use_cache: bool = True):
        """Initialize the API client.

        Args:
            results_dir: Output directory for results; defaults to the
                project-configured ``data/detail`` directory.
            use_cache: Whether to reuse previously saved responses.
        """
        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
        self.use_cache = use_cache
        if results_dir:
            self.results_base_dir = results_dir
        else:
            # Fall back to the project-configured data directory.
            self.results_base_dir = get_data_dir("detail")

    def _sanitize_note_id(self, note_id: str) -> str:
        """Return a filesystem-safe version of *note_id* for use as a folder name.

        Characters invalid in folder names are replaced with ``_``; the
        result is trimmed of whitespace and dots, never empty, and capped
        at 200 characters.

        Args:
            note_id: Raw note ID.

        Returns:
            Sanitized note ID.
        """
        sanitized = re.sub(r'[<>:"/\\|?*]', '_', note_id)
        sanitized = sanitized.strip().strip('.')
        if not sanitized:
            sanitized = "unnamed"
        if len(sanitized) > 200:
            sanitized = sanitized[:200]
        return sanitized

    def _get_latest_cache(self, note_id: str) -> Optional[tuple[str, str]]:
        """Locate the most recent cached (raw, clean) file pair for a note.

        Args:
            note_id: Note ID.

        Returns:
            ``(raw_filepath, clean_filepath)`` tuple, or ``None`` when no
            complete cache pair exists.
        """
        safe_note_id = self._sanitize_note_id(note_id)
        detail_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
        raw_dir = os.path.join(detail_dir, "raw")
        clean_dir = os.path.join(detail_dir, "clean")
        if not os.path.exists(raw_dir) or not os.path.exists(clean_dir):
            return None
        # Newest raw JSON file first, by modification time.
        raw_path = Path(raw_dir)
        raw_files = sorted(raw_path.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True)
        if not raw_files:
            return None
        # The clean file shares the raw file's timestamped name; only a
        # complete pair counts as a cache hit.
        latest_raw = raw_files[0]
        latest_clean = Path(clean_dir) / latest_raw.name
        if latest_clean.exists():
            return (str(latest_raw), str(latest_clean))
        return None

    def _load_cached_result(self, filepath: str) -> Optional[Dict[str, Any]]:
        """Load a cached JSON file, returning ``None`` on any read/parse error.

        Supports both the wrapped format (with an ``api_response`` key)
        and older files that stored the API response directly.

        Args:
            filepath: Path to the cached JSON file.

        Returns:
            The cached API response dict, or ``None``.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Compatibility with both new and old cache formats.
            if "api_response" in data:
                return data["api_response"]
            else:
                return data
        except Exception:
            # Best effort: a corrupt/unreadable cache just forces a refetch.
            return None

    def get_detail(
        self,
        note_id: str,
        timeout: int = 30,
        max_retries: int = 5,
        retry_delay: int = 2,
        force: bool = False
    ) -> tuple[Dict[str, Any], bool]:
        """Fetch a note's detail with caching and retry/backoff.

        Args:
            note_id: Note ID.
            timeout: Per-request timeout in seconds (default 30).
            max_retries: Maximum number of attempts (default 5).
            retry_delay: Base retry delay in seconds (default 2); doubles
                on each retry (exponential backoff).
            force: Bypass the cache and hit the API (default False).

        Returns:
            ``(raw_result, from_cache)`` tuple.

        Raises:
            requests.exceptions.RequestException: After all retries fail,
                or immediately on a 4xx HTTP error (client errors are not
                retried).
        """
        # Serve from cache unless disabled or explicitly bypassed.
        if self.use_cache and not force:
            cached_files = self._get_latest_cache(note_id)
            if cached_files:
                raw_file, clean_file = cached_files
                cached_result = self._load_cached_result(raw_file)
                if cached_result:
                    print(f"✓ 使用缓存数据: {raw_file}")
                    return cached_result, True  # mark as cache hit

        payload = {"note_id": note_id}
        last_exception = None

        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    wait_time = retry_delay * (2 ** (attempt - 1))
                    print(f"等待 {wait_time} 秒后进行第 {attempt + 1} 次重试...")
                    time.sleep(wait_time)

                print(f"正在获取笔记详情: {note_id} (尝试 {attempt + 1}/{max_retries})")
                response = requests.post(
                    self.api_url,
                    json=payload,
                    timeout=timeout,
                    headers={"Content-Type": "application/json"}
                )
                response.raise_for_status()
                raw_result = response.json()

                # The endpoint may return `result` as a JSON-encoded string;
                # decode it so callers always see a structured object.
                if 'result' in raw_result and isinstance(raw_result['result'], str):
                    try:
                        raw_result['result'] = json.loads(raw_result['result'])
                    except json.JSONDecodeError:
                        pass

                # A transport-level 200 can still carry an application error.
                if not raw_result.get('success'):
                    error_msg = raw_result.get('message', '未知错误')
                    print(f"✗ API 返回失败: {error_msg}")
                    last_exception = Exception(f"API 返回失败: {error_msg}")
                    continue  # retry

                print(f"✓ 获取成功!")
                return raw_result, False  # freshly fetched

            except requests.exceptions.Timeout as e:
                last_exception = e
                print(f"✗ 请求超时: {e}")
            except requests.exceptions.ConnectionError as e:
                last_exception = e
                print(f"✗ 连接错误: {e}")
            except requests.exceptions.HTTPError as e:
                last_exception = e
                # BUG FIX: Response.__bool__ is False for 4xx/5xx statuses,
                # so a bare `if e.response` never matched the very errors it
                # was meant to detect. Compare against None explicitly.
                status_code = e.response.status_code if e.response is not None else "未知"
                print(f"✗ HTTP错误 {status_code}: {e}")
                # Client errors (4xx) are not transient; do not retry.
                if e.response is not None and 400 <= e.response.status_code < 500:
                    print(f"客户端错误,停止重试")
                    raise
            except requests.exceptions.RequestException as e:
                last_exception = e
                print(f"✗ 请求失败: {e}")

        # All attempts exhausted.
        print(f"✗ 已达到最大重试次数 ({max_retries}),请求失败")
        raise last_exception

    def _extract_clean_data(self, raw_result: Dict[str, Any]) -> Dict[str, Any]:
        """Extract a flat, cleaned note record from the raw API response.

        Args:
            raw_result: Raw API response (with `result` already decoded).

        Returns:
            Flattened note-detail dict, or ``{}`` when the response is
            unsuccessful or carries no data.
        """
        if not raw_result.get("success"):
            return {}

        result = raw_result.get("result", [])
        if not result or not isinstance(result, list) or len(result) == 0:
            return {}

        data = result[0].get("data", {})

        # Collect image URLs, deduplicating while preserving order.
        images = []
        seen = set()
        for img in data.get("images", []):
            url = None
            if isinstance(img, dict) and "cdn_url" in img:
                url = img["cdn_url"]
            elif isinstance(img, str):
                url = img
            if url and url not in seen:
                images.append(url)
                seen.add(url)

        # Convert the publish timestamp (milliseconds, presumably epoch-based
        # — TODO confirm upstream unit) into a human-readable local time.
        publish_timestamp = data.get("publish_timestamp")
        publish_time = None
        if publish_timestamp:
            try:
                dt = datetime.fromtimestamp(publish_timestamp / 1000)
                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except (TypeError, ValueError, OverflowError, OSError):
                # Narrowed from a bare `except:`; malformed timestamps just
                # yield a null publish_time.
                publish_time = None

        # Presence of a video field determines the content type.
        video = data.get("video") or None
        if video:
            content_type = "video"
        else:
            content_type = "normal"

        # Flat output structure; absent fields are normalized to None.
        clean_data = {
            "channel_content_id": data.get("channel_content_id") or None,
            "link": data.get("content_link") or None,
            "comment_count": data.get("comment_count"),
            "images": images if images else [],
            "like_count": data.get("like_count"),
            "body_text": data.get("body_text") or None,
            "title": data.get("title") or None,
            "collect_count": data.get("collect_count"),
            "channel_account_id": data.get("channel_account_id") or None,
            "channel_account_name": data.get("channel_account_name") or None,
            "content_type": content_type,
            "video": video,
            "publish_timestamp": publish_timestamp,
            "publish_time": publish_time
        }
        return clean_data

    def save_result(
        self,
        note_id: str,
        raw_result: Dict[str, Any]
    ) -> tuple[str, str]:
        """Persist the raw response and its cleaned form to separate folders.

        Directory layout::

            data/detail/xiaohongshu_detail/
            └── {note_id}/
                ├── raw/    # full API response (with metadata wrapper)
                │   └── {timestamp}.json
                └── clean/  # flattened record
                    └── {timestamp}.json

        Args:
            note_id: Note ID.
            raw_result: Raw API response (with `result` already decoded).

        Returns:
            ``(raw_filepath, clean_filepath)`` tuple.
        """
        safe_note_id = self._sanitize_note_id(note_id)

        # Ensure the per-note raw/clean directory tree exists.
        base_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", safe_note_id)
        raw_dir = os.path.join(base_dir, "raw")
        clean_dir = os.path.join(base_dir, "clean")
        os.makedirs(raw_dir, exist_ok=True)
        os.makedirs(clean_dir, exist_ok=True)

        # Timestamped filename pairs the raw file with its clean twin.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}.json"
        raw_filepath = os.path.join(raw_dir, filename)
        clean_filepath = os.path.join(clean_dir, filename)

        # Wrap the raw response with metadata for later cache loading.
        raw_data_with_meta = {
            "note_id": note_id,
            "timestamp": timestamp,
            "api_response": raw_result
        }
        with open(raw_filepath, 'w', encoding='utf-8') as f:
            json.dump(raw_data_with_meta, f, ensure_ascii=False, indent=2)

        # The clean file stores the flattened record directly (no wrapper).
        clean_data = self._extract_clean_data(raw_result)
        with open(clean_filepath, 'w', encoding='utf-8') as f:
            json.dump(clean_data, f, ensure_ascii=False, indent=2)

        return raw_filepath, clean_filepath
def get_xiaohongshu_detail(
    note_id: str,
    force: bool = False
) -> Dict[str, Any]:
    """Fetch a Xiaohongshu note's detail as a flat, cleaned dict.

    Args:
        note_id: Note ID.
        force: Bypass the cache and refetch from the API.

    Returns:
        Note detail in the clean (flattened) format.

    Examples:
        >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5")
        >>> print(detail['title'])
        >>> print(detail['body_text'])
        >>> detail = get_xiaohongshu_detail("6915588b00000000040143b5", force=True)
    """
    # Default-configured client with caching enabled.
    client = XiaohongshuDetail(use_cache=True)

    # get_detail handles retries, timeouts, and the cache lookup.
    raw_result, from_cache = client.get_detail(note_id=note_id, force=force)

    # Persist only freshly fetched data; cache hits already exist on disk.
    if not from_cache:
        client.save_result(note_id=note_id, raw_result=raw_result)

    # Derive the clean view directly from the raw payload instead of
    # re-reading the clean file just written — same content (save_result
    # writes exactly _extract_clean_data's output), minus one disk read.
    return client._extract_clean_data(raw_result)
def main():
    """CLI entry point: fetch one note's detail and save raw/clean copies."""
    parser = argparse.ArgumentParser(description='小红书笔记详情工具')
    parser.add_argument(
        '--results-dir',
        type=str,
        default=None,
        help='结果输出目录 (默认: 从配置读取)'
    )
    parser.add_argument(
        '--note-id',
        type=str,
        required=True,
        help='笔记ID (必填)'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='强制重新请求API,忽略缓存'
    )
    parser.add_argument(
        '--no-cache',
        action='store_true',
        help='禁用缓存功能'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=30,
        help='请求超时秒数 (默认: 30)'
    )
    parser.add_argument(
        '--max-retries',
        type=int,
        default=5,
        help='最大重试次数 (默认: 5)'
    )
    parser.add_argument(
        '--retry-delay',
        type=int,
        default=2,
        help='重试延迟秒数 (默认: 2)'
    )
    args = parser.parse_args()

    # Build the client from CLI options.
    use_cache = not args.no_cache
    client = XiaohongshuDetail(results_dir=args.results_dir, use_cache=use_cache)

    try:
        raw_result, from_cache = client.get_detail(
            args.note_id,
            timeout=args.timeout,
            max_retries=args.max_retries,
            retry_delay=args.retry_delay,
            force=args.force
        )
        # Only freshly fetched data needs to be written out.
        if not from_cache:
            raw_filepath, clean_filepath = client.save_result(args.note_id, raw_result)
            print(f"Raw data saved to: {raw_filepath}")
            print(f"Clean data saved to: {clean_filepath}")
        else:
            print(f"Used cached data, no new files saved")
    except Exception as e:
        # `sys` is already imported at module level; the previous
        # `__import__('sys')` indirection was unnecessary.
        print(f"Error: {e}", file=sys.stderr)
        # Signal failure to calling shells/scripts (previously exited 0
        # even on error).
        sys.exit(1)


if __name__ == "__main__":
    main()