
refactor: content search tools

Talegorithm, 1 month ago
Parent
Commit
0003ac96f1

+ 6 - 7
agent/core/runner.py

@@ -161,14 +161,13 @@ BUILTIN_TOOLS = [
     "evaluate",
     "get_current_context",
 
-    # Search tools
-    "search_posts",
-    "select_post",
-    "get_search_suggestions",
-    "x_search",
-    "youtube_search",
-    "youtube_detail",
+    # Content tool family
+    "content_platforms",
+    "content_search",
+    "content_detail",
+    "content_suggest",
     "import_content",
+    "extract_video_clip",
 
     # Knowledge management tools
     "ask_knowledge",

+ 8 - 2
agent/docs/tools.md

@@ -819,6 +819,12 @@ print(f"Success rate: {stats['success_rate']:.1%}")
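 | `toolhub_health` | Check the status of the ToolHub remote tool library service | In-house |
 | `toolhub_search` | Search for and discover ToolHub remote tools | In-house |
 | `toolhub_call` | Call a ToolHub remote tool (image parameters accept local file paths) | In-house |
+| `content_platforms` | List or look up content platforms and their search parameters (fuzzy matching supported) | In-house |
+| `content_search` | Cross-platform content search (a single entry point for 11 platforms) | In-house |
+| `content_detail` | View content details (fetched by index from the search cache) | In-house |
+| `content_suggest` | Search keyword completion suggestions | In-house |
+| `extract_video_clip` | Cut a clip from an already downloaded YouTube video | In-house |
+| `import_content` | Batch-import articles into the CMS | In-house |
 | `ask_knowledge` | Query the knowledge base (via KnowHub Librarian) | In-house |
 | `upload_knowledge` | Upload research results to the knowledge base | In-house |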
 
@@ -846,9 +852,9 @@ print(f"Success rate: {stats['success_rate']:.1%}")
 | 10-12 images | 4 columns | 320px |
 | 13-16 images | 4 columns | 300px |
 
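-**About labels/titles:** the `read_images` collage **does not show file names**, only index numbers, because local file names (such as `IMG_1234.jpg`) do not help the LLM understand the content, while the index-to-original-path mapping is provided in the returned text, so the LLM can refer to an image precisely as "image 3". By contrast, the `search_posts` / `youtube_search` collages **do** show a label (post/video title), because these are content metadata that carry real information. The difference shows up in the `build_image_grid(labels=...)` parameter: pass `None` to draw only the index numbers, or pass a list to draw a title under each cell.
+**About labels/titles:** the `read_images` collage **does not show file names**, only index numbers, because local file names (such as `IMG_1234.jpg`) do not help the LLM understand the content, while the index-to-original-path mapping is provided in the returned text, so the LLM can refer to an image precisely as "image 3". By contrast, the `content_search` collage **does** show a label (post/video title), because these are content metadata that carry real information. The difference shows up in the `build_image_grid(labels=...)` parameter: pass `None` to draw only the index numbers, or pass a list to draw a title under each cell.
 
-The grid and downsampling implementation lives in `agent/tools/utils/image.py`; tools such as `search_posts` and `youtube_search` reuse the same collage logic.
+The grid and downsampling implementation lives in `agent/tools/utils/image.py`; `content_search` and the other content tools reuse the same collage logic.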
 
 ### Agent tools
 

+ 0 - 115
agent/skill/skills/core.md

@@ -1,115 +0,0 @@
----
-name: core
-type: core
-description: Core system capabilities, automatically loaded into the System Prompt
----
-
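-## Planning and Execution
-
-Use the `goal` tool to manage the execution plan. The goal tree is your working memory: the system periodically injects the current plan to help you track progress and key conclusions.
-
-### Core Principles
-
-- **Clarify the goal before acting**: before you start, use `goal` to state what you are about to do
-- **Use it flexibly, without constraints**:
-  - You can plan globally first and then act: `goal(add="research options, implement the solution, test and verify")`
-  - You can take one step at a time and plan only the next goal
-  - You can abandon and adjust goals while working: `goal(abandon="approach is not feasible")`
-  - Planning itself can be a goal (e.g. "research and decide on the technical approach")
-- **Simple tasks need only one goal**: `goal(add="convert CSV to JSON")` is enough; there is no need to force a breakdown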
-
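-### Usage
-
-Create goals:
-
-```
-goal(add="research and decide on an approach, execute the approach, evaluate the results")
-```
-
-Focus on a goal and start working (use the IDs from the plan view, such as "1" or "2.1"):
-
-```
-goal(focus="1")
-```
-
-Complete a goal and record the **key conclusion** (not a description of the process):
-
-```
-goal(done="the best option is openpose: high accuracy and multi-person detection")
-```
-
-Complete a goal and switch to the next one:
-
-```
-goal(done="openpose approach confirmed feasible", focus="2")
-```
-
-Add sub-goals or sibling goals:
-
-```
-goal(add="design the interface, implement the code", under="2")
-goal(add="write documentation", after="2")
-```
-
-Abandon a goal that is not feasible:
-
-```
-goal(abandon="option A requires Redis, which the environment does not have")
-```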
-
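-### Usage Rules
-
-1. **Focus on a specific goal**: always keep the focus on the most specific sub-goal you are executing, not on its parent. After creating sub-goals, immediately `focus` on the first one to execute. When it is finished, use `done` + `focus` to switch to the next.
-2. **Only one goal is in progress at a time**: finish the current goal before switching
-3. **The summary records conclusions**: record key findings, not uninformative descriptions such as "research completed"
-4. **The plan can be adjusted**: append, skip, or abandon goals at any time based on how execution goes
-5. **Locate goals by ID**: the focus, after, and under parameters take a goal ID (such as "1" or "2.1")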
-
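-### Knowledge Reuse
-
-When **starting a new task**, **breaking down a complex goal**, or **hitting an execution obstacle**, proactively call `knowledge_search` to retrieve relevant past experience or known pitfalls.
-**Example:**
-`knowledge_search(query="how to handle browser clicks that have no effect", types=["strategy", "tool"])`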
-
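-## Information Research
-
-You can use the web search tool `search_posts` to skim information from GitHub, Xiaohongshu, WeChat Official Accounts, Zhihu, and other channels, then use the `select_post` tool to view the details.
-For web content that needs deeper interaction, use the browser tools.
-
-The research process may require multiple searches, for example starting a new search based on ideas or information found in earlier results, until you reach a satisfactory answer. You can use the `goal` tool to manage the search process, or record intermediate and final results in a document.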
-
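-## Browser Tool Usage Guide
-
-All browser tools are prefixed with `browser_`. The browser session persists, so there is no need to restart it each time.
-
-### Basic Workflow
-
-1. **Navigate**: use `browser_navigate_to_url` or `browser_search_web` to reach the target page
-2. **Wait for loading**: after navigation, call `browser_wait(seconds=2)` to let the content load
-3. **Get element indexes**: call `browser_get_visual_selector_map` to get the index map of interactive elements and a screenshot of the current page
-4. **Interact**: use tools such as `browser_click_element` and `browser_input_text` to operate the page
-5. **Extract content**: use `browser_extract_content`, `browser_read_long_content`, or `browser_get_page_html` to get the data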
-
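-### Key Principles
-
-- **Never simulate results**: do not output what you think the search results are; call the tools to get real results
-- **Get indexes first**: every `index` parameter must first be obtained through `browser_get_selector_map`
-- **High-level tools**: prefer tools such as `browser_extract_content` and `browser_read_long_content` to get data, rather than fetching indexes with `browser_get_selector_map` and parsing by hand
-- **Wait after actions**: call `browser_wait` after any action that may change the page (click, input, scroll)
-- **Login handling**:
-  - **Normal login**: when a page requires login, use `browser_load_cookies` to log in
-  - **First login**: when there is no cookie for the site, ask a human to help with the login:
-    1. Call `browser_get_live_url` to get the live-view link of the cloud browser
-    2. Navigate to the login page of the target site
-    3. Send the live URL to the relevant person via `feishu_send_message_to_contact` and ask them to complete the login in the browser
-    4. Use `feishu_get_contact_replies(contact_name="...", wait_time_seconds=300)` to wait for their reply confirming that the login is done
-    5. After the reply arrives, use `browser_export_cookies` to save the login state
-- **Use JS for complex operations**: when the standard tools are not enough, use `browser_evaluate` to run JavaScript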
-
-### Tool Categories
-
-**Navigation**: browser_get_live_url, browser_navigate_to_url, browser_search_web, browser_go_back, browser_wait
-**Interaction**: browser_click_element, browser_input_text, browser_send_keys, browser_upload_file
-**View**: browser_scroll_page, browser_find_text, browser_screenshot
-**Extraction**: browser_extract_content, browser_read_long_content, browser_get_page_html, browser_get_selector_map, browser_get_visual_selector_map
-**Advanced**: browser_evaluate, browser_load_cookies, browser_export_cookies, browser_wait_for_user_action, browser_download_direct_url

+ 5 - 5
agent/skill/skills/research.md

@@ -5,7 +5,7 @@ description: 知识调研 - 根据目标和任务自动执行搜索,返回结
 
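 ## Information Research
 
-You can use the web search tool `search_posts` to skim information from GitHub, Xiaohongshu, WeChat Official Accounts, Zhihu, and other channels, then use the `select_post` tool to view the details
+You can use the `content_search` tool to search information from platforms such as GitHub, Xiaohongshu, WeChat Official Accounts, Zhihu, YouTube, and X, and use `content_detail` to view the details. If you are unsure about a platform's parameters, call `content_platforms` first
 
 ## The research process may require multiple searches, for example starting a new search based on ideas or information found in earlier results, until you reach a satisfactory answer. You can use the `goal` tool to manage the search process, or record intermediate and final results in a document. (The browser tools are a particularly useful aid for searching.)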
 
@@ -26,10 +26,10 @@ goal(add="搜索工具, 搜索案例, 搜索方法论")
 
 **Step 2: Multi-dimensional search**
 
-- Search for tools: `search_posts(query="PDF table extraction tool")`
-- Search for cases: `search_posts(query="PDF table extraction usercase site:github.com")`
-- Search for definitions: `search_posts(query="PDF table extraction definition")`
-- Search for methods: `search_posts(query="PDF table extraction best practice")`
+- Search for tools: `content_search(platform="github", keyword="PDF table extraction tool")`
+- Search for cases: `content_search(platform="xhs", keyword="PDF表格提取")`
+- Search for definitions: `content_search(platform="zhihu", keyword="PDF table extraction")`
+- Search for methods: `content_search(platform="gzh", keyword="PDF table extraction best practice")`
 
 **Step 3: Structured recording**
 Whenever you find a valuable piece of information, save it immediately as structured knowledge:

+ 12 - 11
agent/tools/builtin/__init__.py

@@ -16,14 +16,16 @@ from agent.tools.builtin.file.grep import grep_content
 from agent.tools.builtin.bash import bash_command
 from agent.tools.builtin.skill import skill, list_skills
 from agent.tools.builtin.subagent import agent, evaluate
-from agent.tools.builtin.search import search_posts, get_search_suggestions
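-# The sandbox tools are deprecated (2026-04): they were meant for the "run a tool" scenario, but tools have since been moved into toolhub and are handled separately
+# The sandbox tools are deprecated (2026-04); search.py / crawler.py were refactored into the content/ tool family (2026-04)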
 from agent.tools.builtin.knowledge import(knowledge_search,knowledge_save,knowledge_list,knowledge_update,knowledge_batch_update,knowledge_slim)
 from agent.tools.builtin.librarian import ask_knowledge, upload_knowledge
 from agent.tools.builtin.context import get_current_context
 from agent.tools.builtin.toolhub import toolhub_health, toolhub_search, toolhub_call
 from agent.tools.builtin.resource import resource_list_tools, resource_get_tool
-from agent.tools.builtin.crawler import youtube_search, youtube_detail, x_search, import_content, extract_video_clip
+from agent.tools.builtin.content import (
+    content_platforms, content_search, content_detail, content_suggest,
+    extract_video_clip, import_content,
+)
 from agent.trace.goal_tool import goal
 # Import the browser tools to trigger registration
 import agent.tools.builtin.browser  # noqa: F401
@@ -55,9 +57,11 @@ __all__ = [
     "list_skills",
     "agent",
     "evaluate",
-    "search_posts",
-    "select_post",
-    "get_search_suggestions",
+    # Content tool family (refactored from search.py + crawler.py)
+    "content_platforms",
+    "content_search",
+    "content_detail",
+    "content_suggest",
     # Context tools
     "get_current_context",
     # ToolHub remote tool library
@@ -68,12 +72,9 @@ __all__ = [
     # Resource queries
     "resource_list_tools",
     "resource_get_tool",
-    # Crawler tools
-    "youtube_search",
-    "youtube_detail",
-    "x_search",
-    "import_content",
+    # Media / import
     "extract_video_clip",
+    "import_content",
     # Goal management
     "goal",
 ]

+ 29 - 0
agent/tools/builtin/content/__init__.py

@@ -0,0 +1,29 @@
+"""
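+Content tool family: unified cross-platform content search, detail, and suggestions, plus media processing and content import
+
+@tool entry points:
+  content_platforms  - list platforms and their parameters
+  content_search     - cross-platform search
+  content_detail     - view details
+  content_suggest    - search suggestions
+  extract_video_clip - cut a clip from a YouTube video
+  import_content     - batch-import content into the CMS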
+"""
+
+from agent.tools.builtin.content.tools import (
+    content_platforms,
+    content_search,
+    content_detail,
+    content_suggest,
+)
+from agent.tools.builtin.content.media import extract_video_clip
+from agent.tools.builtin.content.ingestion import import_content
+
+__all__ = [
+    "content_platforms",
+    "content_search",
+    "content_detail",
+    "content_suggest",
+    "extract_video_clip",
+    "import_content",
+]

+ 86 - 0
agent/tools/builtin/content/cache.py

@@ -0,0 +1,86 @@
+"""
+Content search cache (persisted to disk)
+
+Search results are isolated by trace_id, so repeated CLI calls within the same Agent session can reuse them.
+File format: /tmp/content_cache_{trace_id}.json
+"""
+
+import json
+import os
+import time
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+_CACHE_DIR = Path("/tmp")
+_CACHE_TTL = 3600  # expires after 1 hour
+
+
+def _cache_path(trace_id: str) -> Path:
+    safe_id = trace_id.replace("/", "_").replace("..", "_")
+    return _CACHE_DIR / f"content_cache_{safe_id}.json"
+
+
+def _load_raw(trace_id: str) -> dict:
+    p = _cache_path(trace_id)
+    if not p.exists():
+        return {}
+    try:
+        data = json.loads(p.read_text("utf-8"))
+        # Check expiry
+        if time.time() - data.get("_ts", 0) > _CACHE_TTL:
+            p.unlink(missing_ok=True)
+            return {}
+        return data
+    except Exception:
+        return {}
+
+
+def _save_raw(trace_id: str, data: dict) -> None:
+    data["_ts"] = time.time()
+    try:
+        _cache_path(trace_id).write_text(
+            json.dumps(data, ensure_ascii=False), encoding="utf-8"
+        )
+    except Exception:
+        pass
+
+
+def save_search_results(
+    trace_id: str,
+    platform: str,
+    keyword: str,
+    posts: List[Dict[str, Any]],
+) -> None:
+    """Save search results to the disk cache."""
+    data = _load_raw(trace_id)
+    # Keep only the most recent search per platform
+    data[f"search:{platform}"] = {
+        "keyword": keyword,
+        "posts": posts,
+    }
+    _save_raw(trace_id, data)
+
+
+def get_cached_post(
+    trace_id: str,
+    platform: str,
+    index: int,
+) -> Optional[Dict[str, Any]]:
+    """Fetch one full record from the cache by index (1-based)."""
+    data = _load_raw(trace_id)
+    entry = data.get(f"search:{platform}")
+    if not entry:
+        return None
+    posts = entry.get("posts", [])
+    if 1 <= index <= len(posts):
+        return posts[index - 1]
+    return None
+
+
+def get_cached_search_info(trace_id: str, platform: str) -> Optional[Dict[str, Any]]:
+    """Return cached search info (keyword + total count) for use in error messages."""
+    data = _load_raw(trace_id)
+    entry = data.get(f"search:{platform}")
+    if not entry:
+        return None
+    return {"keyword": entry.get("keyword"), "total": len(entry.get("posts", []))}
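Review note: the cache behaviour above (one JSON file per trace, a timestamp-based TTL, 1-based index lookup) can be exercised in isolation with a sketch like the one below. `TinyCache` and its `root` / `ttl` parameters are illustrative assumptions, not part of this commit; unlike the diff, the directory is passed in so the sketch can run against a temporary directory instead of `/tmp`.

```python
import json
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, List, Optional


class TinyCache:
    """Hypothetical stand-in for cache.py; names are illustrative only."""

    def __init__(self, root: Path, ttl: float = 3600.0) -> None:
        self.root = root
        self.ttl = ttl

    def _path(self, trace_id: str) -> Path:
        # Same sanitising idea as the diff: keep the id inside the cache directory.
        safe_id = trace_id.replace("/", "_").replace("..", "_")
        return self.root / f"content_cache_{safe_id}.json"

    def _load(self, trace_id: str) -> dict:
        path = self._path(trace_id)
        if not path.exists():
            return {}
        data = json.loads(path.read_text("utf-8"))
        if time.time() - data.get("_ts", 0) > self.ttl:
            path.unlink(missing_ok=True)  # expired: drop the whole file
            return {}
        return data

    def save(self, trace_id: str, platform: str, keyword: str,
             posts: List[Dict[str, Any]]) -> None:
        data = self._load(trace_id)
        # One entry per platform: a new search overwrites the previous one.
        data[f"search:{platform}"] = {"keyword": keyword, "posts": posts}
        data["_ts"] = time.time()
        self._path(trace_id).write_text(
            json.dumps(data, ensure_ascii=False), encoding="utf-8"
        )

    def get(self, trace_id: str, platform: str, index: int) -> Optional[Dict[str, Any]]:
        entry = self._load(trace_id).get(f"search:{platform}", {})
        posts = entry.get("posts", [])
        # Indexes are 1-based, matching the "index" field shown in search summaries.
        return posts[index - 1] if 1 <= index <= len(posts) else None
```

Because `_ts` is refreshed on every save and stored once per file, a search on one platform also extends the lifetime of cached results for every other platform in the same trace.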

+ 46 - 0
agent/tools/builtin/content/ingestion.py

@@ -0,0 +1,46 @@
+"""
+Content import tool
+
+Batch-imports article links into the AIGC CMS system.
+"""
+
+import json
+from typing import Any, Dict, List
+
+import httpx
+
+from agent.tools import tool, ToolResult
+
+AIGC_BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
+DEFAULT_TIMEOUT = 60.0
+
+
+@tool()
+async def import_content(plan_name: str, content_data: List[Dict[str, Any]]) -> ToolResult:
+    """
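+    Import long-form content into the CMS (generic links from WeChat Official Accounts, Xiaohongshu, Douyin, etc.).
+
+    Args:
+        plan_name: plan name
+        content_data: list of content items, each containing fields such as channel, content_link, and title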
+    """
+    try:
+        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+            response = await client.post(
+                f"{AIGC_BASE_URL}/weixin/auto_insert",
+                json={"plan_name": plan_name, "data": content_data},
+            )
+            response.raise_for_status()
+            data = response.json()
+
+        if data.get("code") == 0:
+            result_data = data.get("data", {})
+            return ToolResult(
+                title=f"内容导入: {plan_name}",
+                output=json.dumps(result_data, ensure_ascii=False, indent=2),
+                long_term_memory=f"Imported {len(content_data)} items to plan '{plan_name}'",
+            )
+        return ToolResult(title="导入失败", output="", error=f"导入失败: {data.get('msg', '未知错误')}")
+
+    except Exception as e:
+        return ToolResult(title="内容导入异常", output="", error=str(e))
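Review note: `import_content` forwards `content_data` to the backend without checking it. A minimal sketch of a pre-flight check follows, assuming the three fields named in the docstring (`channel`, `content_link`, `title`) are the required ones; the real schema is not shown in this diff, and `build_import_payload` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Taken from the docstring above; the backend may require more or fewer fields.
REQUIRED_KEYS = ("channel", "content_link", "title")


def build_import_payload(plan_name: str, content_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper: validate items, then shape the body the way the diff does."""
    if not plan_name.strip():
        raise ValueError("plan_name must not be empty")
    for position, item in enumerate(content_data, 1):
        missing = [key for key in REQUIRED_KEYS if not item.get(key)]
        if missing:
            raise ValueError(f"item {position} is missing: {', '.join(missing)}")
    # Same request body as the POST in import_content.
    return {"plan_name": plan_name, "data": content_data}
```

Failing early like this would give the agent a precise error message instead of a generic backend `msg`.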

+ 114 - 0
agent/tools/builtin/content/media.py

@@ -0,0 +1,114 @@
+"""
+Media processing tools
+
+- extract_video_clip: cut a clip from an already downloaded YouTube video
+- download_youtube_video / parse_srt_to_outline: helper functions called by the YouTube detail implementation
+"""
+
+import asyncio
+import json
+import subprocess
+import tempfile
+from pathlib import Path
+from typing import Dict, List, Optional
+
+from agent.tools import tool, ToolResult
+
+VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
+VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)
+
+
+# ── Helper functions (called by platforms/youtube.py) ──
+
+def download_youtube_video(video_id: str) -> Optional[str]:
+    """Download a YouTube video with yt-dlp and return the file path."""
+    try:
+        output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
+        if output_path.exists():
+            return str(output_path)
+
+        cmd = [
+            "yt-dlp",
+            "-f", "best[ext=mp4]",
+            "-o", str(output_path),
+            f"https://www.youtube.com/watch?v={video_id}",
+        ]
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
+        if result.returncode == 0 and output_path.exists():
+            return str(output_path)
+        return None
+    except Exception:
+        return None
+
+
+def parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
+    """Parse SRT subtitles into an outline with timestamps."""
+    if not srt_content:
+        return []
+    outline = []
+    blocks = srt_content.strip().split("\n\n")
+    for block in blocks:
+        lines = block.strip().split("\n")
+        if len(lines) >= 3:
+            timestamp_line = lines[1]
+            if "-->" in timestamp_line:
+                start_time = timestamp_line.split("-->")[0].strip()
+                text = " ".join(lines[2:])
+                outline.append({"timestamp": start_time, "text": text})
+    return outline
+
+
+# ── @tool ──
+
+@tool()
+async def extract_video_clip(
+    video_id: str,
+    start_time: str,
+    end_time: str,
+    output_name: Optional[str] = None,
+) -> ToolResult:
+    """
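+    Cut a clip covering the given time range from an already downloaded YouTube video.
+
+    The video must first be downloaded via content_detail(platform="youtube", index=..., extras={"download_video": true})
+    before this tool can be used.
+
+    Args:
+        video_id: YouTube video ID
+        start_time: start time, in HH:MM:SS or MM:SS format
+        end_time: end time, in HH:MM:SS or MM:SS format
+        output_name: output file name (optional; generated automatically if omitted)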
+    """
+    source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
+    if not source_video.exists():
+        return ToolResult(
+            title="视频截取失败",
+            output="",
+            error="源视频不存在,请先使用 content_detail(platform='youtube', ..., extras={'download_video': true}) 下载",
+        )
+
+    if not output_name:
+        output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4"
+    output_path = VIDEO_DOWNLOAD_DIR / output_name
+
+    cmd = ["ffmpeg", "-i", str(source_video), "-ss", start_time, "-to", end_time, "-c", "copy", "-y", str(output_path)]
+
+    try:
+        result = await asyncio.to_thread(subprocess.run, cmd, capture_output=True, text=True, timeout=60)
+    except subprocess.TimeoutExpired:
+        return ToolResult(title="视频截取超时", output="", error="ffmpeg 超时(60秒)")
+
+    if result.returncode == 0 and output_path.exists():
+        file_size = output_path.stat().st_size / (1024 * 1024)
+        return ToolResult(
+            title=f"视频片段: {start_time} - {end_time}",
+            output=json.dumps({
+                "video_id": video_id,
+                "clip_path": str(output_path),
+                "start_time": start_time,
+                "end_time": end_time,
+                "file_size_mb": round(file_size, 2),
+            }, ensure_ascii=False, indent=2),
+            long_term_memory=f"Extracted clip from {video_id}: {start_time}-{end_time}",
+        )
+    return ToolResult(title="视频截取失败", output="", error=f"ffmpeg 执行失败: {result.stderr}")
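Review note: `extract_video_clip` hands `start_time` and `end_time` to ffmpeg unchecked, so a malformed or reversed range only surfaces as an ffmpeg error. A minimal sketch of validating the documented `HH:MM:SS` / `MM:SS` formats first; `to_seconds` and `validate_clip_range` are hypothetical helpers, not part of this commit.

```python
def to_seconds(timestamp: str) -> int:
    """Convert 'HH:MM:SS' or 'MM:SS' to whole seconds (hypothetical helper)."""
    parts = timestamp.strip().split(":")
    if len(parts) not in (2, 3) or not all(p.isascii() and p.isdigit() for p in parts):
        raise ValueError(f"expected HH:MM:SS or MM:SS, got {timestamp!r}")
    values = [int(p) for p in parts]
    hours = values[0] if len(values) == 3 else 0
    minutes, seconds = values[-2], values[-1]
    if minutes >= 60 or seconds >= 60:
        raise ValueError(f"minutes and seconds must be below 60, got {timestamp!r}")
    return hours * 3600 + minutes * 60 + seconds


def validate_clip_range(start_time: str, end_time: str) -> int:
    """Return the clip length in seconds, rejecting empty or reversed ranges."""
    start, end = to_seconds(start_time), to_seconds(end_time)
    if end <= start:
        raise ValueError("end_time must be later than start_time")
    return end - start
```

This sketch deliberately rejects fractional timestamps such as the `00:00:01,000` values that `parse_srt_to_outline` emits; they would need to be truncated before being passed in.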

+ 1 - 0
agent/tools/builtin/content/platforms/__init__.py

@@ -0,0 +1 @@
+"""内容平台实现模块"""

+ 237 - 0
agent/tools/builtin/content/platforms/aigc_channel.py

@@ -0,0 +1,237 @@
+"""
+AIGC-Channel platform implementation (9 Chinese platforms)
+
+Backend: aigc-channel.aiddit.com
+Platforms: xhs / gzh / sph / github / toutiao / douyin / bili / zhihu / weibo
+"""
+
+import json
+from typing import Any, Dict, List, Optional
+
+import httpx
+
+from agent.tools.models import ToolResult
+from agent.tools.utils.image import build_image_grid, encode_base64, load_images
+from agent.tools.builtin.content.registry import (
+    PlatformDef, ParamSpec, register_platform,
+)
+
+BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
+DEFAULT_TIMEOUT = 60.0
+
+
+# ── Platform registration ──
+
+_XHS_SEARCH_PARAMS = {
+    "sort_type": ParamSpec(
+        values=["综合排序", "最新发布", "最多点赞"],
+        default="综合排序",
+    ),
+    "publish_time": ParamSpec(
+        values=["不限", "近1天", "近7天", "近30天"],
+        default="不限",
+    ),
+    "content_type": ParamSpec(
+        values=["不限", "图文", "视频", "文章"],
+        default="不限",
+    ),
+    "filter_note_range": ParamSpec(
+        values=["不限", "1分钟以内", "1-5分钟", "5分钟以上"],
+        default="不限",
+        note="仅视频内容生效",
+    ),
+}
+
+_COMMON_CONTENT_TYPE = {
+    "content_type": ParamSpec(
+        values=["视频", "图文"],
+        default="",
+        note="留空不限",
+    ),
+}
+
+# Definitions of the 9 Chinese platforms
+_AIGC_PLATFORMS = [
+    PlatformDef(id="xhs",     name="小红书",   aliases=["RED", "xiaohongshu"], search_params=_XHS_SEARCH_PARAMS, supports_suggest=True),
+    PlatformDef(id="gzh",     name="公众号",   aliases=["微信公众号", "wechat"], search_params=_COMMON_CONTENT_TYPE),
+    PlatformDef(id="sph",     name="视频号",   aliases=["微信视频号"], search_params=_COMMON_CONTENT_TYPE),
+    PlatformDef(id="github",  name="GitHub",   aliases=["gh"], search_params=_COMMON_CONTENT_TYPE),
+    PlatformDef(id="toutiao", name="头条",     aliases=["今日头条", "toutiao"], search_params=_COMMON_CONTENT_TYPE, supports_suggest=True),
+    PlatformDef(id="douyin",  name="抖音",     aliases=["TikTok"], search_params=_COMMON_CONTENT_TYPE, supports_suggest=True),
+    PlatformDef(id="bili",    name="B站",      aliases=["哔哩哔哩", "bilibili"], search_params=_COMMON_CONTENT_TYPE, supports_suggest=True),
+    PlatformDef(id="zhihu",   name="知乎",     aliases=[], search_params=_COMMON_CONTENT_TYPE, supports_suggest=True),
+    PlatformDef(id="weibo",   name="微博",     aliases=["sina"], search_params=_COMMON_CONTENT_TYPE),
+]
+
+# The suggest API additionally supports wx (WeChat Search), but it is not a search platform
+_SUGGEST_ONLY_CHANNELS = {"wx": "微信"}
+
+
+# ── Search implementation ──
+
+async def search(
+    platform_id: str,
+    keyword: str,
+    max_count: int = 20,
+    cursor: str = "",
+    extras: Optional[Dict[str, Any]] = None,
+) -> ToolResult:
+    """Unified AIGC-Channel search."""
+    extras = extras or {}
+
+    if platform_id == "xhs":
+        payload = {
+            "type": platform_id,
+            "keyword": keyword,
+            "cursor": cursor,
+            "content_type": extras.get("content_type", "不限"),
+            "sort_type": extras.get("sort_type", "综合排序"),
+            "publish_time": extras.get("publish_time", "不限"),
+            "filter_note_range": extras.get("filter_note_range", "不限"),
+        }
+    else:
+        payload = {
+            "type": platform_id,
+            "keyword": keyword,
+            "cursor": cursor or "0",
+            "max_count": max_count,
+            "content_type": extras.get("content_type", ""),
+        }
+
+    try:
+        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+            response = await client.post(
+                f"{BASE_URL}/data",
+                json=payload,
+                headers={"Content-Type": "application/json"},
+            )
+            response.raise_for_status()
+            data = response.json()
+    except httpx.HTTPStatusError as e:
+        return ToolResult(title="搜索失败", output="", error=f"HTTP {e.response.status_code}: {e.response.text}")
+    except Exception as e:
+        return ToolResult(title="搜索失败", output="", error=str(e))
+
+    posts = data.get("data", [])
+
+    # Build the overview summary
+    summary_list = []
+    for idx, post in enumerate(posts, 1):
+        body = post.get("body_text", "") or ""
+        title = post.get("title") or body[:20] or ""
+        summary_list.append({
+            "index": idx,
+            "title": title,
+            "body_text": body[:100] + ("..." if len(body) > 100 else ""),
+            "like_count": post.get("like_count"),
+            "comment_count": post.get("comment_count"),
+            "channel": post.get("channel"),
+            "link": post.get("link"),
+            "content_type": post.get("content_type"),
+        })
+
+    # Cover collage
+    images = []
+    try:
+        collage_b64 = await _build_collage(posts)
+        if collage_b64:
+            images.append({"type": "base64", "media_type": "image/png", "data": collage_b64})
+    except Exception:
+        pass
+
+    return ToolResult(
+        title=f"搜索: {keyword} ({platform_id})",
+        output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
+        long_term_memory=f"Searched '{keyword}' on {platform_id}, {len(posts)} results. Use content_detail to view full details.",
+        images=images,
+        metadata={"posts": posts},  # full data, passed to the caller for caching
+    )
+
+
+# ── Detail implementation (served from the cache, no extra HTTP needed) ──
+
+async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
+    """Return the full content of a single post."""
+    title = post.get("title") or (post.get("body_text") or "")[:30] or "无标题"
+
+    images = []
+    for img_url in post.get("images", []):
+        if img_url:
+            images.append({"type": "url", "url": img_url})
+
+    return ToolResult(
+        title=f"详情: {title}",
+        output=json.dumps(post, ensure_ascii=False, indent=2),
+        long_term_memory=f"Viewed detail: {title}",
+        images=images,
+    )
+
+
+# ── Suggestion implementation ──
+
+async def suggest(channel: str, keyword: str) -> ToolResult:
+    """Fetch search suggestions."""
+    try:
+        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+            response = await client.post(
+                f"{BASE_URL}/suggest",
+                json={"type": channel, "keyword": keyword},
+                headers={"Content-Type": "application/json"},
+            )
+            response.raise_for_status()
+            data = response.json()
+    except Exception as e:
+        return ToolResult(title="建议词获取失败", output="", error=str(e))
+
+    suggestion_count = sum(len(item.get("list", [])) for item in data.get("data", []))
+    return ToolResult(
+        title=f"建议词: {keyword} ({channel})",
+        output=json.dumps(data, ensure_ascii=False, indent=2),
+        long_term_memory=f"Got {suggestion_count} suggestions for '{keyword}' on {channel}",
+    )
+
+
+# ── Collage helper ──
+
+async def _build_collage(posts: List[Dict[str, Any]]) -> Optional[str]:
+    """Build a grid collage of cover images."""
+    urls, titles = [], []
+    for post in posts:
+        imgs = post.get("images", [])
+        if imgs and imgs[0]:
+            urls.append(imgs[0])
+            titles.append(post.get("title", "") or "")
+
+    if not urls:
+        return None
+
+    loaded = await load_images(urls)
+    valid_images, valid_labels = [], []
+    for (_, img), title in zip(loaded, titles):
+        if img is not None:
+            valid_images.append(img)
+            valid_labels.append(title)
+
+    if not valid_images:
+        return None
+
+    grid = build_image_grid(images=valid_images, labels=valid_labels)
+    b64, _ = encode_base64(grid, format="PNG")
+    return b64
+
+
+# ── Register all AIGC platforms ──
+
+def _register_all():
+    for p in _AIGC_PLATFORMS:
+        p.search_impl = search
+        p.detail_impl = detail
+        if p.supports_suggest:
+            p.suggest_impl = suggest
+            p.suggest_channels = [p.id]
+        register_platform(p)
+
+    # wx only supports suggest, not search
+    # suggest calls pass channel="wx", but it is not registered as a standalone platform
+
+_register_all()
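Review note: `registry.py` is not part of this excerpt, so the shape of `PlatformDef` and `register_platform` can only be inferred from the call sites above. The sketch below is one possible reading, reduced to the id / name / alias lookup. The field set, the `_LOOKUP` table, and `resolve_platform` are assumptions, and the matching here is exact and case-insensitive rather than the fuzzy matching the docs mention.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class PlatformDef:
    """Assumed minimal shape; the real class also carries search_params, *_impl, etc."""
    id: str
    name: str
    aliases: List[str] = field(default_factory=list)


_REGISTRY: Dict[str, PlatformDef] = {}
_LOOKUP: Dict[str, str] = {}  # casefolded id / name / alias -> platform id


def register_platform(platform: PlatformDef) -> None:
    _REGISTRY[platform.id] = platform
    for key in (platform.id, platform.name, *platform.aliases):
        _LOOKUP[key.casefold()] = platform.id


def resolve_platform(query: str) -> Optional[PlatformDef]:
    """Hypothetical lookup: accept an id, display name, or alias in any letter case."""
    platform_id = _LOOKUP.get(query.strip().casefold())
    return _REGISTRY.get(platform_id) if platform_id else None


# Two of the definitions from the diff, reduced to the assumed fields.
register_platform(PlatformDef(id="xhs", name="小红书", aliases=["RED", "xiaohongshu"]))
register_platform(PlatformDef(id="bili", name="B站", aliases=["哔哩哔哩", "bilibili"]))
```

Under this reading, an alias that repeats the platform id (as `toutiao` does in the list above) is harmless but redundant.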

+ 129 - 0
agent/tools/builtin/content/platforms/x.py

@@ -0,0 +1,129 @@
+"""
+X (Twitter) platform implementation
+
+Backend: crawler.aiddit.com/crawler/x
+"""
+
+import json
+from typing import Any, Dict, List, Optional
+
+import httpx
+
+from agent.tools.models import ToolResult
+from agent.tools.utils.image import build_image_grid, encode_base64, load_images
+from agent.tools.builtin.content.registry import PlatformDef, register_platform
+
+CRAWLER_URL = "http://crawler.aiddit.com/crawler/x/keyword"
+DEFAULT_TIMEOUT = 60.0
+
+
+async def search(
+    platform_id: str,
+    keyword: str,
+    max_count: int = 20,
+    cursor: str = "",
+    extras: Optional[Dict[str, Any]] = None,
+) -> ToolResult:
+    try:
+        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+            response = await client.post(CRAWLER_URL, json={"keyword": keyword})
+            response.raise_for_status()
+            data = response.json()
+
+        if data.get("code") != 0:
+            return ToolResult(title="X 搜索失败", output="", error=data.get("msg", "未知错误"))
+
+        result_data = data.get("data", {})
+        tweets = result_data.get("data", []) if isinstance(result_data, dict) else []
+
+        summary_list = []
+        for idx, tweet in enumerate(tweets[:max_count], 1):
+            text = tweet.get("body_text", "")
+            summary_list.append({
+                "index": idx,
+                "author": tweet.get("channel_account_name", ""),
+                "body_text": text[:100] + ("..." if len(text) > 100 else ""),
+                "like_count": tweet.get("like_count"),
+                "comment_count": tweet.get("comment_count"),
+            })
+
+        # Collage
+        images = []
+        collage_b64 = await _build_tweet_collage(tweets[:max_count])
+        if collage_b64:
+            images.append({"type": "base64", "media_type": "image/png", "data": collage_b64})
+
+        return ToolResult(
+            title=f"X: {keyword}",
+            output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
+            long_term_memory=f"Searched X for '{keyword}', {len(tweets)} results.",
+            images=images,
+            metadata={"posts": tweets[:max_count]},
+        )
+
+    except Exception as e:
+        return ToolResult(title="X 搜索异常", output="", error=str(e))
+
+
+async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
+    """X details come straight from the full data in the cached search results."""
+    author = post.get("channel_account_name", "")
+    text = post.get("body_text", "")[:30]
+
+    all_images = []
+    for img_item in post.get("image_url_list", []):
+        url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
+        if url:
+            all_images.append({"type": "url", "url": url})
+
+    return ToolResult(
+        title=f"X 详情: @{author}",
+        output=json.dumps(post, ensure_ascii=False, indent=2),
+        long_term_memory=f"Viewed X post by @{author}: {text}",
+        images=all_images,
+    )
+
+
+async def _build_tweet_collage(tweets: List[Dict[str, Any]]) -> Optional[str]:
+    urls, titles = [], []
+    for tweet in tweets:
+        thumb = None
+        for img_item in tweet.get("image_url_list", []):
+            url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
+            if url:
+                thumb = url
+                break
+        if not thumb:
+            thumb = tweet.get("cover_url")
+        if thumb:
+            urls.append(thumb)
+            titles.append(f"@{tweet.get('channel_account_name', '')}")
+
+    if not urls:
+        return None
+
+    loaded = await load_images(urls)
+    valid_images, valid_labels = [], []
+    for (_, img), title in zip(loaded, titles):
+        if img is not None:
+            valid_images.append(img)
+            valid_labels.append(title)
+
+    if not valid_images:
+        return None
+
+    grid = build_image_grid(images=valid_images, labels=valid_labels)
+    b64, _ = encode_base64(grid, format="PNG")
+    return b64
+
+
+# ── Registration ──
+
+_X = PlatformDef(
+    id="x",
+    name="X (Twitter)",
+    aliases=["twitter", "推特"],
+)
+_X.search_impl = search
+_X.detail_impl = detail
+register_platform(_X)
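Review note: x.py and youtube.py both unwrap the same crawler envelope, `{"code": 0, "data": {"data": [...]}}`, with near-identical inline checks. A minimal sketch of a shared helper follows; `unwrap_items` is a hypothetical name, and the envelope shape is inferred only from the two call sites in this diff.

```python
from typing import Any, Dict, List


def unwrap_items(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: pull the item list out of the crawler response envelope."""
    if response.get("code") != 0:
        # Mirrors the diff: a non-zero code is a failure carrying a "msg" field.
        raise RuntimeError(response.get("msg", "unknown error"))
    outer = response.get("data")
    if not isinstance(outer, dict):
        return []
    items = outer.get("data")
    return items if isinstance(items, list) else []
```

The platform `search` functions could then catch `RuntimeError` and turn it into their own `ToolResult` error, keeping the platform-specific titles where they are.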

+ 203 - 0
agent/tools/builtin/content/platforms/youtube.py

@@ -0,0 +1,203 @@
+"""
+YouTube platform implementation
+
+Backend: crawler.aiddit.com/crawler/youtube
+"""
+
+import json
+from typing import Any, Dict, List, Optional
+
+import httpx
+
+from agent.tools.models import ToolResult
+from agent.tools.utils.image import build_image_grid, encode_base64, load_images
+from agent.tools.builtin.content.registry import (
+    PlatformDef, ParamSpec, register_platform,
+)
+
+CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
+DEFAULT_TIMEOUT = 60.0
+
+
+# ── Search ──
+
+async def search(
+    platform_id: str,
+    keyword: str,
+    max_count: int = 20,
+    cursor: str = "",
+    extras: Optional[Dict[str, Any]] = None,
+) -> ToolResult:
+    try:
+        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+            response = await client.post(
+                f"{CRAWLER_BASE_URL}/youtube/keyword",
+                json={"keyword": keyword},
+            )
+            response.raise_for_status()
+            data = response.json()
+
+        if data.get("code") != 0:
+            return ToolResult(title="YouTube 搜索失败", output="", error=data.get("msg", "未知错误"))
+
+        result_data = data.get("data", {})
+        videos = result_data.get("data", []) if isinstance(result_data, dict) else []
+
+        # Overview
+        summary_list = []
+        for idx, video in enumerate(videos[:max_count], 1):
+            summary_list.append({
+                "index": idx,
+                "title": video.get("title", ""),
+                "author": video.get("author", ""),
+                "video_id": video.get("video_id", ""),
+            })
+
+        # Collage
+        images = []
+        collage_b64 = await _build_video_collage(videos[:max_count])
+        if collage_b64:
+            images.append({"type": "base64", "media_type": "image/png", "data": collage_b64})
+
+        return ToolResult(
+            title=f"YouTube: {keyword}",
+            output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
+            long_term_memory=f"Searched YouTube for '{keyword}', {len(videos)} results.",
+            images=images,
+            metadata={"posts": videos[:max_count]},
+        )
+
+    except Exception as e:
+        return ToolResult(title="YouTube 搜索异常", output="", error=str(e))
+
+
+# ── Detail ──
+
+async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
+    """
+    YouTube details: extra HTTP calls are needed to fetch captions, downloads, etc.
+    post comes from the search cache; extras supports include_captions / download_video.
+    """
+    extras = extras or {}
+    content_id = post.get("video_id", "")
+    include_captions = extras.get("include_captions", True)
+    download_video = extras.get("download_video", False)
+
+    try:
+        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+            resp = await client.post(
+                f"{CRAWLER_BASE_URL}/youtube/detail",
+                json={"content_id": content_id},
+            )
+            resp.raise_for_status()
+            detail_data = resp.json()
+
+        if detail_data.get("code") != 0:
+            return ToolResult(title="详情获取失败", output="", error=detail_data.get("msg", "未知错误"))
+
+        result_data = detail_data.get("data", {})
+        video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
+
+        # Captions
+        captions_text = None
+        if include_captions or download_video:
+            try:
+                async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+                    cap_resp = await client.post(
+                        f"{CRAWLER_BASE_URL}/youtube/captions",
+                        json={"content_id": content_id},
+                    )
+                    cap_resp.raise_for_status()
+                    cap_data = cap_resp.json()
+                    if cap_data.get("code") == 0:
+                        inner = cap_data.get("data", {})
+                        if isinstance(inner, dict):
+                            inner2 = inner.get("data", {})
+                            if isinstance(inner2, dict):
+                                captions_text = inner2.get("content")
+            except Exception:
+                # Captions are best-effort: on failure, leave captions_text as None.
+                pass
+
+        # Download
+        video_path = None
+        video_outline = None
+        if download_video:
+            import asyncio
+            from agent.tools.builtin.content.media import download_youtube_video, parse_srt_to_outline
+            video_path = await asyncio.to_thread(download_youtube_video, content_id)
+            if captions_text:
+                video_outline = parse_srt_to_outline(captions_text)
+
+        output_data = {
+            "video_id": content_id,
+            "title": video_info.get("title", ""),
+            "channel": video_info.get("channel_account_name", ""),
+            "description": video_info.get("body_text", ""),
+            "like_count": video_info.get("like_count"),
+            "comment_count": video_info.get("comment_count"),
+            "content_link": video_info.get("content_link", ""),
+            "captions": captions_text,
+        }
+        if download_video:
+            output_data["video_path"] = video_path
+            output_data["video_outline"] = video_outline
+
+        return ToolResult(
+            title=f"YouTube 详情: {video_info.get('title', content_id)}",
+            output=json.dumps(output_data, ensure_ascii=False, indent=2),
+            long_term_memory=f"YouTube detail for {content_id}" + (" with captions" if captions_text else ""),
+        )
+
+    except Exception as e:
+        return ToolResult(title="YouTube 详情异常", output="", error=str(e))
+
+
+# ── Collage ──
+
+async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]:
+    urls, titles = [], []
+    for video in videos:
+        thumb = None
+        if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
+            thumb = video["thumbnails"][0].get("url")
+        elif "thumbnail" in video:
+            thumb = video.get("thumbnail")
+        elif "cover_url" in video:
+            thumb = video.get("cover_url")
+
+        if thumb:
+            urls.append(thumb)
+            titles.append(video.get("title", ""))
+
+    if not urls:
+        return None
+
+    loaded = await load_images(urls)
+    valid_images, valid_labels = [], []
+    for (_, img), title in zip(loaded, titles):
+        if img is not None:
+            valid_images.append(img)
+            valid_labels.append(title)
+
+    if not valid_images:
+        return None
+
+    grid = build_image_grid(images=valid_images, labels=valid_labels)
+    b64, _ = encode_base64(grid, format="PNG")
+    return b64
+
+
+# ── Registration ──
+
+_YOUTUBE = PlatformDef(
+    id="youtube",
+    name="YouTube",
+    aliases=["yt", "油管"],
+    detail_extras={
+        "include_captions": ParamSpec(note="是否获取字幕,默认 True"),
+        "download_video": ParamSpec(note="是否下载视频到本地,默认 False"),
+    },
+)
+_YOUTUBE.search_impl = search
+_YOUTUBE.detail_impl = detail
+register_platform(_YOUTUBE)

+ 125 - 0
agent/tools/builtin/content/registry.py

@@ -0,0 +1,125 @@
+"""
+Content platform registry.
+
+Defines every supported content platform and its search-parameter schema.
+Used by content_platforms / content_search / content_detail for routing.
+"""
+
+from dataclasses import dataclass, field
+from typing import Any, Callable, Coroutine, Dict, List, Optional
+
+from agent.tools.models import ToolResult
+
+
+# ── Type definitions ──
+
+@dataclass
+class ParamSpec:
+    """Description of a platform-specific parameter."""
+    values: Optional[List[str]] = None   # enum values (None means free text)
+    default: Optional[str] = None
+    note: str = ""                       # extra notes
+
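+    def to_dict(self) -> dict:
+        d: dict = {}
+        if self.values is not None:
+            d["values"] = self.values
+        # Emit the default for free-text parameters too, not only for enums.
+        if self.values is not None or self.default is not None:
+            d["default"] = self.default
+        if self.note:
+            d["note"] = self.note
+        return d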
+
+
+# Signatures of the platform implementation functions
+SearchFunc = Callable[..., Coroutine[Any, Any, ToolResult]]
+DetailFunc = Callable[..., Coroutine[Any, Any, ToolResult]]
+SuggestFunc = Callable[..., Coroutine[Any, Any, ToolResult]]
+
+
+@dataclass
+class PlatformDef:
+    """Complete definition of one content platform."""
+    id: str                                         # unique identifier, e.g. "xhs"
+    name: str                                       # display name, e.g. "小红书"
+    aliases: List[str] = field(default_factory=list) # aliases for fuzzy matching, e.g. ["小红书", "RED"]
+    search_params: Dict[str, ParamSpec] = field(default_factory=dict)
+    detail_extras: Dict[str, ParamSpec] = field(default_factory=dict)
+    supports_suggest: bool = False
+    suggest_channels: Optional[List[str]] = None     # channel values for the suggest API (may differ from id)
+
+    # Platform implementation functions (set at runtime by the platforms/ modules)
+    search_impl: Optional[SearchFunc] = None
+    detail_impl: Optional[DetailFunc] = None
+    suggest_impl: Optional[SuggestFunc] = None
+
+    def summary(self) -> dict:
+        """Summary information (without parameter details)."""
+        d = {"id": self.id, "name": self.name}
+        if self.search_params:
+            d["has_search_params"] = True
+        if self.detail_extras:
+            d["has_detail_extras"] = True
+        if self.supports_suggest:
+            d["supports_suggest"] = True
+        return d
+
+    def detail(self) -> dict:
+        """Full parameter description."""
+        d = self.summary()
+        if self.search_params:
+            d["search_params"] = {k: v.to_dict() for k, v in self.search_params.items()}
+        if self.detail_extras:
+            d["detail_extras"] = {k: v.to_dict() for k, v in self.detail_extras.items()}
+        return d
+
+
+# ── Platform registry ──
+
+_PLATFORMS: Dict[str, PlatformDef] = {}
+
+
+def register_platform(p: PlatformDef) -> None:
+    _PLATFORMS[p.id] = p
+
+
+def get_platform(platform_id: str) -> Optional[PlatformDef]:
+    return _PLATFORMS.get(platform_id)
+
+
+def all_platforms() -> List[PlatformDef]:
+    return list(_PLATFORMS.values())
+
+
+def match_platforms(query: str) -> List[PlatformDef]:
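+    """
+    Fuzzy-match platforms in priority order: exact ID, then name/alias substring, then token overlap.
+    An empty (or whitespace-only) query returns all platforms.
+    """
+    q = (query or "").strip().lower()
+    if not q:
+        return all_platforms()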
+
+    # 1) Exact ID match
+    if q in _PLATFORMS:
+        return [_PLATFORMS[q]]
+
+    # 2) Substring match against name / aliases
+    alias_hits = [
+        p for p in _PLATFORMS.values()
+        if q in p.name.lower() or any(q in a.lower() for a in p.aliases)
+    ]
+    if alias_hits:
+        return alias_hits
+
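+    # 3) Token overlap: split the query on whitespace, "_" and "-", then count tokens that occur as substrings of the platform's id/name/aliases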
+    q_tokens = set(q.replace("_", " ").replace("-", " ").split())
+    scored = []
+    for p in _PLATFORMS.values():
+        pool = {p.id, p.name.lower()} | {a.lower() for a in p.aliases}
+        pool_text = " ".join(pool)
+        hits = sum(1 for t in q_tokens if t in pool_text)
+        if hits > 0:
+            scored.append((hits, p))
+    scored.sort(key=lambda x: -x[0])
+    return [p for _, p in scored]
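
Reviewer note: the three matching tiers in `match_platforms` can be exercised in isolation. The sketch below mirrors the logic above against a small in-memory table. The class `P`, the `PLATFORMS` table, and the aliases for `xhs` and `x` are illustrative assumptions, not values taken from the commit.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class P:
    id: str
    name: str
    aliases: List[str] = field(default_factory=list)


# Illustrative registry; only the youtube aliases come from the diff above.
PLATFORMS: Dict[str, P] = {
    p.id: p
    for p in [
        P("youtube", "YouTube", ["yt", "油管"]),
        P("xhs", "小红书", ["RED"]),
        P("x", "X", ["twitter"]),
    ]
}


def match(query: str) -> List[P]:
    q = query.strip().lower()
    if not q:
        return list(PLATFORMS.values())

    # Tier 1: exact ID.
    if q in PLATFORMS:
        return [PLATFORMS[q]]

    # Tier 2: query is a substring of the name or of any alias.
    hits = [
        p for p in PLATFORMS.values()
        if q in p.name.lower() or any(q in a.lower() for a in p.aliases)
    ]
    if hits:
        return hits

    # Tier 3: count query tokens that occur somewhere in id/name/aliases.
    tokens = set(q.replace("_", " ").replace("-", " ").split())
    scored = []
    for p in PLATFORMS.values():
        pool_text = " ".join({p.id, p.name.lower(), *(a.lower() for a in p.aliases)})
        n = sum(1 for t in tokens if t in pool_text)
        if n:
            scored.append((n, p))
    scored.sort(key=lambda pair: -pair[0])
    return [p for _, p in scored]


exact = [p.id for p in match("x")]          # tier 1 wins even though "xhs" contains "x"
alias = [p.id for p in match("油管")]        # tier 2 via alias
tokens = [p.id for p in match("yt-video")]  # tier 3: only the "yt" token hits
broad = [p.id for p in match("t")]          # short queries match broadly in tier 2
```

Because tiers 2 and 3 use substring tests, very short queries can return several platforms, so callers that need a single platform should prefer the exact ID.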

+ 266 - 0
agent/tools/builtin/content/tools.py

@@ -0,0 +1,266 @@
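+"""
+Content tool family: unified entry point.
+
+Four @tool functions are registered for the LLM:
+  - content_platforms: list/query platforms and their parameters
+  - content_search:    cross-platform search
+  - content_detail:    view details
+  - content_suggest:   search suggestions
+
+Each platform's implementation lives under platforms/ and self-registers into the registry, one module per platform.
+"""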
+
+import json
+import os
+import uuid
+from typing import Any, Dict, Optional
+
+from agent.tools import tool, ToolResult, ToolContext
+from agent.tools.builtin.content.registry import (
+    all_platforms, get_platform, match_platforms,
+)
+from agent.tools.builtin.content import cache as _cache
+
+# Import the platform modules to trigger self-registration (side-effect imports)
+import agent.tools.builtin.content.platforms.aigc_channel  # noqa: F401
+import agent.tools.builtin.content.platforms.youtube       # noqa: F401
+import agent.tools.builtin.content.platforms.x             # noqa: F401
+
+
+def _get_trace_id(context: Optional[ToolContext]) -> str:
+    """Get trace_id from context, falling back to the environment variable or a generated ID."""
+    if context and hasattr(context, "trace_id") and context.trace_id:
+        return context.trace_id
+    return os.getenv("TRACE_ID") or f"anon-{uuid.uuid4().hex[:8]}"
+
+
+# ── content_platforms ──
+
+@tool(hidden_params=["context"])
+async def content_platforms(
+    platform: str = "",
+    context: Optional[ToolContext] = None,
+) -> ToolResult:
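+    """
+    List the supported content platforms and their search parameters.
+
+    Without platform, returns a summary list of all platforms (ID, name, and capability flags).
+    With platform, fuzzy-matches it and returns the detailed parameters of the matching platforms (ID, Chinese name, and aliases are all accepted).
+
+    If you are unfamiliar with a platform's parameters, call this tool first, then build the arguments for content_search / content_detail.
+
+    Args:
+        platform: Optional platform name or keyword. Fuzzy matching is supported (e.g. "xhs", "小红书", "youtube").
+                  Leave empty to get the summary of all platforms.
+        context: Tool context (injected automatically).
+    """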
+    hits = match_platforms(platform)
+
+    if not hits:
+        all_ids = [p.id for p in all_platforms()]
+        return ToolResult(
+            title="未找到匹配平台",
+            output=f"没有匹配 '{platform}' 的平台。可用平台: {', '.join(all_ids)}",
+        )
+
+    if platform:
+        # Query given: return detailed parameters for the matching platforms
+        result = [p.detail() for p in hits]
+    else:
+        # No query: return the summary list
+        result = [p.summary() for p in hits]
+
+    return ToolResult(
+        title="内容平台" + (f" ({platform})" if platform else ""),
+        output=json.dumps(result, ensure_ascii=False, indent=2),
+    )
+
+
+# ── content_search ──
+
+@tool(hidden_params=["context"])
+async def content_search(
+    platform: str,
+    keyword: str,
+    max_count: int = 20,
+    cursor: str = "",
+    extras: Optional[Dict[str, Any]] = None,
+    context: Optional[ToolContext] = None,
+) -> ToolResult:
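+    """
+    Cross-platform content search. Returns an index-numbered cover collage plus an overview list.
+
+    The result is a summary (title + truncated body + engagement data); it does not include the full body or every image.
+    To see the complete record for an item, use content_detail.
+
+    Args:
+        platform: Platform ID, e.g. 'xhs', 'youtube', 'x'. See content_platforms for the full list.
+        keyword: Search keyword.
+        max_count: Maximum number of results to return; defaults to 20.
+        cursor: Pagination cursor. Leave empty for the first search; pass the previously returned value to fetch the next page.
+        extras: Platform-specific parameters (dict). Different platforms support different parameters,
+                e.g. xhs supports sort_type / publish_time / content_type / filter_note_range.
+                If unsure, call content_platforms(platform) first.
+        context: Tool context (injected automatically).
+    """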
+    pdef = get_platform(platform)
+    if not pdef:
+        # Try a fuzzy match
+        hits = match_platforms(platform)
+        if hits:
+            suggestions = ", ".join(f"{p.id}({p.name})" for p in hits[:3])
+            return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'。你是否想要: {suggestions}")
+        all_ids = [p.id for p in all_platforms()]
+        return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'。可用: {', '.join(all_ids)}")
+
+    if not pdef.search_impl:
+        return ToolResult(title="不支持搜索", output=f"平台 {pdef.name} 暂不支持搜索")
+
+    result = await pdef.search_impl(
+        platform_id=pdef.id,
+        keyword=keyword,
+        max_count=max_count,
+        cursor=cursor,
+        extras=extras,
+    )
+
+    # Persist the search results to the on-disk cache
+    if not result.error:
+        posts = result.metadata.pop("posts", [])
+        trace_id = _get_trace_id(context)
+        _cache.save_search_results(trace_id, pdef.id, keyword, posts)
+
+    return result
+
+
+# ── content_detail ──
+
+@tool(hidden_params=["context"])
+async def content_detail(
+    platform: str,
+    index: int,
+    extras: Optional[Dict[str, Any]] = None,
+    context: Optional[ToolContext] = None,
+) -> ToolResult:
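+    """
+    View content details. Fetches the full record by index from the most recent content_search results.
+
+    Args:
+        platform: Platform ID (must match the one used in the earlier content_search).
+        index: Item number (1-based), taken from the index field returned by content_search.
+        extras: Platform-specific detail parameters. YouTube supports include_captions / download_video.
+                Other platforms usually need none.
+        context: Tool context (injected automatically).
+    """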
+    pdef = get_platform(platform)
+    if not pdef:
+        return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'")
+
+    trace_id = _get_trace_id(context)
+    post = _cache.get_cached_post(trace_id, pdef.id, index)
+
+    if not post:
+        info = _cache.get_cached_search_info(trace_id, pdef.id)
+        if info:
+            return ToolResult(
+                title="索引无效",
+                output=f"平台 {pdef.name} 上次搜索 '{info['keyword']}' 共 {info['total']} 条,"
+                       f"有效索引 1-{info['total']},你传入了 {index}。",
+                error="Invalid index",
+            )
+        return ToolResult(
+            title="缓存未命中",
+            output=f"没有 {pdef.name} 的搜索缓存。请先调用 content_search(platform='{pdef.id}', keyword=...) 搜索。",
+            error="No cache",
+        )
+
+    if pdef.detail_impl:
+        return await pdef.detail_impl(post, extras)
+
+    # Fallback: return the cached full record directly
+    return ToolResult(
+        title=f"详情 #{index}",
+        output=json.dumps(post, ensure_ascii=False, indent=2),
+    )
+
+
+# ── content_suggest ──
+
+@tool(hidden_params=["context"])
+async def content_suggest(
+    platform: str,
+    keyword: str,
+    context: Optional[ToolContext] = None,
+) -> ToolResult:
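+    """
+    Get search keyword completion suggestions.
+
+    Only some platforms support this (xhs, toutiao, douyin, bili, zhihu).
+    Helps the user discover more precise search terms.
+
+    Args:
+        platform: Platform ID.
+        keyword: Search keyword (a partial term is enough).
+        context: Tool context (injected automatically).
+    """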
+    pdef = get_platform(platform)
+    if not pdef:
+        return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'")
+
+    if not pdef.suggest_impl:
+        supported = [p.id for p in all_platforms() if p.supports_suggest]
+        return ToolResult(
+            title="不支持建议词",
+            output=f"平台 {pdef.name} 不支持建议词。支持的平台: {', '.join(supported)}",
+        )
+
+    channel = (pdef.suggest_channels or [pdef.id])[0]
+    return await pdef.suggest_impl(channel, keyword)
+
+
+# ── CLI entry point ──
+
+def _parse_args(argv: list) -> dict:
+    """Parse CLI arguments in --key=value form."""
+    kwargs = {}
+    for arg in argv:
+        if arg.startswith("--") and "=" in arg:
+            key, val = arg[2:].split("=", 1)
+            # Try JSON parsing (dict / int / bool)
+            try:
+                val = json.loads(val)
+            except (json.JSONDecodeError, ValueError):
+                pass
+            kwargs[key] = val
+    return kwargs
+
+
+if __name__ == "__main__":
+    import sys
+    import asyncio
+
+    COMMANDS = {
+        "platforms": content_platforms,
+        "search": content_search,
+        "detail": content_detail,
+        "suggest": content_suggest,
+    }
+
+    if len(sys.argv) < 2 or sys.argv[1] not in COMMANDS:
+        print(f"Usage: python {sys.argv[0]} <{'|'.join(COMMANDS)}> [--key=value ...]")
+        sys.exit(1)
+
+    cmd = sys.argv[1]
+    kwargs = _parse_args(sys.argv[2:])
+
+    # trace_id: CLI argument > environment variable > auto-generated
+    trace_id = kwargs.pop("trace_id", None) or os.getenv("TRACE_ID") or f"cli-{uuid.uuid4().hex[:8]}"
+    os.environ["TRACE_ID"] = trace_id
+
+    result = asyncio.run(COMMANDS[cmd](**kwargs))
+
+    # Emit JSON (same format as the toolhub CLI)
+    out = {"trace_id": trace_id, "output": result.output, "error": result.error}
+    if result.metadata:
+        out["metadata"] = result.metadata
+    print(json.dumps(out, ensure_ascii=False, indent=2))
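
Reviewer note: `_parse_args` runs every value through `json.loads` before falling back to the raw string, so numeric-looking or boolean-looking keywords are coerced as well. The sketch below reproduces the parsing logic as a standalone function (renamed `parse_args` here for illustration) to show what the CLI would pass to the tool functions.

```python
import json
from typing import Any, Dict, List


def parse_args(argv: List[str]) -> Dict[str, Any]:
    """Parse --key=value arguments; values that are valid JSON are decoded."""
    kwargs: Dict[str, Any] = {}
    for arg in argv:
        if arg.startswith("--") and "=" in arg:
            key, raw = arg[2:].split("=", 1)
            try:
                value: Any = json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a subclass of ValueError
                value = raw     # not JSON: keep the raw string
            kwargs[key] = value
    return kwargs


parsed = parse_args([
    "--platform=youtube",                  # not valid JSON, stays a string
    "--max_count=5",                       # decoded to int
    '--extras={"download_video": true}',   # decoded to dict
    "--keyword=2024",                      # also decoded to int, which may be unintended
    "positional-is-ignored",               # no leading "--", skipped silently
])

# Wrapping the value in JSON quotes forces it to stay a string.
quoted = parse_args(['--keyword="2024"'])
```

A keyword such as `2024` or `true` therefore reaches `content_search` as an `int` or `bool` unless it is JSON-quoted on the command line.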

+ 0 - 497
agent/tools/builtin/crawler.py

@@ -1,497 +0,0 @@
-"""
-爬虫服务工具模块
-
-提供 YouTube、X (Twitter) 和微信/通用链接的搜索和详情查询功能。
-"""
-
-import json
-import os
-import subprocess
-import tempfile
-from pathlib import Path
-from typing import Optional, List, Dict, Any
-
-import httpx
-
-from agent.tools import tool, ToolResult
-from agent.tools.utils.image import build_image_grid, encode_base64, load_images
-
-
-# API 配置
-CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
-AIGC_BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
-DEFAULT_TIMEOUT = 60.0
-
-# 视频处理相关配置
-VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
-VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)
-
-
-async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]:
-    """
-    将视频缩略图+序号+标题拼接成网格图,返回 base64 编码的 PNG。
-    复用 agent.tools.utils.image 中的共享拼图逻辑。
-    """
-    if not videos:
-        return None
-
-    urls: List[str] = []
-    titles: List[str] = []
-    for video in videos:
-        thumbnail = None
-        if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
-            thumbnail = video["thumbnails"][0].get("url")
-        elif "thumbnail" in video:
-            thumbnail = video.get("thumbnail")
-        elif "cover_url" in video:
-            thumbnail = video.get("cover_url")
-
-        title = video.get("title", "") or video.get("text", "")
-        if thumbnail:
-            urls.append(thumbnail)
-            titles.append(title)
-
-    if not urls:
-        return None
-
-    loaded = await load_images(urls)
-
-    valid_images = []
-    valid_labels = []
-    for (_, img), title in zip(loaded, titles):
-        if img is not None:
-            valid_images.append(img)
-            valid_labels.append(title)
-
-    if not valid_images:
-        return None
-
-    grid = build_image_grid(images=valid_images, labels=valid_labels)
-    b64, _ = encode_base64(grid, format="PNG")
-    return b64
-
-
-def _parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
-    """解析 SRT 字幕,生成带时间戳的大纲"""
-    if not srt_content:
-        return []
-
-    outline = []
-    blocks = srt_content.strip().split('\n\n')
-    for block in blocks:
-        lines = block.strip().split('\n')
-        if len(lines) >= 3:
-            timestamp_line = lines[1]
-            if '-->' in timestamp_line:
-                start_time = timestamp_line.split('-->')[0].strip()
-                text = ' '.join(lines[2:])
-                outline.append({'timestamp': start_time, 'text': text})
-    return outline
-
-
-def _download_youtube_video(video_id: str) -> Optional[str]:
-    """使用 yt-dlp 下载 YouTube 视频,返回文件路径"""
-    try:
-        output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
-        if output_path.exists():
-            return str(output_path)
-
-        cmd = [
-            'yt-dlp',
-            '-f', 'best[ext=mp4]',
-            '-o', str(output_path),
-            f'https://www.youtube.com/watch?v={video_id}'
-        ]
-        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
-
-        if result.returncode == 0 and output_path.exists():
-            return str(output_path)
-        return None
-    except Exception:
-        return None
-
-
-# ── YouTube 工具 ──
-
-@tool()
-async def youtube_search(keyword: str) -> ToolResult:
-    """
-    搜索 YouTube 视频
-
-    Args:
-        keyword: 搜索关键词
-
-    Returns:
-        搜索结果列表,包含视频标题、ID、频道等信息
-    """
-    try:
-        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
-            response = await client.post(
-                f"{CRAWLER_BASE_URL}/youtube/keyword",
-                json={"keyword": keyword}
-            )
-            response.raise_for_status()
-            data = response.json()
-
-            if data.get("code") == 0:
-                result_data = data.get("data", {})
-                videos = result_data.get("data", []) if isinstance(result_data, dict) else []
-
-                images = []
-                collage_b64 = await _build_video_collage(videos)
-                if collage_b64:
-                    images.append({
-                        "type": "base64",
-                        "media_type": "image/png",
-                        "data": collage_b64
-                    })
-
-                summary_list = []
-                for idx, video in enumerate(videos[:20], 1):
-                    title = video.get("title", "")
-                    author = video.get("author", "")
-                    video_id = video.get("video_id", "")
-                    summary_list.append(f"{idx}. {title} - {author} (ID: {video_id})")
-
-                output_data = {
-                    "keyword": keyword,
-                    "total": len(videos),
-                    "summary": summary_list,
-                    "data": videos
-                }
-
-                return ToolResult(
-                    title=f"YouTube 搜索: {keyword}",
-                    output=json.dumps(output_data, ensure_ascii=False, indent=2),
-                    long_term_memory=f"Searched YouTube for '{keyword}', found {len(videos)} videos",
-                    images=images
-                )
-            else:
-                return ToolResult(
-                    title="YouTube 搜索失败",
-                    output="",
-                    error=f"搜索失败: {data.get('msg', '未知错误')}"
-                )
-
-    except Exception as e:
-        return ToolResult(
-            title="YouTube 搜索异常",
-            output="",
-            error=str(e)
-        )
-
-
-@tool()
-async def youtube_detail(
-    content_id: str,
-    include_captions: bool = True,
-    download_video: bool = False
-) -> ToolResult:
-    """
-    获取 YouTube 视频详情(可选包含字幕、下载视频并生成大纲)
-
-    Args:
-        content_id: 视频 ID
-        include_captions: 是否包含字幕,默认 True
-        download_video: 是否下载视频并生成带时间戳的大纲,默认 False。
-            下载后可使用 extract_video_clip 截取视频片段观看。
-
-    Returns:
-        视频详细信息,包含字幕、视频大纲和本地文件路径
-    """
-    try:
-        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
-            detail_response = await client.post(
-                f"{CRAWLER_BASE_URL}/youtube/detail",
-                json={"content_id": content_id}
-            )
-            detail_response.raise_for_status()
-            detail_data = detail_response.json()
-
-            if detail_data.get("code") != 0:
-                return ToolResult(
-                    title="获取详情失败",
-                    output="",
-                    error=f"获取详情失败: {detail_data.get('msg', '未知错误')}"
-                )
-
-            result_data = detail_data.get("data", {})
-            video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
-
-            # 获取字幕
-            captions_text = None
-            if include_captions or download_video:
-                try:
-                    captions_response = await client.post(
-                        f"{CRAWLER_BASE_URL}/youtube/captions",
-                        json={"content_id": content_id}
-                    )
-                    captions_response.raise_for_status()
-                    captions_data = captions_response.json()
-
-                    if captions_data.get("code") == 0:
-                        captions_result = captions_data.get("data", {})
-                        if isinstance(captions_result, dict):
-                            inner_data = captions_result.get("data", {})
-                            if isinstance(inner_data, dict):
-                                captions_text = inner_data.get("content")
-                except Exception:
-                    pass
-
-            # 下载视频并生成大纲
-            video_path = None
-            video_outline = None
-            if download_video:
-                video_path = await asyncio.to_thread(_download_youtube_video, content_id)
-                if captions_text:
-                    video_outline = _parse_srt_to_outline(captions_text)
-
-            # 合并数据
-            output_data = {
-                "video_id": content_id,
-                "title": video_info.get("title", ""),
-                "channel": video_info.get("channel_account_name", ""),
-                "description": video_info.get("body_text", ""),
-                "like_count": video_info.get("like_count"),
-                "comment_count": video_info.get("comment_count"),
-                "publish_timestamp": video_info.get("publish_timestamp"),
-                "content_link": video_info.get("content_link", ""),
-                "captions": captions_text,
-                "full_data": video_info
-            }
-
-            if download_video:
-                output_data["video_path"] = video_path
-                output_data["video_outline"] = video_outline
-                if not video_path:
-                    output_data["download_error"] = "视频下载失败,请检查 yt-dlp 是否可用"
-
-            memory = f"Retrieved YouTube video details for {content_id}"
-            if captions_text:
-                memory += " with captions"
-            if video_path:
-                memory += f", downloaded to {video_path}"
-
-            return ToolResult(
-                title=f"YouTube 视频详情: {content_id}",
-                output=json.dumps(output_data, ensure_ascii=False, indent=2),
-                long_term_memory=memory
-            )
-
-    except Exception as e:
-        return ToolResult(
-            title="YouTube 详情查询异常",
-            output="",
-            error=str(e)
-        )
-
-
-# ── X (Twitter) 工具 ──
-
-@tool()
-async def x_search(keyword: str) -> ToolResult:
-    """
-    搜索 X (Twitter) 内容(数据已结构化,无需访问详情页)
-
-    Args:
-        keyword: 搜索关键词
-
-    Returns:
-        搜索结果列表,包含推文内容、作者、互动数据等
-    """
-    try:
-        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
-            response = await client.post(
-                "http://crawler.aiddit.com/crawler/x/keyword",
-                json={"keyword": keyword}
-            )
-            response.raise_for_status()
-            data = response.json()
-
-            if data.get("code") == 0:
-                result_data = data.get("data", {})
-                tweets = result_data.get("data", []) if isinstance(result_data, dict) else []
-
-                # 构建拼接图
-                images = []
-                tweets_with_images = []
-                for tweet in tweets:
-                    image_list = tweet.get("image_url_list", [])
-                    if image_list:
-                        tweet["thumbnails"] = [{"url": image_list[0].get("image_url")}]
-                        tweets_with_images.append(tweet)
-
-                collage_b64 = await _build_video_collage(tweets_with_images if tweets_with_images else tweets)
-                if collage_b64:
-                    images.append({
-                        "type": "base64",
-                        "media_type": "image/png",
-                        "data": collage_b64
-                    })
-
-                summary_list = []
-                for idx, tweet in enumerate(tweets[:20], 1):
-                    text = tweet.get("body_text", "")[:100]
-                    author = tweet.get("channel_account_name", "")
-                    summary_list.append(f"{idx}. @{author}: {text}")
-
-                output_data = {
-                    "keyword": keyword,
-                    "total": len(tweets),
-                    "summary": summary_list,
-                    "data": tweets
-                }
-
-                return ToolResult(
-                    title=f"X 搜索: {keyword}",
-                    output=json.dumps(output_data, ensure_ascii=False, indent=2),
-                    long_term_memory=f"Searched X (Twitter) for '{keyword}', found {len(tweets)} tweets",
-                    images=images
-                )
-            else:
-                return ToolResult(
-                    title="X 搜索失败",
-                    output="",
-                    error=f"搜索失败: {data.get('msg', '未知错误')}"
-                )
-
-    except Exception as e:
-        return ToolResult(
-            title="X 搜索异常",
-            output="",
-            error=str(e)
-        )
-
-
-# ── 内容导入工具 ──
-
-@tool()
-async def import_content(plan_name: str, content_data: List[Dict[str, Any]]) -> ToolResult:
-    """
-    导入长文内容(微信公众号、小红书、抖音等通用链接)
-
-    Args:
-        plan_name: 计划名称
-        content_data: 内容数据列表,每项包含 channel、content_link、title 等字段
-
-    Returns:
-        导入结果,包含 plan_id
-    """
-    try:
-        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
-            response = await client.post(
-                f"{AIGC_BASE_URL}/weixin/auto_insert",
-                json={"plan_name": plan_name, "data": content_data}
-            )
-            response.raise_for_status()
-            data = response.json()
-
-            if data.get("code") == 0:
-                result_data = data.get("data", {})
-                return ToolResult(
-                    title=f"内容导入: {plan_name}",
-                    output=json.dumps(result_data, ensure_ascii=False, indent=2),
-                    long_term_memory=f"Imported {len(content_data)} items to plan '{plan_name}'"
-                )
-            else:
-                return ToolResult(
-                    title="导入失败",
-                    output="",
-                    error=f"导入失败: {data.get('msg', '未知错误')}"
-                )
-
-    except Exception as e:
-        return ToolResult(
-            title="内容导入异常",
-            output="",
-            error=str(e)
-        )
-
-
-# ── 视频截取工具 ──
-
-@tool()
-async def extract_video_clip(
-    video_id: str,
-    start_time: str,
-    end_time: str,
-    output_name: Optional[str] = None
-) -> ToolResult:
-    """
-    从已下载的 YouTube 视频中截取指定时间段的片段
-
-    Args:
-        video_id: YouTube 视频 ID(必须先通过 youtube_detail(download_video=True) 下载)
-        start_time: 开始时间,格式: HH:MM:SS 或 MM:SS
-        end_time: 结束时间,格式: HH:MM:SS 或 MM:SS
-        output_name: 输出文件名(可选)
-
-    Returns:
-        截取的视频片段路径
-
-    Example:
-        extract_video_clip("dQw4w9WgXcQ", "00:00:10", "00:00:30")
-    """
-    try:
-        source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
-        if not source_video.exists():
-            return ToolResult(
-                title="视频截取失败",
-                output="",
-                error="源视频不存在,请先使用 youtube_detail(download_video=True) 下载视频"
-            )
-
-        if not output_name:
-            output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4"
-
-        output_path = VIDEO_DOWNLOAD_DIR / output_name
-
-        cmd = [
-            'ffmpeg',
-            '-i', str(source_video),
-            '-ss', start_time,
-            '-to', end_time,
-            '-c', 'copy',
-            '-y',
-            str(output_path)
-        ]
-
-        result = await asyncio.to_thread(
-            subprocess.run, cmd, capture_output=True, text=True, timeout=60
-        )
-
-        if result.returncode == 0 and output_path.exists():
-            file_size = output_path.stat().st_size / (1024 * 1024)
-
-            output_data = {
-                "video_id": video_id,
-                "clip_path": str(output_path),
-                "start_time": start_time,
-                "end_time": end_time,
-                "file_size_mb": round(file_size, 2)
-            }
-
-            return ToolResult(
-                title=f"视频片段截取成功: {start_time} - {end_time}",
-                output=json.dumps(output_data, ensure_ascii=False, indent=2),
-                long_term_memory=f"Extracted video clip from {video_id}: {start_time} to {end_time}"
-            )
-        else:
-            return ToolResult(
-                title="视频截取失败",
-                output="",
-                error=f"ffmpeg 执行失败: {result.stderr}"
-            )
-
-    except subprocess.TimeoutExpired:
-        return ToolResult(
-            title="视频截取超时",
-            output="",
-            error="视频截取超时(60秒)"
-        )
-    except Exception as e:
-        return ToolResult(
-            title="视频截取异常",
-            output="",
-            error=str(e)
-        )

+ 0 - 420
agent/tools/builtin/search.py

@@ -1,420 +0,0 @@
-"""
-搜索工具模块
-
-提供帖子搜索、帖子详情查看和建议词搜索功能,支持多个渠道平台。
-
-主要功能:
-1. search_posts - 帖子搜索(浏览模式:封面图+标题+内容截断)
-2. select_post - 帖子详情(从搜索结果中选取单个帖子的完整内容)
-3. get_search_suggestions - 获取平台的搜索补全建议词
-"""
-
-import json
-from enum import Enum
-from typing import Any, Dict, List, Optional
-
-import httpx
-
-from agent.tools import tool, ToolResult
-from agent.tools.utils.image import build_image_grid, encode_base64, load_images
-
-
-# API 基础配置
-BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
-DEFAULT_TIMEOUT = 60.0
-
-# 搜索结果缓存,以序号为 key
-_search_cache: Dict[int, Dict[str, Any]] = {}
-
-
-async def _build_collage(posts: List[Dict[str, Any]]) -> Optional[str]:
-    """
-    将帖子封面图+序号+标题拼接成网格图,返回 base64 编码的 PNG。
-    复用 agent.tools.utils.image 中的共享拼图逻辑。
-    """
-    if not posts:
-        return None
-
-    # 收集有封面图的帖子
-    urls: List[str] = []
-    titles: List[str] = []
-    for post in posts:
-        imgs = post.get("images", [])
-        cover_url = imgs[0] if imgs else None
-        if cover_url:
-            urls.append(cover_url)
-            titles.append(post.get("title", "") or "")
-
-    if not urls:
-        return None
-
-    # 并发加载图片
-    loaded = await load_images(urls)
-
-    # 过滤加载失败的(保持 url 和 title 对齐)
-    valid_images = []
-    valid_labels = []
-    for (_, img), title in zip(loaded, titles):
-        if img is not None:
-            valid_images.append(img)
-            valid_labels.append(title)
-
-    if not valid_images:
-        return None
-
-    grid = build_image_grid(images=valid_images, labels=valid_labels)
-    b64, _ = encode_base64(grid, format="PNG")
-    return b64
-
-
-class PostSearchChannel(str, Enum):
-    """
-    帖子搜索支持的渠道类型
-    """
-    XHS = "xhs"           # 小红书
-    GZH = "gzh"           # 公众号
-    SPH = "sph"           # 视频号
-    GITHUB = "github"     # GitHub
-    TOUTIAO = "toutiao"   # 头条
-    DOUYIN = "douyin"     # 抖音
-    BILI = "bili"         # B站
-    ZHIHU = "zhihu"       # 知乎
-    WEIBO = "weibo"       # 微博
-
-
-class SuggestSearchChannel(str, Enum):
-    """
-    建议词搜索支持的渠道类型
-    """
-    XHS = "xhs"           # 小红书
-    WX = "wx"             # 微信
-    GITHUB = "github"     # GitHub
-    TOUTIAO = "toutiao"   # 头条
-    DOUYIN = "douyin"     # 抖音
-    BILI = "bili"         # B站
-    ZHIHU = "zhihu"       # 知乎
-
-
-@tool(
-    display={
-        "zh": {
-            "name": "帖子搜索",
-            "params": {
-                "keyword": "搜索关键词",
-                "channel": "搜索渠道(xhs=小红书, gzh=公众号, sph=视频号, github, toutiao=头条, douyin=抖音, bili=B站, zhihu=知乎, weibo=微博)",
-                "cursor": "分页游标",
-                "max_count": "返回条数",
-                "content_type": "内容类型-视频/图文",
-                "sort_type": "排序方式(xhs专用)",
-                "publish_time": "发布时间筛选(xhs专用)",
-                "filter_note_range": "笔记时长筛选(xhs专用)"
-            }
-        },
-        "en": {
-            "name": "Search Posts",
-            "params": {
-                "keyword": "Search keyword",
-                "channel": "Search channel (xhs=XiaoHongShu, gzh=WeChat Official Account, sph=WeChat Channels, github, toutiao, douyin, bili, zhihu, weibo)",
-                "cursor": "Pagination cursor",
-                "max_count": "Max results",
-                "content_type": "content type-视频/图文",
-                "sort_type": "Sort type (xhs only)",
-                "publish_time": "Publish time filter (xhs only)",
-                "filter_note_range": "Note duration filter (xhs only)"
-            }
-        }
-    }
-)
-async def search_posts(
-    keyword: str,
-    channel: str = "xhs",
-    cursor: str = "",
-    max_count: int = 20,
-    content_type: str = "",
-    sort_type: str = "综合排序",
-    publish_time: str = "不限",
-    filter_note_range: str = "不限"
-) -> ToolResult:
-    """
-    帖子搜索(浏览模式)
-
-    根据关键词在指定渠道平台搜索帖子,返回封面图+标题+内容摘要,用于快速浏览。
-    如需查看某个帖子的完整内容,请使用 select_post 工具。
-
-    Args:
-        keyword: 搜索关键词
-        channel: 搜索渠道,支持的渠道有:
-            - xhs: 小红书
-            - gzh: 公众号
-            - sph: 视频号
-            - github: GitHub
-            - toutiao: 头条
-            - douyin: 抖音
-            - bili: B站
-            - zhihu: 知乎
-            - weibo: 微博
-        cursor: 分页游标,首次请求为空字符串,后续使用上次返回的 cursor
-        max_count: 返回的最大条数,默认为 20
-        content_type: 内容类型筛选,默认不限;
-            xhs 可选值:'不限' | '图文' | '视频' | '文章';
-            其他渠道可选值:'视频' | '图文'
-        sort_type: 排序方式(仅 xhs 有效),可选值:'综合排序' | '最新发布' | '最多点赞',默认'综合排序'
-        publish_time: 发布时间筛选(仅 xhs 有效),可选值:'不限' | '近30天' | '近7天' | '近1天',默认'不限'
-        filter_note_range: 笔记时长筛选,视频内容有效(仅 xhs 有效),可选值:'不限' | '1分钟以内' | '1-5分钟' | '5分钟以上',默认'不限'
-
-    Returns:
-        ToolResult 包含搜索结果摘要列表(封面图+标题+内容截断),
-        可通过 channel_content_id 调用 select_post 查看完整内容。
-    """
-    global _search_cache
-    try:
-        channel_value = channel.value if isinstance(channel, PostSearchChannel) else channel
-
-        url = f"{BASE_URL}/data"
-        if channel_value == "xhs":
-            payload = {
-                "type": channel_value,
-                "keyword": keyword,
-                "cursor": cursor,
-                "content_type": content_type if content_type else "不限",
-                "sort_type": sort_type,
-                "publish_time": publish_time,
-                "filter_note_range": filter_note_range,
-            }
-        else:
-            payload = {
-                "type": channel_value,
-                "keyword": keyword,
-                "cursor": cursor if cursor else "0",
-                "max_count": max_count,
-                "content_type": content_type,
-            }
-
-        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
-            response = await client.post(
-                url,
-                json=payload,
-                headers={"Content-Type": "application/json"},
-            )
-            response.raise_for_status()
-            data = response.json()
-
-        posts = data.get("data", [])
-
-        # 缓存完整结果(以序号为 key)
-        _search_cache.clear()
-        for idx, post in enumerate(posts):
-            _search_cache[idx + 1] = post
-
-        # 构建摘要列表(带序号)
-        summary_list = []
-        for idx, post in enumerate(posts):
-            body = post.get("body_text", "") or ""
-            title = post.get("title") or body[:20] or ""
-            summary_list.append({
-                "index": idx + 1,
-                "channel_content_id": post.get("channel_content_id"),
-                "title": title,
-                "body_text": body[:100] + ("..." if len(body) > 100 else ""),
-                "like_count": post.get("like_count"),
-                "collect_count": post.get("collect_count"),
-                "comment_count": post.get("comment_count"),
-                "channel": post.get("channel"),
-                "link": post.get("link"),
-                "content_type": post.get("content_type"),
-                "publish_timestamp": post.get("publish_timestamp"),
-            })
-
-        # 拼接封面图网格
-        images = []
-        try:
-            collage_b64 = await _build_collage(posts)
-            if collage_b64:
-                images.append({
-                    "type": "base64",
-                    "media_type": "image/png",
-                    "data": collage_b64
-                })
-        except Exception as collage_error:
-            # 图片拼接失败不影响主流程,记录错误但继续返回结果
-            import logging
-            logging.warning(f"Failed to build collage for {channel_value}: {collage_error}")
-
-        output_data = {
-            "code": data.get("code"),
-            "message": data.get("message"),
-            "data": summary_list
-        }
-
-        return ToolResult(
-            title=f"搜索结果: {keyword} ({channel_value})",
-            output=json.dumps(output_data, ensure_ascii=False, indent=2),
-            long_term_memory=f"Searched '{keyword}' on {channel_value}, found {len(posts)} posts. Use select_post(index) to view full details of a specific post.",
-            images=images
-        )
-    except httpx.HTTPStatusError as e:
-        return ToolResult(
-            title="搜索失败",
-            output="",
-            error=f"HTTP error {e.response.status_code}: {e.response.text}"
-        )
-    except Exception as e:
-        return ToolResult(
-            title="搜索失败",
-            output="",
-            error=str(e)
-        )
-
-
-@tool(
-    display={
-        "zh": {
-            "name": "帖子详情",
-            "params": {
-                "index": "帖子序号"
-            }
-        },
-        "en": {
-            "name": "Select Post",
-            "params": {
-                "index": "Post index"
-            }
-        }
-    }
-)
-async def select_post(
-    index: int,
-) -> ToolResult:
-    """
-    查看帖子详情
-
-    从最近一次 search_posts 的搜索结果中,根据序号选取指定帖子并返回完整内容(全部正文、全部图片、视频等)。
-    需要先调用 search_posts 进行搜索。
-
-    Args:
-        index: 帖子序号,来自 search_posts 返回结果中的 index 字段(从 1 开始)
-
-    Returns:
-        ToolResult 包含该帖子的完整信息和所有图片。
-    """
-    post = _search_cache.get(index)
-    if not post:
-        return ToolResult(
-            title="未找到帖子",
-            output="",
-            error=f"未找到序号 {index} 的帖子,请先调用 search_posts 搜索。"
-        )
-
-    # 返回所有图片
-    images = []
-    for img_url in post.get("images", []):
-        if img_url:
-            images.append({
-                "type": "url",
-                "url": img_url
-            })
-
-    return ToolResult(
-        title=f"帖子详情 #{index}: {post.get('title', '')}",
-        output=json.dumps(post, ensure_ascii=False, indent=2),
-        long_term_memory=f"Viewed post detail #{index}: {post.get('title', '')}",
-        images=images
-    )
-
-
-@tool(
-    display={
-        "zh": {
-            "name": "获取搜索关键词补全建议",
-            "params": {
-                "keyword": "搜索关键词",
-                "channel": "搜索渠道"
-            }
-        },
-        "en": {
-            "name": "Get Search Suggestions",
-            "params": {
-                "keyword": "Search keyword",
-                "channel": "Search channel"
-            }
-        }
-    }
-)
-async def get_search_suggestions(
-    keyword: str,
-    channel: str = "xhs",
-) -> ToolResult:
-    """
-    获取搜索关键词补全建议
-
-    根据关键词在指定渠道平台获取搜索建议词。
-
-    Args:
-        keyword: 搜索关键词
-        channel: 搜索渠道,支持的渠道有:
-            - xhs: 小红书
-            - wx: 微信
-            - github: GitHub
-            - toutiao: 头条
-            - douyin: 抖音
-            - bili: B站
-            - zhihu: 知乎
-
-    Returns:
-        ToolResult 包含建议词数据:
-        {
-            "code": 0,                    # 状态码,0 表示成功
-            "message": "success",         # 状态消息
-            "data": [                     # 建议词数据
-                {
-                    "type": "xhs",        # 渠道类型
-                    "list": [             # 建议词列表
-                        {
-                            "name": "彩虹染发"  # 建议词
-                        }
-                    ]
-                }
-            ]
-        }
-    """
-    try:
-        # 处理 channel 参数,支持枚举和字符串
-        channel_value = channel.value if isinstance(channel, SuggestSearchChannel) else channel
-
-        url = f"{BASE_URL}/suggest"
-        payload = {
-            "type": channel_value,
-            "keyword": keyword,
-        }
-
-        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
-            response = await client.post(
-                url,
-                json=payload,
-                headers={"Content-Type": "application/json"},
-            )
-            response.raise_for_status()
-            data = response.json()
-
-        # 计算建议词数量
-        suggestion_count = 0
-        for item in data.get("data", []):
-            suggestion_count += len(item.get("list", []))
-
-        return ToolResult(
-            title=f"建议词: {keyword} ({channel_value})",
-            output=json.dumps(data, ensure_ascii=False, indent=2),
-            long_term_memory=f"Got {suggestion_count} suggestions for '{keyword}' on {channel_value}"
-        )
-    except httpx.HTTPStatusError as e:
-        return ToolResult(
-            title="获取建议词失败",
-            output="",
-            error=f"HTTP error {e.response.status_code}: {e.response.text}"
-        )
-    except Exception as e:
-        return ToolResult(
-            title="获取建议词失败",
-            output="",
-            error=str(e)
-        )
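
Review note: the removed `search_posts` returned a browse-mode summary with 1-based indices, a title that falls back to the first 20 characters of the body, and a body truncated to 100 characters. The sketch below isolates that rule as a pure function. The name `summarize_posts` and the `body_limit` parameter are hypothetical, and only three of the original summary fields are kept for brevity.

```python
from typing import Any, Dict, List


def summarize_posts(posts: List[Dict[str, Any]], body_limit: int = 100) -> List[Dict[str, Any]]:
    """Build browse-mode summaries following the rule in the removed search_posts."""
    summaries = []
    for index, post in enumerate(posts, start=1):  # indices start at 1
        body = post.get("body_text", "") or ""
        # Fall back to the start of the body when the title is missing or empty.
        title = post.get("title") or body[:20] or ""
        truncated = body[:body_limit] + ("..." if len(body) > body_limit else "")
        summaries.append({"index": index, "title": title, "body_text": truncated})
    return summaries
```

Whether the replacement `content_search` keeps the same truncation limits is not visible in this part of the diff.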

+ 2 - 2
agent/tools/utils/image.py

@@ -1,8 +1,8 @@
 """
 图片处理共享工具
 
-提供批量读图、降采样、网格拼图等通用逻辑。供 read_images、search_posts、
-youtube_search 工具共享,避免代码重复。
+提供批量读图、降采样、网格拼图等通用逻辑。供 read_images、content 工具族
+等共享,避免代码重复。
 
 核心函数:
 - load_image: 从本地路径或 URL 加载为 PIL Image

+ 2 - 2
examples/mini_restore/workflow_loop.py

@@ -20,7 +20,7 @@ except ValueError as e:
     print(f"初始化 Gemini 失败: {e},请检查 .env。")
     sys.exit(1)
 
-from agent.tools.builtin.search import search_posts
+from agent.tools.builtin.content import content_search
 
 # -----------------
 # Utility Functions
@@ -76,7 +76,7 @@ async def call_banana_tool(prompt: str, aspect_ratio: str = None, reference_imag
 async def search_tool(keyword: str) -> str:
     print(f"\n[Tool] 🔍 启动小红书调研, 关键词: {keyword}")
     try:
-        result = await search_posts(keyword=keyword, channel="xhs", max_count=3)
+        result = await content_search(platform="xhs", keyword=keyword, max_count=3)
         return result.output
     except Exception as e:
         return f"查询失败: {e}"
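
Review note: the call-site change above renames the `channel` argument to `platform` and leaves `keyword` and `max_count` as they were. For callers that still assemble keyword arguments in the old shape, a small translation step along these lines may help during migration. This is a sketch under assumptions: `to_content_search_kwargs` is a hypothetical helper, and it covers only the rename visible in this diff, not any other parameter differences `content_search` may have.

```python
from typing import Any, Dict


def to_content_search_kwargs(legacy_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Translate search_posts-style kwargs to the content_search style (hypothetical)."""
    converted = dict(legacy_kwargs)  # copy so the caller's dict is not mutated
    if "channel" in converted:
        # The only rename shown in the diff: channel -> platform.
        converted["platform"] = converted.pop("channel")
    return converted
```

A legacy call such as `search_posts(keyword=kw, channel="xhs", max_count=3)` would then become `content_search(**to_content_search_kwargs({...}))`, assuming the remaining parameters are accepted unchanged.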