#!/usr/bin/env python3
"""
Xiaohongshu note search tool.
Searches Xiaohongshu notes by keyword, with support for several filter options.
"""
import argparse
import hashlib
import json
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import requests


class XiaohongshuSearch:
    """Wrapper around the Xiaohongshu note search API."""

    BASE_URL = "http://47.84.182.56:8001"
    TOOL_NAME = "xhs_note_search"
    PLATFORM = "xiaohongshu"

    def __init__(self, results_dir: Optional[str] = None, use_cache: bool = True):
        """
        Initialize the API client.

        Args:
            results_dir: Output directory for results. Defaults to the
                data/search folder under the project root.
            use_cache: Whether to enable caching. Defaults to True.
        """
        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
        self.use_cache = use_cache
        # Resolve the output directory for results
        if results_dir:
            self.results_base_dir = results_dir
        else:
            # Default to the data/search folder under the project root
            script_dir = os.path.dirname(os.path.abspath(__file__))
            project_root = os.path.dirname(os.path.dirname(script_dir))
            self.results_base_dir = os.path.join(project_root, "data", "search")

    def _sanitize_keyword(self, keyword: str) -> str:
        """
        Sanitize a keyword so it can be used as a folder name.

        Args:
            keyword: Original keyword.

        Returns:
            Sanitized keyword.
        """
        # Replace characters that cannot appear in folder names
        # Windows: < > : " / \ | ? *
        # Unix: /
        # All of them are replaced with underscores
        sanitized = re.sub(r'[<>:"/\\|?*]', '_', keyword)
        # Strip leading/trailing whitespace
        sanitized = sanitized.strip()
        # Strip leading/trailing dots (not allowed on Windows)
        sanitized = sanitized.strip('.')
        # Fall back to a default name if nothing is left
        if not sanitized:
            sanitized = "unnamed"
        # Limit the length (file systems typically cap names at 255 characters)
        if len(sanitized) > 200:
            sanitized = sanitized[:200]
        return sanitized

    def _get_cache_key(
        self,
        keyword: str,
        content_type: str,
        sort_type: str,
        publish_time: str,
        cursor: str
    ) -> str:
        """
        Build a cache key (a hash of the search parameters).

        Args:
            keyword, content_type, sort_type, publish_time, cursor: Search parameters.

        Returns:
            Cache key (MD5 hash).
        """
        # Join all parameters into a single string
        params_str = f"{keyword}|{content_type}|{sort_type}|{publish_time}|{cursor}"
        # Hash it with MD5
        return hashlib.md5(params_str.encode('utf-8')).hexdigest()
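
    # Illustrative example (the keyword is reused from the docstring examples further
    # below): _get_cache_key("产品测试", "不限", "综合", "不限", "") hashes the string
    # "产品测试|不限|综合|不限|" and returns its MD5 hex digest.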

    def _get_latest_cache(
        self,
        keyword: str,
        cache_key: str,
        content_type: str,
        sort_type: str,
        publish_time: str
    ) -> Optional[Tuple[str, str]]:
        """
        Find the most recent cache files matching the search parameters.

        Args:
            keyword: Search keyword.
            cache_key: Cache key (unused, kept for interface compatibility).
            content_type: Content type.
            sort_type: Sort order.
            publish_time: Publish-time filter.

        Returns:
            (raw_filepath, clean_filepath), or None if there is no cache.
        """
        # Sanitize the keyword for use as a folder name
        safe_keyword = self._sanitize_keyword(keyword)
        base_dir = os.path.join(self.results_base_dir, "xiaohongshu_search", safe_keyword)
        raw_dir = os.path.join(base_dir, "raw")
        clean_dir = os.path.join(base_dir, "clean")
        # Bail out if the directories do not exist
        if not os.path.exists(raw_dir) or not os.path.exists(clean_dir):
            return None
        # List all files and keep only those matching the parameters
        try:
            # Build the parameter suffix used to match file names
            param_suffix = self._get_filename_suffix(content_type, sort_type, publish_time)
            raw_files = list(Path(raw_dir).glob("*.json"))
            clean_files = list(Path(clean_dir).glob("*.json"))
            if not raw_files or not clean_files:
                return None
            # Keep only files whose names contain the parameter suffix
            matching_raw_files = [
                f for f in raw_files
                if param_suffix in f.name
            ]
            matching_clean_files = [
                f for f in clean_files
                if param_suffix in f.name
            ]
            if not matching_raw_files or not matching_clean_files:
                return None
            # Sort by modification time, newest first
            matching_raw_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
            matching_clean_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
            # Return the paths of the newest matching files
            return (str(matching_raw_files[0]), str(matching_clean_files[0]))
        except Exception:
            return None

    def _load_cached_result(self, raw_filepath: str) -> Optional[Dict[str, Any]]:
        """
        Load cached raw data.

        Args:
            raw_filepath: Path to the raw data file.

        Returns:
            Raw data dict, or None.
        """
        try:
            with open(raw_filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Support both the old and the new file format
            if "api_response" in data:
                # New format: contains search_params and api_response
                return data["api_response"]
            else:
                # Old format: the file is the API response itself
                return data
        except Exception:
            return None

    def search(
        self,
        keyword: str,
        content_type: str = "不限",
        sort_type: str = "综合",
        publish_time: str = "不限",
        cursor: str = "",
        timeout: int = 30,
        max_retries: int = 5,
        retry_delay: int = 2,
        force: bool = False
    ) -> Tuple[Dict[str, Any], bool]:
        """
        Search Xiaohongshu notes, with automatic retries and caching.

        Args:
            keyword: Search keyword.
            content_type: Content type. One of 不限, 视频, 图文. Defaults to '不限'.
            sort_type: Sort order. One of 综合, 最新, 最多点赞, 最多评论. Defaults to '综合'.
            publish_time: Publish-time filter. One of 不限, 一天内, 一周内, 半年内. Defaults to '不限'.
            cursor: Pagination cursor. Empty for the first page; the cursor for the
                next page is included in the previous page's response.
            timeout: Request timeout in seconds. Defaults to 30.
            max_retries: Maximum number of retries. Defaults to 5.
            retry_delay: Retry delay in seconds. Defaults to 2, growing exponentially per retry.
            force: Force a fresh API request, ignoring the cache. Defaults to False.

        Returns:
            A (raw data, came-from-cache flag) tuple.

        Raises:
            requests.exceptions.RequestException: Raised after all retries have failed.
        """
        # Check the cache (if enabled and not forcing a refresh)
        if self.use_cache and not force:
            cache_key = self._get_cache_key(keyword, content_type, sort_type, publish_time, cursor)
            cached_files = self._get_latest_cache(keyword, cache_key, content_type, sort_type, publish_time)
            if cached_files:
                raw_filepath, clean_filepath = cached_files
                cached_result = self._load_cached_result(raw_filepath)
                if cached_result:
                    print(f"✓ Using cached data: {raw_filepath}")
                    return cached_result, True  # mark as cached
        payload = {
            "keyword": keyword,
            "content_type": content_type,
            "sort_type": sort_type,
            "publish_time": publish_time,
            "cursor": cursor
        }
        last_exception = None
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Exponential backoff: the delay doubles on each retry
                    wait_time = retry_delay * (2 ** (attempt - 1))
                    print(f"Waiting {wait_time}s before retry attempt {attempt + 1}...")
                    time.sleep(wait_time)
                print(f"Searching keyword: {keyword} (attempt {attempt + 1}/{max_retries})")
                response = requests.post(
                    self.api_url,
                    json=payload,
                    timeout=timeout,
                    headers={"Content-Type": "application/json"}
                )
                response.raise_for_status()
                raw_result = response.json()
                # If the result field is a string, parse it into a JSON object
                if 'result' in raw_result and isinstance(raw_result['result'], str):
                    try:
                        raw_result['result'] = json.loads(raw_result['result'])
                    except json.JSONDecodeError:
                        pass  # keep it as-is if parsing fails
                # raw_result is the raw data (result parsed, full structure preserved)
                print("✓ Search succeeded!")
                return raw_result, False  # mark as fresh data
            except requests.exceptions.Timeout as e:
                last_exception = e
                print(f"✗ Request timed out: {e}")
            except requests.exceptions.ConnectionError as e:
                last_exception = e
                print(f"✗ Connection error: {e}")
            except requests.exceptions.HTTPError as e:
                last_exception = e
                status_code = e.response.status_code if e.response is not None else "unknown"
                print(f"✗ HTTP error {status_code}: {e}")
                # Do not retry on client errors (4xx)
                if e.response is not None and 400 <= e.response.status_code < 500:
                    print("Client error, not retrying")
                    raise
            except requests.exceptions.RequestException as e:
                last_exception = e
                print(f"✗ Request failed: {e}")
        # All retries failed
        print(f"✗ Reached the maximum number of retries ({max_retries}); request failed")
        raise last_exception

    def _extract_clean_data(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract and clean the data into a flattened structure.

        Args:
            result: Processed result dict.

        Returns:
            Dict containing the note list and pagination info.
        """
        result_data = result.get("result", {})
        if not isinstance(result_data, dict):
            return {"has_more": False, "next_cursor": "", "notes": []}
        data = result_data.get("data", {})
        notes = data.get("data", [])
        clean_notes = []
        for note in notes:
            note_card = note.get("note_card", {})
            user = note_card.get("user", {})
            interact_info = note_card.get("interact_info", {})
            # Handle image_list: extract URLs from dict entries
            image_list_raw = note_card.get("image_list", [])
            images = []
            for img in image_list_raw:
                if isinstance(img, dict) and "image_url" in img:
                    images.append(img["image_url"])
                elif isinstance(img, str):
                    images.append(img)
            # Missing fields are uniformly represented as None/null
            note_id = note.get("id")
            clean_note = {
                "channel_content_id": note_id or None,
                "link": f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else None,
                "comment_count": interact_info.get("comment_count"),
                "images": images if images else [],
                "like_count": interact_info.get("liked_count"),
                "desc": note_card.get("desc") or None,  # summary (returned by the search API)
                "body_text": None,  # the full body requires the note detail API
                "title": note_card.get("display_title") or None,
                "collect_count": interact_info.get("collected_count"),
                "channel_account_id": user.get("user_id") or None,
                "channel_account_name": user.get("nick_name") or None,
                "content_type": note_card.get("type") or None,
                "video": None,  # search results carry no video field
                "shared_count": interact_info.get("shared_count")
            }
            clean_notes.append(clean_note)
        # Return clean data with pagination info
        return {
            "has_more": data.get("has_more", False),
            "next_cursor": data.get("next_cursor", ""),
            "notes": clean_notes
        }
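
    # Illustrative shape of a single cleaned note (all values below are hypothetical;
    # the count fields are passed through from interact_info unchanged):
    # {
    #     "channel_content_id": "abc123",
    #     "link": "https://www.xiaohongshu.com/explore/abc123",
    #     "title": "...", "desc": "...",
    #     "like_count": "100", "comment_count": "5", "collect_count": "20", "shared_count": "1",
    #     "images": ["https://example.com/1.jpg"],
    #     "channel_account_id": "u123", "channel_account_name": "...",
    #     "content_type": "normal", "body_text": None, "video": None
    # }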

    def _get_filename_suffix(
        self,
        content_type: str,
        sort_type: str,
        publish_time: str
    ) -> str:
        """
        Build the filename suffix from the search parameters.

        Args:
            content_type: Content type.
            sort_type: Sort order.
            publish_time: Publish-time filter.

        Returns:
            Filename suffix string.
        """
        # Use the raw parameter values directly, without any mapping; include all of them
        parts = [content_type, sort_type, publish_time]
        return "_" + "_".join(parts)
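
    # Illustrative example: with the defaults ('不限', '综合', '不限') this returns
    # "_不限_综合_不限", so save_result() below produces file names such as
    # "20240101_120000_page1_不限_综合_不限.json" (the timestamp shown is hypothetical).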

    def save_result(
        self,
        keyword: str,
        raw_result: Dict[str, Any],
        page: int = 1,
        content_type: str = "不限",
        sort_type: str = "综合",
        publish_time: str = "不限",
        cursor: str = ""
    ) -> Tuple[str, str]:
        """
        Save the raw data and the cleaned data into separate directories.

        Directory layout:
            data/search/xiaohongshu_search/
            ├── {keyword}/
            │   ├── raw/    # raw data (full API response, incl. pagination info)
            │   │   └── {timestamp}_page{page}_{params}.json
            │   └── clean/  # cleaned data (flattened note array)
            │       └── {timestamp}_page{page}_{params}.json

        Args:
            keyword: Search keyword.
            raw_result: Raw data (with the result field already parsed).
            page: Page number.
            content_type: Content type.
            sort_type: Sort order.
            publish_time: Publish-time filter.
            cursor: Pagination cursor.

        Returns:
            A (raw data path, cleaned data path) tuple.
        """
        # Sanitize the keyword for use as a folder name
        safe_keyword = self._sanitize_keyword(keyword)
        # Create the directory structure
        base_dir = os.path.join(self.results_base_dir, "xiaohongshu_search", safe_keyword)
        raw_dir = os.path.join(base_dir, "raw")
        clean_dir = os.path.join(base_dir, "clean")
        os.makedirs(raw_dir, exist_ok=True)
        os.makedirs(clean_dir, exist_ok=True)
        # Build the file name (including the parameter info)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        param_suffix = self._get_filename_suffix(content_type, sort_type, publish_time)
        filename = f"{timestamp}_page{page}{param_suffix}.json"
        raw_filepath = os.path.join(raw_dir, filename)
        clean_filepath = os.path.join(clean_dir, filename)
        # Attach the search parameters to the raw data
        raw_data_with_meta = {
            "search_params": {
                "keyword": keyword,
                "content_type": content_type,
                "sort_type": sort_type,
                "publish_time": publish_time,
                "cursor": cursor,
                "page": page,
                "timestamp": timestamp
            },
            "api_response": raw_result
        }
        # Save the raw result (including metadata)
        with open(raw_filepath, 'w', encoding='utf-8') as f:
            json.dump(raw_data_with_meta, f, ensure_ascii=False, indent=2)
        # Extract and save the cleaned data
        clean_data = self._extract_clean_data(raw_result)
        # Attach the search parameters to the clean data
        clean_data_with_meta = {
            "search_params": {
                "keyword": keyword,
                "content_type": content_type,
                "sort_type": sort_type,
                "publish_time": publish_time,
                "cursor": cursor,
                "page": page,
                "timestamp": timestamp
            },
            "has_more": clean_data["has_more"],
            "next_cursor": clean_data["next_cursor"],
            "notes": clean_data["notes"]
        }
        with open(clean_filepath, 'w', encoding='utf-8') as f:
            json.dump(clean_data_with_meta, f, ensure_ascii=False, indent=2)
        return raw_filepath, clean_filepath
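

# Minimal programmatic usage sketch (illustrative only; the keyword is reused from
# the docstring examples in search_xiaohongshu below):
#
#     client = XiaohongshuSearch(use_cache=True)
#     raw, from_cache = client.search("产品测试", sort_type="最新")
#     if not from_cache:
#         raw_path, clean_path = client.save_result("产品测试", raw, page=1, sort_type="最新")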


def main():
    """Example command-line usage."""
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Xiaohongshu note search tool')
    parser.add_argument(
        '--results-dir',
        type=str,
        default='data/search',
        help='Output directory for results (default: data/search)'
    )
    parser.add_argument(
        '--keyword',
        type=str,
        required=True,
        help='Search keyword (required)'
    )
    parser.add_argument(
        '--content-type',
        type=str,
        default='不限',
        choices=['不限', '视频', '图文'],
        help='Content type (default: 不限)'
    )
    parser.add_argument(
        '--sort-type',
        type=str,
        default='综合',
        choices=['综合', '最新', '最多点赞', '最多评论'],
        help='Sort order (default: 综合)'
    )
    parser.add_argument(
        '--publish-time',
        type=str,
        default='不限',
        choices=['不限', '一天内', '一周内', '半年内'],
        help='Publish-time filter (default: 不限)'
    )
    parser.add_argument(
        '--cursor',
        type=str,
        default='',
        help='Pagination cursor (defaults to empty, i.e. the first page)'
    )
    parser.add_argument(
        '--page',
        type=int,
        default=1,
        help='Page number label used in saved file names (default: 1)'
    )
    parser.add_argument(
        '--max-retries',
        type=int,
        default=5,
        help='Maximum number of retries (default: 5)'
    )
    parser.add_argument(
        '--retry-delay',
        type=int,
        default=2,
        help='Retry delay in seconds (default: 2)'
    )
    parser.add_argument(
        '--timeout',
        type=int,
        default=30,
        help='Request timeout in seconds (default: 30)'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='Force a fresh API request, ignoring the cache'
    )
    parser.add_argument(
        '--no-cache',
        action='store_true',
        help='Disable caching'
    )
    args = parser.parse_args()
    # Create the API client
    use_cache = not args.no_cache
    client = XiaohongshuSearch(results_dir=args.results_dir, use_cache=use_cache)
    # Run the search and save the results
    try:
        raw_result, from_cache = client.search(
            args.keyword,
            args.content_type,
            args.sort_type,
            args.publish_time,
            args.cursor,
            timeout=args.timeout,
            max_retries=args.max_retries,
            retry_delay=args.retry_delay,
            force=args.force
        )
        # Only save freshly fetched data
        if not from_cache:
            raw_filepath, clean_filepath = client.save_result(
                args.keyword,
                raw_result,
                args.page,
                args.content_type,
                args.sort_type,
                args.publish_time,
                args.cursor
            )
            print(f"Raw data saved to: {raw_filepath}")
            print(f"Clean data saved to: {clean_filepath}")
        else:
            print("Used cached data, no new files saved")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


def search_xiaohongshu(
    keyword: str,
    content_type: str = "不限",
    sort_type: str = "综合",
    publish_time: str = "不限",
    page: int = 1,
    force: bool = False
) -> Dict[str, Any]:
    """
    Search Xiaohongshu notes.

    Args:
        keyword: Search keyword.
        content_type: Content type. One of 不限, 视频, 图文.
        sort_type: Sort order. One of 综合, 最新, 最多点赞, 最多评论.
        publish_time: Publish time. One of 不限, 一天内, 一周内, 半年内.
        page: Page number (pagination is handled automatically).
        force: Force a refresh (ignore the cache).

    Returns:
        {
            "search_params": {...},
            "has_more": bool,
            "next_cursor": str,
            "notes": [...]
        }

    Examples:
        >>> # Basic usage
        >>> data = search_xiaohongshu("产品测试")
        >>> for note in data['notes']:
        ...     print(f"{note['title']} - {note['like_count']} likes")

        >>> # With parameters
        >>> data = search_xiaohongshu(
        ...     keyword="产品测试",
        ...     content_type="视频",
        ...     sort_type="最新"
        ... )

        >>> # Pagination (the cursor is handled automatically)
        >>> page1 = search_xiaohongshu("产品测试", page=1)
        >>> page2 = search_xiaohongshu("产品测试", page=2)
        >>> page3 = search_xiaohongshu("产品测试", page=3)
    """
    # Create a client with the default configuration
    client = XiaohongshuSearch(use_cache=True)
    # Resolve the pagination cursor automatically
    cursor = ""
    if page > 1:
        # Fetch the cursor from the previous page
        prev_page_result = search_xiaohongshu(
            keyword=keyword,
            content_type=content_type,
            sort_type=sort_type,
            publish_time=publish_time,
            page=page - 1,
            force=False  # let the previous page come from the cache
        )
        cursor = prev_page_result.get('next_cursor', '')
    # Search (retries, timeouts etc. are handled internally)
    raw_result, from_cache = client.search(
        keyword=keyword,
        content_type=content_type,
        sort_type=sort_type,
        publish_time=publish_time,
        cursor=cursor,
        force=force
    )
    # Only freshly fetched data needs to be saved
    if not from_cache:
        _, clean_filepath = client.save_result(
            keyword=keyword,
            raw_result=raw_result,
            page=page,
            content_type=content_type,
            sort_type=sort_type,
            publish_time=publish_time,
            cursor=cursor
        )
        # Read the saved clean file back and return it
        with open(clean_filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    else:
        # For cached data, extract the clean data directly and return it
        clean_data = client._extract_clean_data(raw_result)
        # Attach the search parameters to the clean data.
        # Note: the cached api_response does not carry search_params, so the
        # timestamp may be empty here.
        timestamp = raw_result.get("search_params", {}).get("timestamp", "")
        clean_data_with_meta = {
            "search_params": {
                "keyword": keyword,
                "content_type": content_type,
                "sort_type": sort_type,
                "publish_time": publish_time,
                "cursor": cursor,
                "page": page,
                "timestamp": timestamp
            },
            "has_more": clean_data["has_more"],
            "next_cursor": clean_data["next_cursor"],
            "notes": clean_data["notes"]
        }
        return clean_data_with_meta
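

# Example CLI invocation (illustrative; the script file name is a placeholder and the
# keyword mirrors the docstring examples above):
#   python xhs_note_search.py --keyword 产品测试 --sort-type 最新 --page 1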


if __name__ == "__main__":
    main()
|