| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- #!/usr/bin/env python3
- """
- 小红书笔记详情获取工具
- 根据笔记ID获取小红书笔记的详细信息
- """
- import requests
- import json
- import os
- import argparse
- import time
- from datetime import datetime
- from typing import Dict, Any, List
class XiaohongshuDetail:
    """Client wrapper for the Xiaohongshu note-detail tool API.

    Fetches note details by note ID over HTTP (with retries) and writes the
    JSON responses to disk below ``results_base_dir``.
    """

    BASE_URL = "http://47.84.182.56:8001"
    TOOL_NAME = "get_xhs_detail_by_note_id"
    PLATFORM = "xiaohongshu"

    def __init__(self, results_dir: str = None):
        """Initialise the API client.

        Args:
            results_dir: Directory results are written to. When omitted (or
                empty), ``data/detail`` located two directory levels above
                the directory containing this file is used.
        """
        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"

        if results_dir:
            self.results_base_dir = results_dir
        else:
            # Default: <project root>/data/detail, where the project root is
            # two levels above the directory that holds this script.
            script_dir = os.path.dirname(os.path.abspath(__file__))
            project_root = os.path.dirname(os.path.dirname(script_dir))
            self.results_base_dir = os.path.join(project_root, "data", "detail")

    @staticmethod
    def _safe_path_component(name: str) -> str:
        """Return *name* made safe for use as a single path component.

        Path separators are replaced with ``_`` so the value can neither
        create nested directories nor climb out of the results directory.
        Names that are empty or consist only of dots (``.``, ``..``) become
        ``_``.
        """
        cleaned = str(name).replace("/", "_").replace("\\", "_")
        if not cleaned.strip("."):
            return "_"
        return cleaned

    def get_detail(
        self,
        note_id: str,
        timeout: int = 30,
        max_retries: int = 3,
        retry_delay: int = 2
    ) -> Dict[str, Any]:
        """Fetch the detail of a single note, retrying on request failures.

        Args:
            note_id: ID of the note to fetch.
            timeout: Per-request timeout in seconds.
            max_retries: Total number of attempts (first try included);
                must be at least 1.
            retry_delay: Seconds to wait between two attempts.

        Returns:
            The decoded JSON body of the API response.

        Raises:
            ValueError: If ``max_retries`` is smaller than 1.
            requests.exceptions.RequestException: The error of the last
                attempt, once every attempt has failed.
        """
        # Without this guard the loop below would never run and the caller
        # would get nothing useful back.
        if max_retries < 1:
            raise ValueError(f"max_retries must be at least 1, got {max_retries}")

        payload = {"note_id": note_id}

        for attempt in range(1, max_retries + 1):
            is_retry = attempt > 1
            try:
                if is_retry:
                    print(f" 重试第 {attempt - 1}/{max_retries - 1} 次: {note_id}")
                response = requests.post(
                    self.api_url,
                    json=payload,
                    timeout=timeout,
                    headers={"Content-Type": "application/json"},
                )
                response.raise_for_status()
                result = response.json()
                if is_retry:
                    print(" ✓ 重试成功")
                return result
            except requests.exceptions.RequestException as e:
                if attempt >= max_retries:
                    # Out of attempts: report and propagate the last error
                    # with its original traceback.
                    print(f" ✗ 请求失败 (已达最大重试次数 {max_retries}): {e}")
                    raise
                print(f" ✗ 请求失败 (第{attempt}次尝试): {e}")
                print(f" 等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)

    def get_details_batch(
        self,
        note_ids: List[str],
        timeout: int = 30,
        max_retries: int = 3,
        retry_delay: int = 2,
        batch_delay: int = 1
    ) -> List[Dict[str, Any]]:
        """Fetch the details of several notes one after another.

        A failure for one note does not abort the batch; it is recorded in
        that note's entry instead.

        Args:
            note_ids: IDs of the notes to fetch.
            timeout: Per-request timeout in seconds.
            max_retries: Total number of attempts per note.
            retry_delay: Seconds to wait between two attempts for one note.
            batch_delay: Seconds to wait between two consecutive notes.

        Returns:
            One dict per note, in input order. Each has ``note_id`` and
            ``success``; successful entries carry the response under
            ``data``, failed ones carry the error text under ``error``.
        """
        results = []
        total = len(note_ids)

        for idx, note_id in enumerate(note_ids, 1):
            print(f"正在获取笔记详情 ({idx}/{total}): {note_id}")
            try:
                result = self.get_detail(note_id, timeout, max_retries, retry_delay)
            except Exception as e:
                print(f" ✗ 获取失败: {e}")
                results.append({
                    "note_id": note_id,
                    "success": False,
                    "error": str(e),
                })
            else:
                results.append({
                    "note_id": note_id,
                    "success": True,
                    "data": result,
                })
                print(" ✓ 成功获取")

            # Throttle between notes; no need to wait after the last one.
            if idx < total:
                time.sleep(batch_delay)

        return results

    def save_result(self, note_id: str, result: Dict[str, Any]) -> str:
        """Write the detail of one note to a JSON file.

        Layout: ``<results_base_dir>/xiaohongshu_detail/<note_id>/<timestamp>.json``.
        The timestamp has one-second resolution, so two saves for the same
        note within the same second write to the same file.

        Args:
            note_id: ID of the note; sanitised before being used as the
                directory name.
            result: Data returned by the API.

        Returns:
            Path of the file that was written.
        """
        result_dir = os.path.join(
            self.results_base_dir,
            "xiaohongshu_detail",
            self._safe_path_component(note_id),
        )
        os.makedirs(result_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = os.path.join(result_dir, f"{timestamp}.json")

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        return filepath

    def save_batch_results(self, results: List[Dict[str, Any]], batch_name: str = None) -> str:
        """Write the results of a batch fetch to a single JSON file.

        Layout: ``<results_base_dir>/xiaohongshu_detail/batch/<timestamp>[_<batch_name>].json``.

        Args:
            results: List of per-note result entries to store.
            batch_name: Optional label appended to the file name; sanitised
                so that it cannot introduce extra path components.

        Returns:
            Path of the file that was written.
        """
        result_dir = os.path.join(self.results_base_dir, "xiaohongshu_detail", "batch")
        os.makedirs(result_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if batch_name:
            filename = f"{timestamp}_{self._safe_path_component(batch_name)}.json"
        else:
            filename = f"{timestamp}.json"
        filepath = os.path.join(result_dir, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        return filepath
def main(argv=None):
    """Command-line entry point: fetch one note or a batch and save the result.

    Exactly one of ``--note-id``, ``--note-ids`` or ``--note-ids-file`` must
    be given. A single note is saved to its own file; a batch is saved to one
    combined file. Any error is reported on stderr and then re-raised.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.
    """
    import sys

    parser = argparse.ArgumentParser(description='小红书笔记详情获取工具')
    parser.add_argument(
        '--results-dir',
        type=str,
        default='data/detail',
        help='结果输出目录 (默认: data/detail)'
    )

    # Exactly one source of note IDs is required.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--note-id',
        type=str,
        help='单个笔记ID'
    )
    group.add_argument(
        '--note-ids',
        type=str,
        help='多个笔记ID,用逗号分隔,例如: id1,id2,id3'
    )
    group.add_argument(
        '--note-ids-file',
        type=str,
        help='包含笔记ID的文件路径,每行一个ID'
    )
    parser.add_argument(
        '--batch-name',
        type=str,
        help='批量获取时的批次名称(可选)'
    )
    args = parser.parse_args(argv)

    client = XiaohongshuDetail(results_dir=args.results_dir)

    try:
        # Compare against None rather than relying on truthiness: an empty
        # string still selects its branch and is rejected explicitly instead
        # of silently doing nothing.
        if args.note_id is not None:
            note_id = args.note_id.strip()
            if not note_id:
                parser.error('--note-id 不能为空')
            result = client.get_detail(note_id)
            filepath = client.save_result(note_id, result)
            print(f"Output: {filepath}")
            return

        if args.note_ids is not None:
            # Comma-separated IDs given directly on the command line.
            candidates = args.note_ids.split(',')
        else:
            # One ID per line, read from a file.
            with open(args.note_ids_file, 'r', encoding='utf-8') as f:
                candidates = f.readlines()
        note_ids = [nid.strip() for nid in candidates if nid.strip()]
        if not note_ids:
            parser.error('未提供任何有效的笔记ID')

        results = client.get_details_batch(note_ids)
        filepath = client.save_batch_results(results, args.batch_name)
        succeeded = sum(1 for r in results if r['success'])
        print("\n批量获取完成")
        print(f"成功: {succeeded}/{len(results)}")
        print(f"Output: {filepath}")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        raise


if __name__ == "__main__":
    main()
|