소스 검색

add search

yangxiaohui 2 주 전
부모
커밋
55cc15758a

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+data
+*.ipynb
+__pycache__

BIN
script/.DS_Store


+ 139 - 0
script/README.md

@@ -0,0 +1,139 @@
+# 搜索脚本文档
+
+本目录包含各平台的搜索、推荐词和标签词获取脚本。
+
+## 目录
+
+- [获取工具列表](#获取工具列表-)
+- [搜索脚本](#搜索脚本)
+  - [通用搜索(Google、Baidu、Bing)](#1-通用搜索googlebaidubing-)
+  - [抖音内容搜索](#2-抖音内容搜索-)
+  - [小红书笔记搜索](#3-小红书笔记搜索-)
+  - [AI搜索](#4-ai搜索-)
+- [推荐词脚本](#推荐词脚本-)
+- [标签词脚本](#标签词脚本)
+  - [抖音标签词](#抖音标签词-)
+  - [小红书标签词](#小红书标签词-)
+
+---
+
+## 获取工具列表 ✅
+
+```bash
+python script/get_tools_list.py
+```
+
+**输出:**`data/tools_list/tools_list_{时间戳}.json`
+
+---
+
+## 搜索脚本
+
+### 1. 通用搜索(Google、Baidu、Bing) ✅
+
+```bash
+# 使用默认平台Google搜索
+python script/search/custom_search.py --keyword "python"
+
+# 指定其他搜索平台
+python script/search/custom_search.py --keyword "python" --platform "baidu"
+```
+
+**参数:**
+- `--keyword`: 搜索关键词(必填)
+- `--platform`: 搜索平台,可选值:google/baidu/bing(可选,默认google)
+- `--results-dir`: 结果保存目录(可选,默认 data/search)
+
+**输出:**`data/search/custom_search/{平台}/{关键词}/{时间戳}.json`
+
+### 2. 抖音内容搜索 ✅
+
+```bash
+python script/search/douyin_search.py --keyword "美食"
+```
+
+**参数:**
+- `--keyword`: 搜索关键词(必填)
+- `--results-dir`: 结果保存目录(可选,默认 data/search)
+
+**输出:**`data/search/douyin_search/{关键词}/{时间戳}.json`
+
+### 3. 小红书笔记搜索 ✅
+
+```bash
+python script/search/xiaohongshu_search.py --keyword "旅游"
+```
+
+**参数:**
+- `--keyword`: 搜索关键词(必填)
+- `--content-type`: 内容类型,可选值:不限/视频/图文(可选,默认"不限")
+- `--sort-type`: 排序方式,可选值:综合/最新/最多点赞/最多评论(可选,默认"综合")
+- `--publish-time`: 发布时间,可选值:不限/一天内/一周内/半年内(可选,默认"不限")
+- `--cursor`: 翻页游标(可选,默认为空)
+- `--page`: 页码标识(可选,默认1)
+- `--results-dir`: 结果保存目录(可选,默认 data/search)
+
+**输出:**`data/search/xiaohongshu_search/{关键词}/{时间戳}_page{页码}.json`
+
+### 4. AI搜索 ✅
+
+```bash
+python script/search/ai_search.py --query "什么是Python"
+```
+
+**参数:**
+- `--query`: 查询内容(必填)
+- `--results-dir`: 结果保存目录(可选,默认 data/search)
+
+**输出:**`data/search/ai_search/{查询内容前20字符}/{时间戳}.json`
+
+---
+
+## 推荐词脚本 ✅
+
+```bash
+# 抖音推荐词
+python script/search_recommendations/douyin_search_recommendations.py --keyword "美食"
+
+# B站推荐词
+python script/search_recommendations/bilibili_search_recommendations.py --keyword "游戏"
+
+# 小红书推荐词
+python script/search_recommendations/xiaohongshu_search_recommendations.py --keyword "长沙"
+```
+
+**参数:**
+- `--keyword`: 搜索关键词(必填)
+- `--results-dir`: 结果保存目录(可选,默认 data/search_recommendations)
+
+**输出:**`data/search_recommendations/{平台}/{关键词}/{时间戳}.json`
+
+---
+
+## 标签词脚本
+
+### 抖音标签词 ✅
+
+```bash
+python script/search_tagwords/douyin_search_tagword.py --keyword "旅游"
+```
+
+**参数:**
+- `--keyword`: 搜索关键词(必填)
+- `--results-dir`: 结果保存目录(可选,默认 data/search_tagwords)
+
+**输出:**`data/search_tagwords/douyin/{关键词}/tagword_{时间戳}.json`
+
+### 小红书标签词 ❌
+
+```bash
+python script/search_tagwords/xiaohongshu_search_hashtag.py --keyword "护肤"
+```
+
+**参数:**
+- `--keyword`: 搜索关键词(必填)
+- `--results-dir`: 结果保存目录(可选,默认 data/search_tagwords)
+
+**输出:**`data/search_tagwords/xiaohongshu/{关键词}/hashtag_{时间戳}.json`
+
+> ❌ **不可用:** 该接口当前返回500错误,服务端暂时不可用

+ 3 - 0
script/__init__.py

@@ -0,0 +1,3 @@
+"""
+脚本工具包
+"""

+ 115 - 0
script/get_tools_list.py

@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+"""
+获取可用工具列表接口
+从API获取所有可用的工具列表
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class ToolsListFetcher:
+    """工具列表获取API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    API_ENDPOINT = "/tools"
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/tools_list 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}{self.API_ENDPOINT}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/tools_list 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(script_dir)
+            self.results_base_dir = os.path.join(project_root, "data", "tools_list")
+
+    def get_tools_list(self, timeout: int = 30) -> Dict[str, Any]:
+        """
+        获取工具列表
+
+        Args:
+            timeout: 请求超时时间(秒),默认30秒
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        try:
+            response = requests.get(
+                self.api_url,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/tools_list_时间戳.json
+
+        Args:
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 创建目录结构
+        os.makedirs(self.results_base_dir, exist_ok=True)
+
+        # 文件名使用时间戳
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"tools_list_{timestamp}.json"
+        filepath = os.path.join(self.results_base_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='获取可用工具列表接口')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/tools_list',
+        help='结果输出目录 (默认: data/tools_list)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = ToolsListFetcher(results_dir=args.results_dir)
+
+    # 获取工具列表并保存
+    try:
+        result = client.get_tools_list()
+        filepath = client.save_result(result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 87 - 0
script/search/API.md

@@ -0,0 +1,87 @@
+# 小红书搜索 API 文档
+
+## 函数
+
+```python
+from script.search import search_xiaohongshu
+
+data = search_xiaohongshu(keyword, **options)
+```
+
+## 参数
+
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| **keyword** | str | 必填 | 搜索关键词 |
+| content_type | str | "不限" | 内容类型:不限、视频、图文 |
+| sort_type | str | "综合" | 排序:综合、最新、最多点赞、最多评论 |
+| publish_time | str | "不限" | 时间:不限、一天内、一周内、半年内 |
+| page | int | 1 | 页码(自动翻页) |
+| force | bool | False | 强制刷新 |
+
+## 返回值
+
+```python
+{
+  "search_params": {...},     # 搜索参数
+  "has_more": True,           # 是否有更多
+  "next_cursor": "...",       # 下一页游标
+  "notes": [...]              # 笔记列表
+}
+```
+
+### 笔记字段
+
+| 字段 | 说明 |
+|------|------|
+| channel_content_id | 笔记ID |
+| title | 标题 |
+| desc | 摘要 |
+| channel_account_name | 作者 |
+| like_count | 点赞数 |
+| comment_count | 评论数 |
+| collect_count | 收藏数 |
+| images | 图片列表 |
+| link | 链接 |
+
+## 使用示例
+
+### 基本搜索
+```python
+from script.search import search_xiaohongshu
+
+data = search_xiaohongshu("产品测试")
+
+for note in data['notes']:
+    print(f"{note['title']} - {note['like_count']} 赞")
+```
+
+### 带参数搜索
+```python
+data = search_xiaohongshu(
+    keyword="产品测试",
+    content_type="视频",
+    sort_type="最新",
+    publish_time="一周内"
+)
+```
+
+### 翻页(自动处理)
+```python
+# 直接指定页码即可
+page1 = search_xiaohongshu("产品测试", page=1)
+page2 = search_xiaohongshu("产品测试", page=2)
+page3 = search_xiaohongshu("产品测试", page=3)
+```
+
+### 强制刷新
+```python
+data = search_xiaohongshu("产品测试", force=True)
+```
+
+## 内部特性
+
+- ✅ 自动重试(最多3次)
+- ✅ 自动缓存(默认开启)
+- ✅ 自动保存(后台完成)
+- ✅ 超时保护(30秒)

+ 400 - 0
script/search/README.md

@@ -0,0 +1,400 @@
+# 小红书搜索模块
+
+## 快速开始
+
+### Python API(推荐)
+
+```python
+from script.search import search_xiaohongshu
+
+# 基本搜索
+data = search_xiaohongshu("产品测试")
+
+# 使用数据
+for note in data['notes']:
+    print(f"{note['title']} - {note['like_count']} 赞")
+```
+
+### 命令行工具
+
+```bash
+python script/search/xiaohongshu_search.py --keyword "产品测试"
+```
+
+---
+
+## API 文档
+
+### 函数签名
+
+```python
+data = search_xiaohongshu(
+    keyword: str,           # 必填:搜索关键词
+    content_type="不限",    # 可选:不限、视频、图文
+    sort_type="综合",       # 可选:综合、最新、最多点赞、最多评论
+    publish_time="不限",    # 可选:不限、一天内、一周内、半年内
+    page=1,                # 可选:页码(自动翻页)
+    force=False            # 可选:强制刷新
+)
+```
+
+### 返回值
+
+```python
+{
+  "search_params": {      # 搜索参数
+    "keyword": "产品测试",
+    "content_type": "视频",
+    "sort_type": "最新",
+    "publish_time": "一周内",
+    "cursor": "",
+    "page": 1,
+    "timestamp": "20251113_133258"
+  },
+  "has_more": True,       # 是否有更多
+  "next_cursor": "...",   # 下一页游标(内部使用)
+  "notes": [...]          # 笔记列表
+}
+```
+
+### 笔记字段
+
+| 字段 | 说明 |
+|------|------|
+| channel_content_id | 笔记ID |
+| link | 笔记链接 |
+| title | 标题 |
+| desc | 摘要(搜索接口返回) |
+| body_text | 完整正文(需详情接口) |
+| channel_account_name | 作者名称 |
+| channel_account_id | 作者ID |
+| like_count | 点赞数 |
+| comment_count | 评论数 |
+| collect_count | 收藏数 |
+| shared_count | 分享数 |
+| images | 图片URL列表 |
+| video | 视频链接(需详情接口) |
+| content_type | 内容类型 |
+
+---
+
+## 使用示例
+
+### 1. 基本搜索
+
+```python
+from script.search import search_xiaohongshu
+
+data = search_xiaohongshu("产品测试")
+
+print(f"找到 {len(data['notes'])} 条笔记")
+for note in data['notes']:
+    print(f"- {note['title']} ({note['like_count']} 赞)")
+```
+
+### 2. 带参数搜索
+
+```python
+data = search_xiaohongshu(
+    keyword="产品测试",
+    content_type="视频",
+    sort_type="最新",
+    publish_time="一周内"
+)
+```
+
+### 3. 翻页(自动处理)
+
+```python
+# 直接指定页码,自动处理 cursor
+page1 = search_xiaohongshu("产品测试", page=1)
+page2 = search_xiaohongshu("产品测试", page=2)
+page3 = search_xiaohongshu("产品测试", page=3)
+```
+
+### 4. 强制刷新
+
+```python
+# 忽略缓存,重新请求 API
+data = search_xiaohongshu("产品测试", force=True)
+```
+
+### 5. 批量搜索
+
+```python
+keywords = ["产品测试", "软件测试", "性能测试"]
+
+for keyword in keywords:
+    data = search_xiaohongshu(keyword)
+    print(f"{keyword}: {len(data['notes'])} 条笔记")
+```
+
+### 6. 数据分析
+
+```python
+from script.search import search_xiaohongshu
+
+def analyze_topic(keyword):
+    """分析话题热度"""
+    data = search_xiaohongshu(
+        keyword=keyword,
+        sort_type="最新",
+        publish_time="一周内"
+    )
+
+    notes = data['notes']
+    total_likes = sum(n['like_count'] for n in notes)
+    avg_likes = total_likes / len(notes) if notes else 0
+
+    print(f"关键词: {keyword}")
+    print(f"笔记数: {len(notes)}")
+    print(f"总点赞: {total_likes}")
+    print(f"平均点赞: {avg_likes:.1f}")
+
+analyze_topic("产品测试")
+```
+
+---
+
+## 命令行使用
+
+### 基本搜索
+
+```bash
+python script/search/xiaohongshu_search.py --keyword "产品测试"
+```
+
+### 带参数搜索
+
+```bash
+python script/search/xiaohongshu_search.py \
+  --keyword "产品测试" \
+  --content-type "视频" \
+  --sort-type "最新" \
+  --publish-time "一周内"
+```
+
+### 强制刷新
+
+```bash
+python script/search/xiaohongshu_search.py --keyword "产品测试" --force
+```
+
+### 禁用缓存
+
+```bash
+python script/search/xiaohongshu_search.py --keyword "产品测试" --no-cache
+```
+
+### 完整参数
+
+| 参数 | 默认值 | 说明 |
+|------|--------|------|
+| --keyword | 必填 | 搜索关键词 |
+| --content-type | "不限" | 内容类型:不限、视频、图文 |
+| --sort-type | "综合" | 排序:综合、最新、最多点赞、最多评论 |
+| --publish-time | "不限" | 时间:不限、一天内、一周内、半年内 |
+| --page | 1 | 页码 |
+| --cursor | "" | 翻页游标 |
+| --force | False | 强制刷新 |
+| --no-cache | False | 禁用缓存 |
+| --results-dir | data/search | 输出目录 |
+| --timeout | 30 | 超时时间(秒) |
+| --max-retries | 3 | 最大重试次数 |
+| --retry-delay | 2 | 重试延迟(秒) |
+
+---
+
+## 核心特性
+
+### 1. 自动缓存(默认开启)
+
+相同的搜索参数会自动使用缓存:
+
+```python
+# 第一次:请求 API
+data1 = search_xiaohongshu("产品测试")
+
+# 第二次:使用缓存
+data2 = search_xiaohongshu("产品测试")  # 瞬间返回
+
+# 强制刷新
+data3 = search_xiaohongshu("产品测试", force=True)
+```
+
+### 2. 自动重试(失败重试 3 次)
+
+- 超时错误:自动重试
+- 连接错误:自动重试
+- 5xx 服务器错误:自动重试
+- 4xx 客户端错误:不重试
+
+指数退避策略:2秒 → 4秒 → 8秒
+
+### 3. 自动保存(后台完成)
+
+搜索结果自动保存到 `data/search/xiaohongshu_search/`
+
+目录结构:
+```
+data/search/xiaohongshu_search/
+└── {关键词}/
+    ├── raw/                           # 原始数据
+    │   └── {时间戳}_page{页码}_{参数}.json
+    └── clean/                         # 清洗数据
+        └── {时间戳}_page{页码}_{参数}.json
+```
+
+文件名示例:
+- 默认参数:`20251113_133315_page1_不限_综合_不限.json`
+- 自定义参数:`20251113_133258_page1_视频_最新_一周内.json`
+
+### 4. 自动翻页(内部处理 cursor)
+
+```python
+# 无需手动管理 cursor
+page1 = search_xiaohongshu("产品测试", page=1)
+page2 = search_xiaohongshu("产品测试", page=2)  # 自动获取 page1 的 cursor
+page3 = search_xiaohongshu("产品测试", page=3)  # 自动获取 page2 的 cursor
+```
+
+### 5. 关键词自动清理
+
+特殊字符会自动处理,避免文件名冲突:
+
+```python
+# 自动清理特殊字符
+search_xiaohongshu("测试/产品:问题?")
+# → 文件夹名:测试_产品_问题_
+```
+
+---
+
+## 数据格式
+
+### Clean 数据(推荐使用)
+
+```json
+{
+  "search_params": {
+    "keyword": "产品测试",
+    "content_type": "视频",
+    "sort_type": "最新",
+    "publish_time": "一周内",
+    "cursor": "",
+    "page": 1,
+    "timestamp": "20251113_133258"
+  },
+  "has_more": true,
+  "next_cursor": "2@2fl1kgnh0gdx2oarsbpxc@...",
+  "notes": [
+    {
+      "channel_content_id": "6915588b00000000040143b5",
+      "link": "https://www.xiaohongshu.com/explore/6915588b00000000040143b5",
+      "title": "笔记标题",
+      "desc": "笔记摘要...",
+      "body_text": "",
+      "channel_account_name": "作者名称",
+      "channel_account_id": "5b1e2c0811be10762dee6859",
+      "like_count": 2,
+      "comment_count": 0,
+      "collect_count": 1,
+      "shared_count": 0,
+      "images": ["https://..."],
+      "video": "",
+      "content_type": "video"
+    }
+  ]
+}
+```
+
+### Raw 数据
+
+完整的 API 响应,包含所有元数据和嵌套结构。
+
+---
+
+## 注意事项
+
+### 关于 desc 和 body_text
+
+- **desc**:搜索接口返回的摘要(已截断)
+- **body_text**:完整正文(空,需调用详情接口 `get_xhs_detail_by_note_id` 获取)
+
+### 关于 video
+
+- 搜索接口不返回视频链接
+- 需要调用详情接口获取
+
+### 频率限制
+
+- 建议每次搜索间隔 1-2 秒
+- 避免短时间内大量请求
+
+---
+
+## 常见问题
+
+### Q: 如何获取完整正文?
+
+A: 搜索接口只返回摘要,完整正文需要调用详情接口:
+
+```python
+# 1. 先搜索获取笔记列表
+data = search_xiaohongshu("产品测试")
+
+# 2. 对感兴趣的笔记调用详情接口
+note_id = data['notes'][0]['channel_content_id']
+# 调用 get_xhs_detail_by_note_id(note_id) 获取完整正文
+```
+
+### Q: 缓存如何清理?
+
+A:
+- 方式1:手动删除 `data/search/xiaohongshu_search/{关键词}/` 目录
+- 方式2:使用 `force=True` 参数强制刷新
+
+### Q: 如何判断是否使用了缓存?
+
+A: 看控制台输出:
+- 使用缓存:`✓ 使用缓存数据: ...`
+- 请求 API:`正在搜索关键词: ... (尝试 1/3)`
+
+### Q: 翻页时 cursor 在哪里?
+
+A: cursor 已自动处理,无需手动管理:
+
+```python
+# ✅ 推荐:直接指定页码
+page2 = search_xiaohongshu("产品测试", page=2)
+
+# ❌ 不需要:手动传 cursor
+# page2 = search_xiaohongshu("产品测试", cursor="...")
+```
+
+---
+
+## 技术细节
+
+### 内部默认配置
+
+- **超时时间**:30 秒
+- **最大重试**:3 次
+- **重试延迟**:2 秒(指数增长)
+- **缓存开关**:默认开启
+- **输出目录**:`data/search`
+
+### 缓存机制
+
+- 基于搜索参数生成缓存键(keyword + content_type + sort_type + publish_time + cursor)
+- 相同参数返回最新的缓存文件
+- 按文件修改时间排序
+
+### 自动翻页原理
+
+```python
+# page=2 时自动执行:
+# 1. 读取 page=1 的缓存
+# 2. 提取 next_cursor
+# 3. 使用 cursor 请求 page=2
+```

+ 10 - 0
script/search/__init__.py

@@ -0,0 +1,10 @@
+"""
+小红书搜索模块
+
+提供小红书笔记搜索功能,支持缓存、参数过滤等
+"""
+
+from .xiaohongshu_search import search_xiaohongshu
+
+__all__ = ['search_xiaohongshu']
+__version__ = '1.0.0'

+ 136 - 0
script/search/ai_search.py

@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+"""
+AI搜索工具
+调用AI搜索回答用户问题,返回多模态参考源、总结答案和追问问题
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class AISearch:
+    """AI搜索API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "ai_search"
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/search 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search")
+
+    def search(self, query: str, timeout: int = 60) -> Dict[str, Any]:
+        """
+        执行AI搜索
+
+        Args:
+            query: 搜索查询内容
+            timeout: 请求超时时间(秒),默认60秒(AI搜索可能需要更长时间)
+
+        Returns:
+            API响应的JSON数据
+            返回数据中:
+            - type=source: 参考源(网页、图片等)
+            - type=answer: 总结答案
+            - type=follow_up: 追问问题
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        payload = {
+            "query": query
+        }
+
+        try:
+            response = requests.post(
+                self.api_url,
+                json=payload,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, query: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/ai_search/query关键词/时间戳.json
+
+        Args:
+            query: 搜索查询内容
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 从query中提取简短的关键词作为文件夹名(取前20个字符)
+        query_short = query[:20].replace('/', '_').replace('\\', '_')
+
+        # 创建目录结构: results/ai_search/query/
+        result_dir = os.path.join(self.results_base_dir, "ai_search", query_short)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='AI搜索工具')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search',
+        help='结果输出目录 (默认: data/search)'
+    )
+    parser.add_argument(
+        '--query',
+        type=str,
+        required=True,
+        help='搜索查询内容 (必填)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = AISearch(results_dir=args.results_dir)
+
+    # 执行搜索并保存
+    try:
+        result = client.search(args.query)
+        filepath = client.save_result(args.query, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 139 - 0
script/search/custom_search.py

@@ -0,0 +1,139 @@
+#!/usr/bin/env python3
+"""
+通用搜索工具
+支持Google、Baidu、Bing等搜索引擎
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class CustomSearch:
+    """通用搜索API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "custom_search"
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/search 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search")
+
+    def search(self, keyword: str, platform: str = "google", timeout: int = 30) -> Dict[str, Any]:
+        """
+        执行搜索
+
+        Args:
+            keyword: 搜索关键词
+            platform: 搜索平台,可选值:google, baidu, bing,默认为google
+            timeout: 请求超时时间(秒),默认30秒
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        payload = {
+            "keyword": keyword,
+            "platform": platform
+        }
+
+        try:
+            response = requests.post(
+                self.api_url,
+                json=payload,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, keyword: str, platform: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/custom_search/平台/关键词/时间戳.json
+
+        Args:
+            keyword: 搜索关键词
+            platform: 搜索平台
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 创建目录结构: results/custom_search/平台/关键词/
+        result_dir = os.path.join(self.results_base_dir, "custom_search", platform, keyword)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='通用搜索工具')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search',
+        help='结果输出目录 (默认: data/search)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    parser.add_argument(
+        '--platform',
+        type=str,
+        default='google',
+        choices=['google', 'baidu', 'bing'],
+        help='搜索平台 (默认: google)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = CustomSearch(results_dir=args.results_dir)
+
+    # 执行搜索并保存
+    try:
+        result = client.search(args.keyword, args.platform)
+        filepath = client.save_result(args.keyword, args.platform, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 130 - 0
script/search/douyin_search.py

@@ -0,0 +1,130 @@
+#!/usr/bin/env python3
+"""
+抖音内容搜索工具
+根据关键词搜索抖音内容
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class DouyinSearch:
+    """抖音搜索API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "douyin_search_by_keyword"
+    PLATFORM = "douyin"
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/search 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search")
+
+    def search(self, keyword: str, timeout: int = 30) -> Dict[str, Any]:
+        """
+        搜索抖音内容
+
+        Args:
+            keyword: 搜索关键词
+            timeout: 请求超时时间(秒),默认30秒
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        payload = {
+            "keyword": keyword
+        }
+
+        try:
+            response = requests.post(
+                self.api_url,
+                json=payload,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, keyword: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/douyin_search/关键词/时间戳.json
+
+        Args:
+            keyword: 搜索关键词
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 创建目录结构: results/douyin_search/关键词/
+        result_dir = os.path.join(self.results_base_dir, "douyin_search", keyword)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='抖音内容搜索工具')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search',
+        help='结果输出目录 (默认: data/search)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = DouyinSearch(results_dir=args.results_dir)
+
+    # 执行搜索并保存
+    try:
+        result = client.search(args.keyword)
+        filepath = client.save_result(args.keyword, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 674 - 0
script/search/xiaohongshu_search.py

@@ -0,0 +1,674 @@
+#!/usr/bin/env python3
+"""
+小红书笔记搜索工具
+根据关键词搜索小红书笔记,支持多种筛选条件
+"""
+
+import requests
+import json
+import os
+import argparse
+import time
+import hashlib
+import re
+from datetime import datetime
+from typing import Dict, Any, Optional, Tuple
+from copy import deepcopy
+from pathlib import Path
+
+
+class XiaohongshuSearch:
+    """小红书笔记搜索API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "xhs_note_search"
+    PLATFORM = "xiaohongshu"
+
+    def __init__(self, results_dir: str = None, use_cache: bool = True):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/search 文件夹
+            use_cache: 是否启用缓存,默认为 True
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+        self.use_cache = use_cache
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search")
+
+    def _sanitize_keyword(self, keyword: str) -> str:
+        """
+        清理关键词,使其可以作为文件夹名称
+
+        Args:
+            keyword: 原始关键词
+
+        Returns:
+            清理后的关键词
+        """
+        # 替换不能用作文件夹名称的字符
+        # Windows: < > : " / \ | ? *
+        # Unix: /
+        # 替换为下划线
+        sanitized = re.sub(r'[<>:"/\\|?*]', '_', keyword)
+
+        # 移除首尾空格
+        sanitized = sanitized.strip()
+
+        # 移除首尾的点号(Windows不允许)
+        sanitized = sanitized.strip('.')
+
+        # 如果清理后为空,使用默认名称
+        if not sanitized:
+            sanitized = "unnamed"
+
+        # 限制长度(文件系统通常限制255字符)
+        if len(sanitized) > 200:
+            sanitized = sanitized[:200]
+
+        return sanitized
+
+    def _get_cache_key(
+        self,
+        keyword: str,
+        content_type: str,
+        sort_type: str,
+        publish_time: str,
+        cursor: str
+    ) -> str:
+        """
+        生成缓存键(基于搜索参数的哈希)
+
+        Args:
+            搜索参数
+
+        Returns:
+            缓存键(MD5哈希值)
+        """
+        # 将所有参数组合成字符串
+        params_str = f"{keyword}|{content_type}|{sort_type}|{publish_time}|{cursor}"
+        # 生成 MD5 哈希
+        return hashlib.md5(params_str.encode('utf-8')).hexdigest()
+
+    def _get_latest_cache(
+        self,
+        keyword: str,
+        cache_key: str,
+        content_type: str,
+        sort_type: str,
+        publish_time: str
+    ) -> Optional[Tuple[str, str]]:
+        """
+        获取最新的缓存文件(匹配搜索参数)
+
+        Args:
+            keyword: 搜索关键词
+            cache_key: 缓存键(未使用,保留接口兼容)
+            content_type: 内容类型
+            sort_type: 排序方式
+            publish_time: 发布时间
+
+        Returns:
+            (raw_filepath, clean_filepath) 或 None(如果没有缓存)
+        """
+        # 清理关键词用于文件夹名称
+        safe_keyword = self._sanitize_keyword(keyword)
+        base_dir = os.path.join(self.results_base_dir, "xiaohongshu_search", safe_keyword)
+        raw_dir = os.path.join(base_dir, "raw")
+        clean_dir = os.path.join(base_dir, "clean")
+
+        # 检查目录是否存在
+        if not os.path.exists(raw_dir) or not os.path.exists(clean_dir):
+            return None
+
+        # 获取所有文件并筛选匹配参数的文件
+        try:
+            # 生成参数后缀用于匹配文件名
+            param_suffix = self._get_filename_suffix(content_type, sort_type, publish_time)
+
+            raw_files = list(Path(raw_dir).glob("*.json"))
+            clean_files = list(Path(clean_dir).glob("*.json"))
+
+            if not raw_files or not clean_files:
+                return None
+
+            # 筛选匹配参数的文件
+            matching_raw_files = [
+                f for f in raw_files
+                if param_suffix in f.name
+            ]
+            matching_clean_files = [
+                f for f in clean_files
+                if param_suffix in f.name
+            ]
+
+            if not matching_raw_files or not matching_clean_files:
+                return None
+
+            # 按修改时间排序,最新的在前
+            matching_raw_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
+            matching_clean_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
+
+            # 返回最新的匹配文件路径
+            return (str(matching_raw_files[0]), str(matching_clean_files[0]))
+
+        except Exception:
+            return None
+
+    def _load_cached_result(self, raw_filepath: str) -> Optional[Dict[str, Any]]:
+        """
+        加载缓存的原始数据
+
+        Args:
+            raw_filepath: 原始数据文件路径
+
+        Returns:
+            原始数据字典 或 None
+        """
+        try:
+            with open(raw_filepath, 'r', encoding='utf-8') as f:
+                data = json.load(f)
+                # 兼容旧格式和新格式
+                if "api_response" in data:
+                    # 新格式:包含 search_params 和 api_response
+                    return data["api_response"]
+                else:
+                    # 旧格式:直接是 API 响应
+                    return data
+        except Exception:
+            return None
+
+    def search(
+        self,
+        keyword: str,
+        content_type: str = "不限",
+        sort_type: str = "综合",
+        publish_time: str = "不限",
+        cursor: str = "",
+        timeout: int = 30,
+        max_retries: int = 3,
+        retry_delay: int = 2,
+        force: bool = False
+    ) -> Dict[str, Any]:
+        """
+        搜索小红书笔记,带自动重试机制和缓存
+
+        Args:
+            keyword: 搜索关键词
+            content_type: 内容类型,可选值:不限、视频、图文,默认为'不限'
+            sort_type: 排序方式,可选值:综合、最新、最多点赞、最多评论,默认为'综合'
+            publish_time: 发布时间筛选,可选值:不限、一天内、一周内、半年内,默认为'不限'
+            cursor: 翻页游标,第一页默认为空,下一页的游标在上一页的返回值中获取
+            timeout: 请求超时时间(秒),默认30秒
+            max_retries: 最大重试次数,默认3次
+            retry_delay: 重试延迟(秒),默认2秒,每次重试会指数增长
+            force: 强制重新请求API,忽略缓存,默认为 False
+
+        Returns:
+            原始数据(已解析 result 字段)
+
+        Raises:
+            requests.exceptions.RequestException: 所有重试失败后抛出异常
+        """
+        # 检查缓存(如果启用且未强制刷新)
+        if self.use_cache and not force:
+            cache_key = self._get_cache_key(keyword, content_type, sort_type, publish_time, cursor)
+            cached_files = self._get_latest_cache(keyword, cache_key, content_type, sort_type, publish_time)
+
+            if cached_files:
+                raw_filepath, clean_filepath = cached_files
+                cached_result = self._load_cached_result(raw_filepath)
+
+                if cached_result:
+                    print(f"✓ 使用缓存数据: {raw_filepath}")
+                    return cached_result
+
+        payload = {
+            "keyword": keyword,
+            "content_type": content_type,
+            "sort_type": sort_type,
+            "publish_time": publish_time,
+            "cursor": cursor
+        }
+
+        last_exception = None
+
+        for attempt in range(max_retries):
+            try:
+                if attempt > 0:
+                    # 指数退避策略:每次重试延迟时间翻倍
+                    wait_time = retry_delay * (2 ** (attempt - 1))
+                    print(f"等待 {wait_time} 秒后进行第 {attempt + 1} 次重试...")
+                    time.sleep(wait_time)
+
+                print(f"正在搜索关键词: {keyword} (尝试 {attempt + 1}/{max_retries})")
+
+                response = requests.post(
+                    self.api_url,
+                    json=payload,
+                    timeout=timeout,
+                    headers={"Content-Type": "application/json"}
+                )
+                response.raise_for_status()
+                raw_result = response.json()
+
+                # 如果 result 字段是字符串,需要解析成 JSON 对象
+                if 'result' in raw_result and isinstance(raw_result['result'], str):
+                    try:
+                        raw_result['result'] = json.loads(raw_result['result'])
+                    except json.JSONDecodeError:
+                        pass  # 如果解析失败,保持原样
+
+                # raw_result 就是 raw 数据(已解析 result,保留完整结构)
+                print(f"✓ 搜索成功!")
+                return raw_result
+
+            except requests.exceptions.Timeout as e:
+                last_exception = e
+                print(f"✗ 请求超时: {e}")
+
+            except requests.exceptions.ConnectionError as e:
+                last_exception = e
+                print(f"✗ 连接错误: {e}")
+
+            except requests.exceptions.HTTPError as e:
+                last_exception = e
+                status_code = e.response.status_code if e.response else "未知"
+                print(f"✗ HTTP错误 {status_code}: {e}")
+
+                # 如果是客户端错误(4xx),不重试
+                if e.response and 400 <= e.response.status_code < 500:
+                    print(f"客户端错误,停止重试")
+                    raise
+
+            except requests.exceptions.RequestException as e:
+                last_exception = e
+                print(f"✗ 请求失败: {e}")
+
+        # 所有重试都失败
+        print(f"✗ 已达到最大重试次数 ({max_retries}),请求失败")
+        raise last_exception
+
+    def _extract_clean_data(self, result: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        提取并清理数据,生成扁平化的结构
+
+        Args:
+            result: 已处理的结果字典
+
+        Returns:
+            包含笔记列表和分页信息的字典
+        """
+        result_data = result.get("result", {})
+        if not isinstance(result_data, dict):
+            return {"has_more": False, "next_cursor": "", "notes": []}
+
+        data = result_data.get("data", {})
+        notes = data.get("data", [])
+        clean_notes = []
+
+        for note in notes:
+            note_card = note.get("note_card", {})
+            user = note_card.get("user", {})
+            interact_info = note_card.get("interact_info", {})
+
+            # 处理 image_list:从字典格式提取 URL
+            image_list_raw = note_card.get("image_list", [])
+            images = []
+            for img in image_list_raw:
+                if isinstance(img, dict) and "image_url" in img:
+                    images.append(img["image_url"])
+                elif isinstance(img, str):
+                    images.append(img)
+
+            clean_note = {
+                "channel_content_id": note.get("id", ""),
+                "link": f"https://www.xiaohongshu.com/explore/{note.get('id', '')}",
+                "comment_count": interact_info.get("comment_count", 0),
+                "images": images,
+                "like_count": interact_info.get("liked_count", 0),
+                "desc": note_card.get("desc", ""),  # 摘要(搜索接口返回)
+                "body_text": "",  # 完整正文需要调用详情接口获取
+                "title": note_card.get("display_title", ""),
+                "collect_count": interact_info.get("collected_count", 0),
+                "channel_account_id": user.get("user_id", ""),
+                "channel_account_name": user.get("nick_name", ""),
+                "content_type": note_card.get("type", "note"),
+                "video": "",  # 搜索结果中没有视频字段
+                "shared_count": interact_info.get("shared_count", 0)
+            }
+
+            clean_notes.append(clean_note)
+
+        # Return clean data with pagination info
+        return {
+            "has_more": data.get("has_more", False),
+            "next_cursor": data.get("next_cursor", ""),
+            "notes": clean_notes
+        }
+
+    def _get_filename_suffix(
+        self,
+        content_type: str,
+        sort_type: str,
+        publish_time: str
+    ) -> str:
+        """
+        根据搜索参数生成文件名后缀
+
+        Args:
+            content_type: 内容类型
+            sort_type: 排序方式
+            publish_time: 发布时间
+
+        Returns:
+            文件名后缀字符串
+        """
+        # 直接使用原始参数值,不做映射,全部显示
+        parts = [content_type, sort_type, publish_time]
+        return "_" + "_".join(parts)
+
+    def save_result(
+        self,
+        keyword: str,
+        raw_result: Dict[str, Any],
+        page: int = 1,
+        content_type: str = "不限",
+        sort_type: str = "综合",
+        publish_time: str = "不限",
+        cursor: str = ""
+    ) -> tuple[str, str]:
+        """
+        保存原始数据和清理后数据到不同的目录
+
+        目录结构:
+        data/search/xiaohongshu_search/
+        ├── {keyword}/
+        │   ├── raw/                      # 原始数据(完整 API 响应,含分页信息)
+        │   │   └── {timestamp}_page{页码}_{参数}.json
+        │   └── clean/                    # 清理后数据(扁平化笔记数组)
+        │       └── {timestamp}_page{页码}_{参数}.json
+
+        Args:
+            keyword: 搜索关键词
+            raw_result: 原始数据(已解析 result 字段)
+            page: 页码
+            content_type: 内容类型
+            sort_type: 排序方式
+            publish_time: 发布时间
+            cursor: 翻页游标
+
+        Returns:
+            (原始数据路径, 清理后数据路径) 的元组
+        """
+        # 清理关键词用于文件夹名称
+        safe_keyword = self._sanitize_keyword(keyword)
+
+        # 创建目录结构
+        base_dir = os.path.join(self.results_base_dir, "xiaohongshu_search", safe_keyword)
+        raw_dir = os.path.join(base_dir, "raw")
+        clean_dir = os.path.join(base_dir, "clean")
+
+        os.makedirs(raw_dir, exist_ok=True)
+        os.makedirs(clean_dir, exist_ok=True)
+
+        # 生成文件名(包含参数信息)
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        param_suffix = self._get_filename_suffix(content_type, sort_type, publish_time)
+        filename = f"{timestamp}_page{page}{param_suffix}.json"
+
+        raw_filepath = os.path.join(raw_dir, filename)
+        clean_filepath = os.path.join(clean_dir, filename)
+
+        # 添加搜索参数到 raw 数据
+        raw_data_with_meta = {
+            "search_params": {
+                "keyword": keyword,
+                "content_type": content_type,
+                "sort_type": sort_type,
+                "publish_time": publish_time,
+                "cursor": cursor,
+                "page": page,
+                "timestamp": timestamp
+            },
+            "api_response": raw_result
+        }
+
+        # 保存原始结果(包含元数据)
+        with open(raw_filepath, 'w', encoding='utf-8') as f:
+            json.dump(raw_data_with_meta, f, ensure_ascii=False, indent=2)
+
+        # 提取并保存清理后的数据
+        clean_data = self._extract_clean_data(raw_result)
+
+        # 添加搜索参数到 clean 数据
+        clean_data_with_meta = {
+            "search_params": {
+                "keyword": keyword,
+                "content_type": content_type,
+                "sort_type": sort_type,
+                "publish_time": publish_time,
+                "cursor": cursor,
+                "page": page,
+                "timestamp": timestamp
+            },
+            "has_more": clean_data["has_more"],
+            "next_cursor": clean_data["next_cursor"],
+            "notes": clean_data["notes"]
+        }
+
+        with open(clean_filepath, 'w', encoding='utf-8') as f:
+            json.dump(clean_data_with_meta, f, ensure_ascii=False, indent=2)
+
+        return raw_filepath, clean_filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='小红书笔记搜索工具')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search',
+        help='结果输出目录 (默认: data/search)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    parser.add_argument(
+        '--content-type',
+        type=str,
+        default='不限',
+        choices=['不限', '视频', '图文'],
+        help='内容类型 (默认: 不限)'
+    )
+    parser.add_argument(
+        '--sort-type',
+        type=str,
+        default='综合',
+        choices=['综合', '最新', '最多点赞', '最多评论'],
+        help='排序方式 (默认: 综合)'
+    )
+    parser.add_argument(
+        '--publish-time',
+        type=str,
+        default='不限',
+        choices=['不限', '一天内', '一周内', '半年内'],
+        help='发布时间筛选 (默认: 不限)'
+    )
+    parser.add_argument(
+        '--cursor',
+        type=str,
+        default='',
+        help='翻页游标 (默认为空,即第一页)'
+    )
+    parser.add_argument(
+        '--page',
+        type=int,
+        default=1,
+        help='页码标识,用于保存文件名 (默认: 1)'
+    )
+    parser.add_argument(
+        '--max-retries',
+        type=int,
+        default=3,
+        help='最大重试次数 (默认: 3)'
+    )
+    parser.add_argument(
+        '--retry-delay',
+        type=int,
+        default=2,
+        help='重试延迟秒数 (默认: 2)'
+    )
+    parser.add_argument(
+        '--timeout',
+        type=int,
+        default=30,
+        help='请求超时秒数 (默认: 30)'
+    )
+    parser.add_argument(
+        '--force',
+        action='store_true',
+        help='强制重新请求API,忽略缓存'
+    )
+    parser.add_argument(
+        '--no-cache',
+        action='store_true',
+        help='禁用缓存功能'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    use_cache = not args.no_cache
+    client = XiaohongshuSearch(results_dir=args.results_dir, use_cache=use_cache)
+
+    # 执行搜索并保存
+    try:
+        raw_result = client.search(
+            args.keyword,
+            args.content_type,
+            args.sort_type,
+            args.publish_time,
+            args.cursor,
+            timeout=args.timeout,
+            max_retries=args.max_retries,
+            retry_delay=args.retry_delay,
+            force=args.force
+        )
+        raw_filepath, clean_filepath = client.save_result(
+            args.keyword,
+            raw_result,
+            args.page,
+            args.content_type,
+            args.sort_type,
+            args.publish_time,
+            args.cursor
+        )
+        print(f"Raw data saved to: {raw_filepath}")
+        print(f"Clean data saved to: {clean_filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+def search_xiaohongshu(
+    keyword: str,
+    content_type: str = "不限",
+    sort_type: str = "综合",
+    publish_time: str = "不限",
+    page: int = 1,
+    force: bool = False
+) -> Dict[str, Any]:
+    """
+    小红书笔记搜索
+
+    Args:
+        keyword: 搜索关键词
+        content_type: 内容类型,可选:不限、视频、图文
+        sort_type: 排序方式,可选:综合、最新、最多点赞、最多评论
+        publish_time: 发布时间,可选:不限、一天内、一周内、半年内
+        page: 页码(自动翻页)
+        force: 强制刷新(忽略缓存)
+
+    Returns:
+        {
+            "search_params": {...},
+            "has_more": bool,
+            "next_cursor": str,
+            "notes": [...]
+        }
+
+    Examples:
+        >>> # 基本使用
+        >>> data = search_xiaohongshu("产品测试")
+        >>> for note in data['notes']:
+        ...     print(f"{note['title']} - {note['like_count']} 赞")
+
+        >>> # 带参数
+        >>> data = search_xiaohongshu(
+        ...     keyword="产品测试",
+        ...     content_type="视频",
+        ...     sort_type="最新"
+        ... )
+
+        >>> # 翻页(自动处理 cursor)
+        >>> page1 = search_xiaohongshu("产品测试", page=1)
+        >>> page2 = search_xiaohongshu("产品测试", page=2)
+        >>> page3 = search_xiaohongshu("产品测试", page=3)
+    """
+    # 创建客户端(使用默认配置)
+    client = XiaohongshuSearch(use_cache=True)
+
+    # 自动处理翻页游标
+    cursor = ""
+    if page > 1:
+        # 读取上一页的 cursor
+        prev_page_result = search_xiaohongshu(
+            keyword=keyword,
+            content_type=content_type,
+            sort_type=sort_type,
+            publish_time=publish_time,
+            page=page - 1,
+            force=False  # 上一页使用缓存
+        )
+        cursor = prev_page_result.get('next_cursor', '')
+
+    # 搜索(内部处理重试、超时等)
+    raw_result = client.search(
+        keyword=keyword,
+        content_type=content_type,
+        sort_type=sort_type,
+        publish_time=publish_time,
+        cursor=cursor,
+        force=force
+    )
+
+    # 自动保存
+    _, clean_filepath = client.save_result(
+        keyword=keyword,
+        raw_result=raw_result,
+        page=page,
+        content_type=content_type,
+        sort_type=sort_type,
+        publish_time=publish_time,
+        cursor=cursor
+    )
+
+    # 读取并返回数据
+    with open(clean_filepath, 'r', encoding='utf-8') as f:
+        return json.load(f)
+
+
+if __name__ == "__main__":
+    main()

+ 130 - 0
script/search_recommendations/bilibili_search_recommendations.py

@@ -0,0 +1,130 @@
+#!/usr/bin/env python3
+"""
+B站搜索推荐词接口
+获取B站平台搜索框中的推荐词
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class BilibiliSearchRecommendations:
+    """B站搜索推荐词API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "Bilibili_Search_Recommendations"
+    PLATFORM = "bilibili"  # 平台名称
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/search_recommendations 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search_recommendations 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search_recommendations")
+
+    def get_recommendations(self, keyword: str, timeout: int = 30) -> Dict[str, Any]:
+        """
+        获取B站搜索推荐词
+
+        Args:
+            keyword: 搜索关键词
+            timeout: 请求超时时间(秒),默认30秒
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        payload = {
+            "keyword": keyword
+        }
+
+        try:
+            response = requests.post(
+                self.api_url,
+                json=payload,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, keyword: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/平台/关键词/时间戳.json
+
+        Args:
+            keyword: 搜索关键词
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 创建目录结构: results/平台/关键词/
+        result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='B站搜索推荐词接口')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search_recommendations',
+        help='结果输出目录 (默认: data/search_recommendations)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = BilibiliSearchRecommendations(results_dir=args.results_dir)
+
+    # 获取推荐词并保存
+    try:
+        result = client.get_recommendations(args.keyword)
+        filepath = client.save_result(args.keyword, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 130 - 0
script/search_recommendations/douyin_search_recommendations.py

@@ -0,0 +1,130 @@
+#!/usr/bin/env python3
+"""
+抖音搜索推荐词接口
+获取抖音平台搜索框中的推荐词
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class DouyinSearchRecommendations:
+    """抖音搜索推荐词API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "Douyin_SearchRecommendations"
+    PLATFORM = "douyin"  # 平台名称
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为项目根目录下的 data/search_recommendations 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search_recommendations 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search_recommendations")
+
+    def get_recommendations(self, keyword: str, timeout: int = 30) -> Dict[str, Any]:
+        """
+        获取抖音搜索推荐词
+
+        Args:
+            keyword: 搜索关键词
+            timeout: 请求超时时间(秒),默认30秒
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        payload = {
+            "keyword": keyword
+        }
+
+        try:
+            response = requests.post(
+                self.api_url,
+                json=payload,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, keyword: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/平台/关键词/时间戳.json
+
+        Args:
+            keyword: 搜索关键词
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 创建目录结构: results/平台/关键词/
+        result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='抖音搜索推荐词接口')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search_recommendations',
+        help='结果输出目录 (默认: data/search_recommendations)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = DouyinSearchRecommendations(results_dir=args.results_dir)
+
+    # 获取推荐词并保存
+    try:
+        result = client.get_recommendations(args.keyword)
+        filepath = client.save_result(args.keyword, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 338 - 0
script/search_recommendations/xiaohongshu_search_recommendations.py

@@ -0,0 +1,338 @@
+#!/usr/bin/env python3
+"""
+小红书搜索推荐词接口
+获取小红书平台搜索框中的推荐词
+"""
+
+import requests
+import json
+import os
+import argparse
+import time
+import ast
+import hashlib
+from datetime import datetime
+from typing import Dict, Any, Optional
+
+
+class XiaohongshuSearchRecommendations:
+    """小红书搜索推荐词API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "Xiaohongshu_Search_Recommendations"
+    PLATFORM = "xiaohongshu"  # 平台名称
+
+    def __init__(self, results_dir: str = None, enable_cache: bool = True, cache_ttl: int = 86400):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为脚本所在目录下的 results 文件夹
+            enable_cache: 是否启用缓存(从已保存的文件中读取),默认为 True
+            cache_ttl: 缓存有效期(秒),默认为 86400 秒(24小时)
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search_recommendations 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search_recommendations")
+
+        # 缓存设置
+        self.enable_cache = enable_cache
+        self.cache_ttl = cache_ttl
+        self._memory_cache = {}  # 内存缓存: {keyword: (data, timestamp)}
+
+    def _get_from_cache(self, keyword: str) -> Optional[Dict[str, Any]]:
+        """
+        从缓存中获取数据(先查内存缓存,再查文件缓存)
+
+        Args:
+            keyword: 搜索关键词
+
+        Returns:
+            缓存的数据,如果没有有效缓存则返回 None
+        """
+        if not self.enable_cache:
+            return None
+
+        current_time = time.time()
+
+        # 1. 检查内存缓存
+        if keyword in self._memory_cache:
+            data, timestamp = self._memory_cache[keyword]
+            if current_time - timestamp < self.cache_ttl:
+                # print(f"从内存缓存中获取关键词 '{keyword}' 的数据")
+                return data
+            else:
+                # 内存缓存已过期,删除
+                del self._memory_cache[keyword]
+
+        # 2. 检查文件缓存(从已保存的文件中读取最新的)
+        result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+        if os.path.exists(result_dir):
+            files = [f for f in os.listdir(result_dir) if f.endswith('.json')]
+            if files:
+                # 按文件名排序(时间戳),获取最新的文件
+                files.sort(reverse=True)
+                latest_file = os.path.join(result_dir, files[0])
+
+                # 检查文件修改时间
+                file_mtime = os.path.getmtime(latest_file)
+                if current_time - file_mtime < self.cache_ttl:
+                    try:
+                        with open(latest_file, 'r', encoding='utf-8') as f:
+                            data = json.load(f)
+                        # 更新内存缓存
+                        self._memory_cache[keyword] = (data, file_mtime)
+                        # print(f"从文件缓存中获取关键词 '{keyword}' 的数据: {latest_file}")
+                        return data
+                    except Exception as e:
+                        print(f"读取缓存文件失败: {e}")
+
+        return None
+
+    def get_recommendations(self, keyword: str, timeout: int = 300, max_retries: int = 10, retry_delay: int = 2, use_cache: bool = True) -> Dict[str, Any]:
+        """
+        获取小红书搜索推荐词
+
+        Args:
+            keyword: 搜索关键词,例如:'长沙'、'美妆'等
+            timeout: 请求超时时间(秒),默认300秒
+            max_retries: 最大重试次数,默认10次
+            retry_delay: 重试间隔时间(秒),默认2秒
+            use_cache: 是否使用缓存,默认为 True
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        # 尝试从缓存获取
+        if use_cache:
+            cached_data = self._get_from_cache(keyword)
+            if cached_data is not None:
+                return cached_data
+
+        # 缓存未命中,发起API请求
+        # print(f"缓存未命中,发起API请求获取关键词 '{keyword}' 的数据")
+        payload = {"keyword": keyword}
+        last_error = None
+
+        for attempt in range(max_retries + 1):
+            try:
+                response = requests.post(
+                    self.api_url,
+                    json=payload,
+                    timeout=timeout,
+                    headers={"Content-Type": "application/json"}
+                )
+                response.raise_for_status()
+                res = response.json()
+                # 使用 ast.literal_eval 解析 Python 字典字符串(不是标准 JSON)
+                # print(res)
+                import json
+                result = json.loads(res['result'])
+                # result = ast.literal_eval(res['result'])
+
+                # 成功:code == 0
+                if result.get('code') == 0:
+                    data = result['data']['data']
+                    # 保存到内存缓存
+                    self._memory_cache[keyword] = (data, time.time())
+                    # 自动保存到文件缓存
+                    if self.enable_cache:
+                        self.save_result(keyword, data)
+                    return data
+
+                # 失败:code != 0
+                last_error = f"code={result.get('code')}"
+
+            except Exception as e:
+                from traceback import print_exc
+                print(f"发生异常: {e}")
+                print_exc()
+                last_error = str(e)
+
+            # 统一处理重试逻辑
+            if attempt < max_retries:
+                print(f"请求失败 ({last_error}), 第{attempt + 1}次重试,等待{retry_delay}秒...")
+                time.sleep(retry_delay)
+            else:
+                print(f"达到最大重试次数({max_retries}),最后错误: {last_error}")
+
+        return []
+
+    def clear_memory_cache(self, keyword: Optional[str] = None):
+        """
+        清除内存缓存
+
+        Args:
+            keyword: 要清除的关键词,如果为 None 则清除所有内存缓存
+        """
+        if keyword:
+            if keyword in self._memory_cache:
+                del self._memory_cache[keyword]
+                print(f"已清除关键词 '{keyword}' 的内存缓存")
+        else:
+            self._memory_cache.clear()
+            print("已清除所有内存缓存")
+
+    def clear_file_cache(self, keyword: Optional[str] = None, keep_latest: bool = True):
+        """
+        清除文件缓存
+
+        Args:
+            keyword: 要清除的关键词,如果为 None 则清除所有文件缓存
+            keep_latest: 是否保留最新的文件,默认为 True
+        """
+        if keyword:
+            result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+            if os.path.exists(result_dir):
+                files = [f for f in os.listdir(result_dir) if f.endswith('.json')]
+                if files:
+                    files.sort(reverse=True)
+                    # 保留最新的文件
+                    files_to_delete = files[1:] if keep_latest else files
+                    for f in files_to_delete:
+                        filepath = os.path.join(result_dir, f)
+                        os.remove(filepath)
+                        print(f"已删除缓存文件: {filepath}")
+        else:
+            platform_dir = os.path.join(self.results_base_dir, self.PLATFORM)
+            if os.path.exists(platform_dir):
+                for keyword_dir in os.listdir(platform_dir):
+                    keyword_path = os.path.join(platform_dir, keyword_dir)
+                    if os.path.isdir(keyword_path):
+                        files = [f for f in os.listdir(keyword_path) if f.endswith('.json')]
+                        if files:
+                            files.sort(reverse=True)
+                            files_to_delete = files[1:] if keep_latest else files
+                            for f in files_to_delete:
+                                filepath = os.path.join(keyword_path, f)
+                                os.remove(filepath)
+                                print(f"已删除缓存文件: {filepath}")
+
+    def get_cache_info(self, keyword: Optional[str] = None) -> Dict[str, Any]:
+        """
+        获取缓存信息
+
+        Args:
+            keyword: 要查询的关键词,如果为 None 则返回所有缓存信息
+
+        Returns:
+            缓存信息字典
+        """
+        info = {
+            "memory_cache": {},
+            "file_cache": {}
+        }
+
+        current_time = time.time()
+
+        # 内存缓存信息
+        if keyword:
+            if keyword in self._memory_cache:
+                data, timestamp = self._memory_cache[keyword]
+                info["memory_cache"][keyword] = {
+                    "count": len(data) if isinstance(data, list) else 1,
+                    "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"),
+                    "age_seconds": int(current_time - timestamp),
+                    "is_expired": current_time - timestamp >= self.cache_ttl
+                }
+        else:
+            for kw, (data, timestamp) in self._memory_cache.items():
+                info["memory_cache"][kw] = {
+                    "count": len(data) if isinstance(data, list) else 1,
+                    "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"),
+                    "age_seconds": int(current_time - timestamp),
+                    "is_expired": current_time - timestamp >= self.cache_ttl
+                }
+
+        # 文件缓存信息
+        platform_dir = os.path.join(self.results_base_dir, self.PLATFORM)
+        if os.path.exists(platform_dir):
+            keywords = [keyword] if keyword else os.listdir(platform_dir)
+            for kw in keywords:
+                keyword_path = os.path.join(platform_dir, kw)
+                if os.path.isdir(keyword_path):
+                    files = [f for f in os.listdir(keyword_path) if f.endswith('.json')]
+                    if files:
+                        files.sort(reverse=True)
+                        latest_file = os.path.join(keyword_path, files[0])
+                        file_mtime = os.path.getmtime(latest_file)
+                        info["file_cache"][kw] = {
+                            "file_count": len(files),
+                            "latest_file": files[0],
+                            "timestamp": datetime.fromtimestamp(file_mtime).strftime("%Y-%m-%d %H:%M:%S"),
+                            "age_seconds": int(current_time - file_mtime),
+                            "is_expired": current_time - file_mtime >= self.cache_ttl
+                        }
+
+        return info
+
+    def save_result(self, keyword: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/平台/关键词/时间戳.json
+
+        Args:
+            keyword: 搜索关键词
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 创建目录结构: results/平台/关键词/
+        result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='小红书搜索推荐词接口')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search_recommendations',
+        help='结果输出目录 (默认: data/search_recommendations)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = XiaohongshuSearchRecommendations(results_dir=args.results_dir)
+
+    # 获取推荐词并保存
+    try:
+        result = client.get_recommendations(args.keyword)
+        filepath = client.save_result(args.keyword, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 130 - 0
script/search_tagwords/douyin_search_tagword.py

@@ -0,0 +1,130 @@
+#!/usr/bin/env python3
+"""
+抖音搜索标签词接口
+获取抖音平台搜索结果中的标签词
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class DouyinSearchTagWord:
+    """抖音搜索标签词API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "Douyin_Search_TagWord"
+    PLATFORM = "douyin"  # 平台名称
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为脚本所在目录下的 results 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search_tagwords 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search_tagwords")
+
+    def get_tagwords(self, keyword: str, timeout: int = 30) -> Dict[str, Any]:
+        """
+        获取抖音搜索标签词
+
+        Args:
+            keyword: 搜索关键词
+            timeout: 请求超时时间(秒),默认30秒
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        payload = {
+            "keyword": keyword
+        }
+
+        try:
+            response = requests.post(
+                self.api_url,
+                json=payload,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, keyword: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/平台/关键词/tagword_时间戳.json
+
+        Args:
+            keyword: 搜索关键词
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 创建目录结构: results/平台/关键词/
+        result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳,添加tagword标识
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"tagword_{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='抖音搜索标签词接口')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search_tagwords',
+        help='结果输出目录 (默认: data/search_tagwords)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = DouyinSearchTagWord(results_dir=args.results_dir)
+
+    # 获取标签词并保存
+    try:
+        result = client.get_tagwords(args.keyword)
+        filepath = client.save_result(args.keyword, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()

+ 141 - 0
script/search_tagwords/xiaohongshu_search_hashtag.py

@@ -0,0 +1,141 @@
+#!/usr/bin/env python3
+"""
+小红书搜索标签词接口
+获取小红书平台搜索结果中的标签词
+注意:此接口的call_type为browser_auto_operate,可能需要更长的响应时间
+"""
+
+import requests
+import json
+import os
+import argparse
+from datetime import datetime
+from typing import Dict, Any
+
+
+class XiaohongshuSearchHashtag:
+    """小红书搜索标签词API封装类"""
+
+    BASE_URL = "http://47.84.182.56:8001"
+    TOOL_NAME = "xiaohongshu_search_hashtag"
+    PLATFORM = "xiaohongshu"  # 平台名称
+
+    def __init__(self, results_dir: str = None):
+        """
+        初始化API客户端
+
+        Args:
+            results_dir: 结果输出目录,默认为脚本所在目录下的 results 文件夹
+        """
+        self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
+
+        # 设置结果输出目录
+        if results_dir:
+            self.results_base_dir = results_dir
+        else:
+            # 默认使用项目根目录的 data/search_tagwords 文件夹
+            script_dir = os.path.dirname(os.path.abspath(__file__))
+            project_root = os.path.dirname(os.path.dirname(script_dir))
+            self.results_base_dir = os.path.join(project_root, "data", "search_tagwords")
+
+    def get_hashtags(self, prompt: str, timeout: int = 60) -> Dict[str, Any]:
+        """
+        获取小红书搜索标签词
+
+        注意:此接口使用browser_auto_operate方式调用,需要提供完整的prompt描述
+
+        Args:
+            prompt: 提示词prompt,包含完整的输入信息
+                    例如:'搜索关键词"护肤",获取相关标签词'
+            timeout: 请求超时时间(秒),默认60秒(浏览器自动化可能需要更长时间)
+
+        Returns:
+            API响应的JSON数据
+
+        Raises:
+            requests.exceptions.RequestException: 请求失败时抛出异常
+        """
+        payload = {
+            "prompt": prompt
+        }
+
+        try:
+            response = requests.post(
+                self.api_url,
+                json=payload,
+                timeout=timeout,
+                headers={"Content-Type": "application/json"}
+            )
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            print(f"请求失败: {e}")
+            raise
+
+    def save_result(self, prompt: str, result: Dict[str, Any]) -> str:
+        """
+        保存结果到文件
+        目录结构: results/平台/关键词/hashtag_时间戳.json
+
+        Args:
+            prompt: 提示词(用于提取关键词)
+            result: API返回的结果
+
+        Returns:
+            保存的文件路径
+        """
+        # 从prompt中提取关键词
+        keyword = prompt.replace('"', '').replace("'", '').replace('搜索关键词', '').replace(',获取搜索结果中的标签词', '').strip()
+        if not keyword:
+            keyword = "unknown"
+
+        # 创建目录结构: results/平台/关键词/
+        result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+        os.makedirs(result_dir, exist_ok=True)
+
+        # 文件名使用时间戳,添加hashtag标识
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"hashtag_{timestamp}.json"
+        filepath = os.path.join(result_dir, filename)
+
+        # 保存结果
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(result, f, ensure_ascii=False, indent=2)
+
+        return filepath
+
+
+def main():
+    """示例使用"""
+    # 解析命令行参数
+    parser = argparse.ArgumentParser(description='小红书搜索标签词接口')
+    parser.add_argument(
+        '--results-dir',
+        type=str,
+        default='data/search_tagwords',
+        help='结果输出目录 (默认: data/search_tagwords)'
+    )
+    parser.add_argument(
+        '--keyword',
+        type=str,
+        required=True,
+        help='搜索关键词 (必填)'
+    )
+    args = parser.parse_args()
+
+    # 创建API客户端实例
+    client = XiaohongshuSearchHashtag(results_dir=args.results_dir)
+
+    # 获取标签词并保存
+    try:
+        # 注意:此接口需要提供完整的prompt描述
+        prompt = f'搜索关键词"{args.keyword}",获取搜索结果中的标签词'
+        result = client.get_hashtags(prompt)
+        filepath = client.save_result(prompt, result)
+        print(f"Output: {filepath}")
+    except Exception as e:
+        print(f"Error: {e}", file=__import__('sys').stderr)
+
+
+if __name__ == "__main__":
+    main()