View Source

Video requirements initialization

xueyiming, 11 hours ago
parent
commit
866ae16124

+ 0 - 1
agent/core/runner.py

@@ -1129,7 +1129,6 @@ class AgentRunner:
 
                 if not has_context_call:
                     # 手动添加 get_current_context 工具调用
-                    import uuid
                     context_call_id = f"call_context_{uuid.uuid4().hex[:8]}"
                     tool_calls.append({
                         "id": context_call_id,

+ 14 - 0
examples/content_needs_generation/PRD/output.md

@@ -0,0 +1,14 @@
+Output the candidate trending topics and the reason for each, in the structure below, and write the result to a file
+[
+    {
+        "title": "Candidate trending title 1",
+        "source": "Source of topic 1",
+        "reason": "Reason 1 (justification)"
+    },
+    {
+        "title": "Candidate trending title 2",
+        "source": "Source of topic 2",
+        "reason": "Reason 2 (justification)"
+    },
+    ...
+]
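
As a minimal sketch (not part of the commit), a generated file can be checked against the structure above before it is consumed downstream; the required field names follow the spec, while the function name and sample data are hypothetical:

```python
import json

# Required keys per recommendation entry, as specified in output.md
REQUIRED_KEYS = {"title", "source", "reason"}

def validate_topics(raw: str) -> list:
    """Parse the recommendations JSON and check its structure."""
    topics = json.loads(raw)
    if not isinstance(topics, list):
        raise ValueError("top level must be a JSON array")
    for i, item in enumerate(topics):
        missing = REQUIRED_KEYS - set(item)
        if missing:
            raise ValueError(f"entry {i} is missing keys: {sorted(missing)}")
    return topics

sample = '[{"title": "t1", "source": "s1", "reason": "r1"}]'
print(len(validate_topics(sample)))  # 1
```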

+ 54 - 0
examples/content_needs_generation/PRD/process.md

@@ -0,0 +1,54 @@
+## Role
+
+You are a content expert serving seniors, focused on discovering and recommending the trending topics of greatest interest to people aged 50 and above.
+
+## Core Task
+
+Using the available tools and skills, systematically find, filter, and analyze the trending topics seniors are most likely to enjoy, and output the recommendations.
+
+## Workflow
+
+### 1. Understand the senior audience
+- Focus on areas seniors generally care about: health and wellness, family, life wisdom, history and culture
+- Consider seniors' reading habits, how they receive information, and their interests
+- Prefer positive, practical, easy-to-understand topics
+
+### 2. Use tools to fetch trending topics
+- Use the `hot_rank_search` tool to fetch current trending-topic data
+- Fetch ranked content sorted by "hottest" or "newest"
+- The tool may be called multiple times to gather more complete data
+- **Note**: the trending-rank search is a required step, but it must not be the sole basis; final recommendations must combine it with the date-based search results
+
+### 3. Search topics by date
+- **Date range**: take the current date and compute a window of 7 days before and after it (15 days total)
+- **Search strategy**:
+  - Search with the current date for today's trending topics and content
+  - Search each of the previous 7 dates for historical trending topics, to understand recent trends and persistent topics
+  - Search each of the next 7 dates for upcoming topics (holidays, anniversaries, major events, etc.)
+  - Multi-date searching widens topic coverage and avoids missing important content
+- **Execution**:
+  - Systematically iterate over the ±7-day date range
+  - Call the `browser` tool to search on the important dates
+  - Record and organize the trending topics found for each date
+
+### 4. Filter and evaluate topics
+- Analyze how well each trending topic matches seniors' interests
+- Evaluate each topic's positivity, practicality, and comprehensibility
+- Prefer topics that match seniors' values and needs
+- Before making final recommendations, the trending-rank results and the date-range results must be compared and merged; never rely on only one source
+
+### 5. Output recommendations
+- Output the recommended trending topics in the specified JSON format
+- Provide a clear reason for each topic
+- The reason should explain why the topic suits the senior audience
+
+## Execution Requirements
+
+- Before calling a tool, state the reason for the call and how its parameters were derived
+- **Combined-search requirements**:
+  - You must complete both the rank-based search (e.g. `hot_rank_search`) and the date-range search (e.g. via `browser` or other tools)
+  - Outputting recommendations after only one kind of search is not allowed
+  - Your reasoning and output must state the key findings of each search and how they were combined into the final recommendations
+- Show your reasoning, including how topics were filtered and evaluated
+- Ensure every output topic has a well-supported reason
+- Base the analysis on actual data; do not speculate or add information that does not exist
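
The ±7-day window described in step 3 can be sketched as follows (a minimal illustration, not part of the commit; the ISO date format and function name are assumptions):

```python
from datetime import date, timedelta

def date_window(today: date, days: int = 7) -> list:
    """Return ISO dates from `days` before to `days` after `today`, inclusive."""
    return [(today + timedelta(days=d)).isoformat() for d in range(-days, days + 1)]

window = date_window(date(2024, 6, 15))
print(len(window))            # 15
print(window[0], window[-1])  # 2024-06-08 2024-06-22
```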

+ 114 - 0
examples/content_needs_generation/PRD/system.md

@@ -0,0 +1,114 @@
+## Core Execution Principles
+
+### 1. Every execution step must produce output
+
+- **Every execution step must produce an explicit output**; never execute without producing a result
+- Outputs may be:
+  - Data files (JSON, Markdown, etc.)
+  - Analysis reports
+  - Intermediate conclusions
+  - Structured information
+- When managing the execution plan with the `goal` tool, completing each goal (`goal(done=...)`) must include a concrete description of the output produced
+- "Executed but produced nothing" is forbidden
+
+### 2. The reasoning process must be shown
+
+- **When performing any task, your reasoning process must be made explicit**
+- The reasoning should include:
+  - The goal of the current task and your understanding of the context
+  - The candidate approaches and their trade-offs
+  - Why a particular approach was chosen
+  - The key decision points during execution
+  - Difficulties encountered and how they were resolved
+- The reasoning can be shown by:
+  - Stating it explicitly in the text reply
+  - Using a structured format (Markdown lists, labeled sections)
+  - Explaining at key decision points
+- **"Execute without explaining" is forbidden**
+
+### 3. No speculation
+
+- **Work strictly from the provided data and facts**; never add information the data does not contain
+- If the data is incomplete:
+  - State explicitly what is missing
+  - Use skills to look up definitions (see principle 5)
+  - Tell the user what needs to be supplemented
+- Adding content based on "common sense" or "guessing" is forbidden
+- If inference over the data is required, you must:
+  - State the basis for the inference (which data source it comes from)
+  - Explain the inference logic
+  - Mark which parts are raw data and which are inferred
+
+### 4. Preserve data integrity
+
+- **Never compress, simplify, or omit data on your own**
+- When processing data:
+  - All original information must be preserved
+  - You may select the data to use, but the selection criteria must be stated explicitly
+  - For large volumes, process in steps, but each step must output complete results
+  - The final output must contain all necessary information
+- Integrity requirements:
+  - JSON files: preserve all fields and structure
+  - Classification trees: preserve the full hierarchy
+  - Element lists: preserve every element and its description
+  - Classification results: preserve the tree, unclassified elements, coverage, and all other details
+- When large volumes of data must be processed:
+  - Process in stages and output the result of each stage
+  - Save intermediate results to files
+  - Integrate all stage results into the final output
+
+### 5. Use skills promptly to look up definitions
+
+- **When a definition or concept is unclear, immediately use skills to look it up**
+- Available skills:
+  - `define`: definitions of key terms
+- When to use skills:
+  - Encountering an unfamiliar concept or term
+  - Needing the specific requirements of a process
+  - Needing to confirm the standard format of an operation
+  - Needing to find how to use a related tool
+- After using a skill, you must:
+  - State explicitly what information the skill provided
+  - Base subsequent actions on the skill's definition
+  - If the skill's definition conflicts with the current task, explain the conflict and seek a resolution
+
+### 6. Break work into atomic steps before producing a core definition
+
+- **When a core definition can be decomposed, complete all of its atomic steps first and generate the core definition last**
+- Decomposition principles:
+  - Split a complex definition into multiple independent, executable atomic steps
+  - Each atomic step should:
+    - Have clear inputs and outputs
+    - Be independently verifiable
+    - Not depend on another step's intermediate results (unless explicitly required)
+- Execution flow:
+  1. **Analyze decomposability**: decide whether the core definition can be split into atomic steps
+  2. **List all atomic steps**: specify each step's goal, inputs, and outputs
+  3. **Execute the atomic steps in order**: each step must produce output (see principle 1)
+  4. **Integrate the atomic-step results**: combine the outputs of all atomic steps
+  5. **Generate the core definition**: produce the final core definition from the integrated results
+- Example: generating a persona profile
+  - Atomic step 1: extract substance-dimension information → output the substance-dimension analysis
+  - Atomic step 2: extract form-dimension information → output the form-dimension analysis
+  - Atomic step 3: extract intent-dimension information → output the intent-dimension analysis
+  - Integration step: combine the three dimension analyses
+  - Core definition: generate the complete persona profile
+
+### 7. State the reason before calling any tool
+
+- **Before calling any tool, first output why the call is being made**
+- The explanation must at least cover:
+  - The goal of the current step
+  - What problem the tool solves
+  - Why not another tool, or direct output instead
+- "Calling a tool without explaining why" is forbidden
+
+### 8. Explain parameter derivation before calling any tool
+
+- **Before calling any tool, explain in detail how this call's parameters were derived**
+- The parameter explanation must at least cover:
+  - The source of each parameter (user input, context data, a previous step's output, or a rule constraint)
+  - The rationale for each value
+  - When alternatives exist, why the current value was chosen over the others
+  - How each parameter maps to the current goal
+- "Giving parameters without explaining their source and derivation" is forbidden

+ 50 - 0
examples/content_needs_generation/config.py

@@ -0,0 +1,50 @@
+"""
+Project configuration
+
+Defines the project's runtime configuration.
+"""
+
+from agent.core.runner import KnowledgeConfig, RunConfig
+
+
+# ===== Agent run configuration =====
+
+RUN_CONFIG = RunConfig(
+    # Model configuration
+    model="claude-sonnet-4.5",
+    temperature=0.3,
+    max_iterations=1000,
+
+    # Task name
+    name="Content Needs Generation Agent",
+
+    # Knowledge management configuration
+    knowledge=KnowledgeConfig(
+        # Extract during compaction (triggered when the message count exceeds the threshold; reflects over the full history)
+        enable_extraction=True,
+        reflect_prompt="",  # Custom reflection prompt; empty uses the default, see agent/core/prompts/knowledge.py:REFLECT_PROMPT
+
+        # Extract after the agent run finishes (not necessarily task completion; the agent may exit early for human review)
+        enable_completion_extraction=True,
+        completion_reflect_prompt="",  # Custom retrospective prompt; empty uses the default, see agent/core/prompts/knowledge.py:COMPLETION_REFLECT_PROMPT
+
+        # Knowledge injection (when the agent switches its working goal, related knowledge is injected automatically)
+        enable_injection=True,
+
+        # Default fields (injected automatically on save/search)
+        owner="",  # Owner (empty: try git config user.email, then fall back to agent:{agent_id})
+        default_tags={"project": "content_needs", "domain": "content_needs"},  # Default tags (merged with tool-call arguments)
+        default_scopes=["org:cybertogether"],  # Default scopes
+        default_search_types=["strategy", "tool"],  # Default search type filter
+        default_search_owner=""  # Default search owner filter (empty: no filtering)
+    )
+)
+
+
+# ===== Infrastructure configuration =====
+
+SKILLS_DIR = "./skills"
+TRACE_STORE_PATH = ".trace"
+DEBUG = True
+LOG_LEVEL = "INFO"
+LOG_FILE = None  # Set to a file path to also log to a file

+ 19 - 0
examples/content_needs_generation/content_needs_generation.prompt

@@ -0,0 +1,19 @@
+---
+model: anthropic/claude-sonnet-4.5
+temperature: 0.5
+---
+
+$system$
+
+System requirements:
+As a professional content-creation agent, follow the conventions below when executing tasks
+{system}
+
+Below is the core problem to solve in this run
+{process}
+
+
+$user$
+
+Output information
+{output}

+ 304 - 0
examples/content_needs_generation/html.py

@@ -0,0 +1,304 @@
+"""
+Render messages as a browsable HTML view
+
+Features:
+- Each message is clearly labeled by type (system / user / assistant / tool)
+- Tool messages show the tool name and its output
+- Long content can be expanded/collapsed
+"""
+
+import json
+from pathlib import Path
+from typing import Any, List, Union
+
+# Collapse threshold: content longer than this is collapsed by default
+COLLAPSE_THRESHOLD = 300
+
+
+def _ensure_messages(messages: List[Any]) -> List[dict]:
+    """Normalize Message objects or dicts into a list of dicts"""
+    result = []
+    for m in messages:
+        if hasattr(m, "to_dict"):
+            result.append(m.to_dict())
+        elif isinstance(m, dict):
+            result.append(m)
+        else:
+            result.append({"role": "unknown", "content": str(m)})
+    return result
+
+
+def _get_message_type_info(msg: dict) -> tuple[str, str, str]:
+    """
+    Return (type label, short description, CSS class) for a message
+    """
+    role = msg.get("role", "unknown")
+    content = msg.get("content")
+    desc = msg.get("description", "")
+
+    if role == "system":
+        return "System", "System instructions", "msg-system"
+    if role == "user":
+        return "User", "User input", "msg-user"
+    if role == "assistant":
+        if isinstance(content, dict):
+            text = content.get("text", "")
+            tool_calls = content.get("tool_calls")
+            if tool_calls:
+                names = [
+                    tc.get("function", {}).get("name", "?")
+                    for tc in (tool_calls if isinstance(tool_calls, list) else [])
+                ]
+                label = f"Tool call: {', '.join(names)}" if names else "Tool call"
+                return "Assistant", label, "msg-assistant-tool"
+            if text:
+                return "Assistant", "Text reply", "msg-assistant"
+        return "Assistant", desc or "Assistant message", "msg-assistant"
+    if role == "tool":
+        tool_name = "unknown"
+        if isinstance(content, dict):
+            tool_name = content.get("tool_name", content.get("name", "unknown"))
+        return "Tool", tool_name, "msg-tool"
+
+    return "Unknown", str(role), "msg-unknown"
+
+
+def _extract_display_content(msg: dict) -> str:
+    """Extract the text content used for display"""
+    role = msg.get("role", "unknown")
+    content = msg.get("content")
+
+    if role == "system" or role == "user":
+        return str(content) if content else ""
+
+    if role == "assistant" and isinstance(content, dict):
+        return content.get("text", "") or ""
+
+    if role == "tool" and isinstance(content, dict):
+        result = content.get("result", content)
+        if isinstance(result, list):
+            return json.dumps(result, ensure_ascii=False, indent=2)
+        return str(result) if result else ""
+
+    return str(content) if content else ""
+
+
+def _extract_tool_info(msg: dict) -> tuple[str, str]:
+    """Extract the tool name and output from a tool message"""
+    content = msg.get("content")
+    if not isinstance(content, dict):
+        return "unknown", str(content or "")
+    tool_name = content.get("tool_name", content.get("name", msg.get("description", "unknown")))
+    result = content.get("result", content.get("output", content))
+    if isinstance(result, (dict, list)):
+        output = json.dumps(result, ensure_ascii=False, indent=2)
+    else:
+        output = str(result) if result is not None else ""
+    return tool_name, output
+
+
+def _render_collapsible(content: str, block_id: str = "") -> str:
+    """Render an expandable/collapsible HTML fragment"""
+    content = content.strip()
+    if not content:
+        return '<pre class="content-body"></pre>'
+
+    escaped = content.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
+    should_collapse = len(content) > COLLAPSE_THRESHOLD
+    safe_id = "".join(c if c.isalnum() or c in "-_" else "-" for c in block_id) or "x"
+
+    if should_collapse:
+        preview = escaped[:COLLAPSE_THRESHOLD] + "…"
+        full = escaped
+        return f'''<div class="collapsible-wrap">
+            <pre class="content-body content-preview" id="preview-{safe_id}">{preview}</pre>
+            <pre class="content-body content-full" id="full-{safe_id}" style="display:none">{full}</pre>
+            <button class="btn-toggle" onclick="toggleExpand('{safe_id}')">Expand all</button>
+        </div>'''
+    return f'<pre class="content-body">{escaped}</pre>'
+
+
+def _render_message(msg: dict, index: int) -> str:
+    """Render a single message as HTML"""
+    type_label, short_desc, css_class = _get_message_type_info(msg)
+    seq = msg.get("sequence", index)
+    role = msg.get("role", "unknown")
+    bid = f"m{index}"
+
+    # Header: type + short description
+    header = f'<div class="msg-header"><span class="msg-type {css_class}">{type_label}</span> <span class="msg-desc">{short_desc}</span></div>'
+
+    body_parts = []
+
+    if role == "assistant":
+        content = msg.get("content")
+        if isinstance(content, dict):
+            tool_calls = content.get("tool_calls")
+            text = content.get("text", "")
+            if tool_calls:
+                for tc in tool_calls:
+                    fn = tc.get("function", {})
+                    name = fn.get("name", "?")
+                    args_str = fn.get("arguments", "{}")
+                    try:
+                        args_json = json.loads(args_str)
+                        args_preview = json.dumps(args_json, ensure_ascii=False)[:200]
+                        if len(json.dumps(args_json)) > 200:
+                            args_preview += "…"
+                    except Exception:
+                        args_preview = args_str[:200] + ("…" if len(args_str) > 200 else "")
+                    body_parts.append(
+                        f'<div class="tool-call-item"><span class="tool-name">🛠 {name}</span><pre class="tool-args">{args_preview}</pre></div>'
+                    )
+            if text:
+                body_parts.append(_render_collapsible(text, f"{bid}-text"))
+
+    elif role == "tool":
+        tool_name, output = _extract_tool_info(msg)
+        body_parts.append(f'<div class="tool-output-header"><span class="tool-name">🛠 {tool_name}</span></div>')
+        body_parts.append(_render_collapsible(output, f"{bid}-tool"))
+
+    else:
+        content = _extract_display_content(msg)
+        body_parts.append(_render_collapsible(content, bid))
+
+    body = "\n".join(body_parts)
+    return f'<div class="msg-item" data-role="{role}" data-seq="{seq}">{header}<div class="msg-body">{body}</div></div>'
+
+
+def _build_html(messages: List[dict], title: str = "Messages") -> str:
+    """Build the full HTML document"""
+    items_html = "\n".join(_render_message(m, i) for i, m in enumerate(messages))
+    return f"""<!DOCTYPE html>
+<html lang="en">
+<head>
+<meta charset="UTF-8">
+<meta name="viewport" content="width=device-width, initial-scale=1">
+<title>{title}</title>
+<style>
+* {{ box-sizing: border-box; }}
+body {{ font-family: ui-sans-serif, system-ui, -apple-system, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; line-height: 1.5; }}
+h1 {{ font-size: 1.25rem; margin-bottom: 16px; color: #333; }}
+.msg-list {{ display: flex; flex-direction: column; gap: 12px; }}
+.msg-item {{ background: #fff; border-radius: 8px; padding: 12px 16px; box-shadow: 0 1px 3px rgba(0,0,0,.08); border-left: 4px solid #94a3b8; }}
+.msg-item[data-role="system"] {{ border-left-color: #64748b; }}
+.msg-item[data-role="user"] {{ border-left-color: #3b82f6; }}
+.msg-item[data-role="assistant"] {{ border-left-color: #22c55e; }}
+.msg-item[data-role="tool"] {{ border-left-color: #f59e0b; }}
+.msg-header {{ margin-bottom: 10px; display: flex; align-items: center; gap: 8px; flex-wrap: wrap; }}
+.msg-type {{ font-size: 0.75rem; font-weight: 600; padding: 2px 8px; border-radius: 4px; }}
+.msg-system {{ background: #e2e8f0; color: #475569; }}
+.msg-user {{ background: #dbeafe; color: #1d4ed8; }}
+.msg-assistant {{ background: #dcfce7; color: #15803d; }}
+.msg-assistant-tool {{ background: #fef3c7; color: #b45309; }}
+.msg-tool {{ background: #fed7aa; color: #c2410c; }}
+.msg-desc {{ font-size: 0.875rem; color: #64748b; }}
+.msg-body {{ font-size: 0.875rem; }}
+.content-body {{ margin: 0; white-space: pre-wrap; word-break: break-word; font-size: 0.8125rem; color: #334155; max-height: 400px; overflow-y: auto; }}
+.tool-call-item {{ margin-bottom: 8px; }}
+.tool-name {{ font-weight: 600; color: #0f172a; }}
+.tool-args {{ margin: 4px 0 0 0; padding: 8px; background: #f8fafc; border-radius: 4px; font-size: 0.75rem; overflow-x: auto; }}
+.tool-output-header {{ margin-bottom: 8px; }}
+.btn-toggle {{ margin-top: 8px; padding: 4px 12px; font-size: 0.75rem; cursor: pointer; background: #e2e8f0; border: 1px solid #cbd5e1; border-radius: 4px; color: #475569; }}
+.btn-toggle:hover {{ background: #cbd5e1; }}
+.collapsible-wrap {{ position: relative; }}
+</style>
+</head>
+<body>
+<h1>{title}</h1>
+<div class="msg-list">{items_html}</div>
+<script>
+function toggleExpand(idSuffix) {{
+  var preview = document.getElementById('preview-' + idSuffix);
+  var full = document.getElementById('full-' + idSuffix);
+  if (!preview || !full) return;
+  var btn = preview.parentElement.querySelector('.btn-toggle');
+  if (full.style.display === 'none') {{
+    preview.style.display = 'none';
+    full.style.display = 'block';
+    if (btn) btn.textContent = 'Collapse';
+  }} else {{
+    preview.style.display = 'block';
+    full.style.display = 'none';
+    if (btn) btn.textContent = 'Expand all';
+  }}
+}}
+</script>
+</body>
+</html>"""
+
+
+def messages_to_html(
+    messages: List[Any],
+    output_path: Union[str, Path],
+    title: str = "Messages Viewer",
+) -> Path:
+    """
+    Render messages as HTML and write the result to a file
+
+    Args:
+        messages: list of Message objects or dicts
+        output_path: output HTML file path
+        title: page title
+
+    Returns:
+        Path of the output file
+    """
+    data = _ensure_messages(messages)
+    html = _build_html(data, title)
+    out = Path(output_path)
+    out.parent.mkdir(parents=True, exist_ok=True)
+    out.write_text(html, encoding="utf-8")
+    return out
+
+
+async def trace_to_html(
+    trace_id: str,
+    output_path: Union[str, Path],
+    base_path: str = ".trace",
+    title: str | None = None,
+) -> Path:
+    """
+    Load messages from a Trace and render them as HTML
+
+    Args:
+        trace_id: Trace ID
+        output_path: output HTML file path
+        base_path: root directory of the trace store
+        title: page title; defaults to one derived from trace_id
+
+    Returns:
+        Path of the output file
+    """
+    from agent.trace import FileSystemTraceStore
+
+    store = FileSystemTraceStore(base_path=base_path)
+    messages = await store.get_trace_messages(trace_id)
+    if not messages:
+        raise FileNotFoundError(f"No messages found for trace {trace_id}")
+    page_title = title or f"Trace {trace_id[:8]}... Messages"
+    return messages_to_html(messages, output_path, title=page_title)
+
+
+if __name__ == "__main__":
+    import asyncio
+    import sys
+    from pathlib import Path
+
+    # Add the project root so the agent package can be imported
+    _project_root = Path(__file__).resolve().parent.parent.parent
+    if str(_project_root) not in sys.path:
+        sys.path.insert(0, str(_project_root))
+
+    async def _main():
+        import argparse
+        parser = argparse.ArgumentParser(description="Render trace messages as HTML")
+        parser.add_argument("--trace", required=True, help="Trace ID")
+        parser.add_argument("-o", "--output", default="messages.html", help="output file path")
+        parser.add_argument("--base-path", default=".trace", help="trace store root directory")
+        args = parser.parse_args()
+
+        out = await trace_to_html(args.trace, args.output, base_path=args.base_path)
+        print(f"Generated: {out.absolute()}")
+
+    asyncio.run(_main())

+ 12 - 0
examples/content_needs_generation/presets.json

@@ -0,0 +1,12 @@
+{
+  "default": {
+    "max_iterations": 300,
+    "temperature": 0.5,
+    "skills": [
+      "planning",
+      "hot_rank_search",
+      "browser"
+    ],
+    "description": "Default agent with access to all tools"
+  }
+}

+ 179 - 0
examples/content_needs_generation/process_messages.py

@@ -0,0 +1,179 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Process message files and produce structured JSON data
+"""
+
+import json
+from pathlib import Path
+from typing import Dict, List, Any, Optional
+
+
+def load_all_messages(messages_dir: str) -> List[Dict[str, Any]]:
+    """Load all JSON message files"""
+    messages = []
+    messages_path = Path(messages_dir)
+
+    # Only process JSON files
+    for json_file in sorted(messages_path.glob("*.json")):
+        try:
+            with open(json_file, 'r', encoding='utf-8') as f:
+                data = json.load(f)
+                messages.append(data)
+        except Exception as e:
+            print(f"Warning: failed to read file {json_file}: {e}")
+
+    # Sort by sequence
+    messages.sort(key=lambda x: x.get('sequence', 0))
+    return messages
+
+
+def extract_tool_calls(content: Any) -> List[Dict[str, Any]]:
+    """Extract tool_calls from content"""
+    if isinstance(content, dict):
+        return content.get('tool_calls', [])
+    return []
+
+
+def find_tool_result(tool_call_id: str, messages: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+    """Find the tool result message matching tool_call_id"""
+    for msg in messages:
+        if msg.get('role') == 'tool' and msg.get('tool_call_id') == tool_call_id:
+            return msg
+    return None
+
+
+def format_message(msg: Dict[str, Any], messages: List[Dict[str, Any]]) -> Dict[str, Any]:
+    """Format a single message into structured data"""
+    result = {
+        "sequence": msg.get('sequence'),
+        "role": msg.get('role'),
+        "message_id": msg.get('message_id'),
+        "parent_sequence": msg.get('parent_sequence'),
+        "status": msg.get('status'),
+        "goal_id": msg.get('goal_id'),
+        "created_at": msg.get('created_at'),
+    }
+
+    # Handle content
+    content = msg.get('content')
+    if isinstance(content, str):
+        result["content"] = content
+        result["text"] = content
+    elif isinstance(content, dict):
+        result["text"] = content.get('text', '')
+        result["content"] = content
+
+    # Handle description
+    if msg.get('description'):
+        result["description"] = msg.get('description')
+
+    # Handle token info
+    if msg.get('tokens') is not None:
+        result["tokens"] = msg.get('tokens')
+    if msg.get('prompt_tokens') is not None:
+        result["prompt_tokens"] = msg.get('prompt_tokens')
+    if msg.get('completion_tokens') is not None:
+        result["completion_tokens"] = msg.get('completion_tokens')
+    if msg.get('cost') is not None:
+        result["cost"] = msg.get('cost')
+
+    # If an assistant message has tool_calls, attach them as children
+    if msg.get('role') == 'assistant':
+        tool_calls = extract_tool_calls(content)
+        if tool_calls:
+            result["children"] = []
+            for tool_call in tool_calls:
+                tool_call_id = tool_call.get('id')
+                tool_name = tool_call.get('function', {}).get('name', 'unknown')
+                tool_args = tool_call.get('function', {}).get('arguments', '{}')
+
+                # Try to parse arguments
+                try:
+                    tool_args_parsed = json.loads(tool_args)
+                except (json.JSONDecodeError, TypeError):
+                    tool_args_parsed = tool_args
+
+                tool_node = {
+                    "type": "tool_call",
+                    "tool_call_id": tool_call_id,
+                    "tool_name": tool_name,
+                    "arguments": tool_args_parsed,
+                    "raw_arguments": tool_args,
+                }
+
+                # Find the matching tool result
+                tool_result = find_tool_result(tool_call_id, messages)
+                if tool_result:
+                    tr_content = tool_result.get('content')
+                    tr_is_dict = isinstance(tr_content, dict)
+                    tool_node["result"] = {
+                        "sequence": tool_result.get('sequence'),
+                        "tool_name": tr_content.get('tool_name') if tr_is_dict else None,
+                        "result": tr_content.get('result') if tr_is_dict else tr_content,
+                        "status": tool_result.get('status'),
+                        "created_at": tool_result.get('created_at'),
+                    }
+
+                result["children"].append(tool_node)
+
+    # For tool messages, attach tool-related info
+    if msg.get('role') == 'tool':
+        result["tool_call_id"] = msg.get('tool_call_id')
+        if isinstance(content, dict):
+            result["tool_name"] = content.get('tool_name')
+            result["tool_result"] = content.get('result')
+
+    return result
+
+
+def process_messages(messages_dir: str, output_path: str):
+    """Process all messages and produce structured data"""
+    messages_dir_path = Path(messages_dir).resolve()
+    output_file_path = Path(output_path).resolve()
+
+    if not messages_dir_path.exists():
+        raise ValueError(f"Input directory does not exist: {messages_dir_path}")
+
+    if not messages_dir_path.is_dir():
+        raise ValueError(f"Input path is not a directory: {messages_dir_path}")
+
+    print(f"Reading message files from: {messages_dir_path}")
+    messages = load_all_messages(str(messages_dir_path))
+    print(f"Loaded {len(messages)} messages")
+
+    # Format all messages
+    structured_messages = []
+    for msg in messages:
+        formatted = format_message(msg, messages)
+        structured_messages.append(formatted)
+
+    # Ensure the output directory exists
+    output_file_path.parent.mkdir(parents=True, exist_ok=True)
+
+    # Save the result
+    with open(output_file_path, 'w', encoding='utf-8') as f:
+        json.dump(structured_messages, f, ensure_ascii=False, indent=2)
+
+    print(f"Structured data saved to: {output_file_path}")
+    print(f"Processed {len(structured_messages)} messages")
+
+    # Statistics
+    tool_calls_count = sum(1 for msg in structured_messages if msg.get('children'))
+    print(f"Messages containing tool calls: {tool_calls_count}")
+
+    return structured_messages
+
+
+if __name__ == "__main__":
+    import sys
+
+    # Usage: process_messages.py <messages_dir> <output_path>
+    if len(sys.argv) != 3:
+        print(f"Usage: {sys.argv[0]} <messages_dir> <output_path>")
+        sys.exit(1)
+    try:
+        process_messages(sys.argv[1], sys.argv[2])
+    except Exception as e:
+        print(f"Error: {e}")
+        sys.exit(1)
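
`find_tool_result` above rescans the full message list for every tool call, which is quadratic on long traces. A one-pass index keyed by `tool_call_id` avoids that; a minimal sketch (not part of the commit, sample messages hypothetical):

```python
def index_tool_results(messages: list) -> dict:
    """Map tool_call_id -> tool message in a single pass over the list."""
    return {
        m["tool_call_id"]: m
        for m in messages
        if m.get("role") == "tool" and m.get("tool_call_id")
    }

msgs = [
    {"role": "assistant", "content": {"tool_calls": [{"id": "call_1"}]}},
    {"role": "tool", "tool_call_id": "call_1", "content": {"result": "ok"}},
]
idx = index_tool_results(msgs)
print(idx["call_1"]["content"]["result"])  # ok
```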

+ 457 - 0
examples/content_needs_generation/run.py

@@ -0,0 +1,457 @@
+"""
+Example (workflow-aligned version)
+
+Modeled on examples/research/run.py:
+1. Use the framework's InteractiveController for a unified interactive flow
+2. Manage run parameters via config.py
+3. Keep the create-scenario-specific prompt injection and verbose message printing
+"""
+
+import argparse
+import asyncio
+import copy
+import json
+import os
+import sys
+from pathlib import Path
+from typing import Any
+
+# Clash Verge TUN-mode compatibility: stop httpx/urllib from auto-detecting the system HTTP proxy
+os.environ.setdefault("no_proxy", "*")
+
+# Add the project root to the Python path
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
+from agent.cli import InteractiveController
+from agent.core.presets import AgentPreset, register_preset
+from agent.core.runner import AgentRunner
+from agent.llm import create_openrouter_llm_call
+from agent.llm.prompts import SimplePrompt
+from agent.trace import FileSystemTraceStore, Message, Trace
+from agent.utils import setup_logging
+from examples.content_needs_generation.html import trace_to_html
+
+# Import project configuration
+from config import DEBUG, LOG_FILE, LOG_LEVEL, RUN_CONFIG, SKILLS_DIR, TRACE_STORE_PATH
+
+# Import the custom tool module to trigger @tool decorator registration
+import examples.content_needs_generation.tool  # noqa: F401
+
+
+def _format_json(obj: Any, indent: int = 2) -> str:
+    """Format a JSON object as a string"""
+    try:
+        return json.dumps(obj, indent=indent, ensure_ascii=False)
+    except (TypeError, ValueError):
+        return str(obj)
+
+
+def _print_message_details(message: Message):
+    """Print the full details of a message"""
+    print("\n" + "=" * 80)
+    print(f"[Message #{message.sequence}] {message.role.upper()}")
+    print("=" * 80)
+
+    if message.goal_id:
+        print(f"Goal ID: {message.goal_id}")
+    if message.parent_sequence is not None:
+        print(f"Parent Sequence: {message.parent_sequence}")
+    if message.tool_call_id:
+        print(f"Tool Call ID: {message.tool_call_id}")
+
+    if message.role == "user":
+        print("\n[Input]")
+        print("-" * 80)
+        if isinstance(message.content, str):
+            print(message.content)
+        else:
+            print(_format_json(message.content))
+    elif message.role == "assistant":
+        content = message.content
+        if isinstance(content, dict):
+            text = content.get("text", "")
+            tool_calls = content.get("tool_calls")
+
+            if text:
+                print("\n[LLM text reply]")
+                print("-" * 80)
+                print(text)
+
+            if tool_calls:
+                print(f"\n[Tool calls] ({len(tool_calls)} total)")
+                print("-" * 80)
+                for idx, tc in enumerate(tool_calls, 1):
+                    func = tc.get("function", {})
+                    tool_name = func.get("name", "unknown")
+                    tool_id = tc.get("id", "unknown")
+                    arguments = func.get("arguments", {})
+
+                    print(f"\nTool #{idx}: {tool_name}")
+                    print(f"  Call ID: {tool_id}")
+                    print("  Arguments:")
+                    if isinstance(arguments, str):
+                        try:
+                            parsed_args = json.loads(arguments)
+                            print(_format_json(parsed_args, indent=4))
+                        except json.JSONDecodeError:
+                            print(f"    {arguments}")
+                    else:
+                        print(_format_json(arguments, indent=4))
+        elif isinstance(content, str):
+            print("\n[LLM text reply]")
+            print("-" * 80)
+            print(content)
+        else:
+            print("\n[Content]")
+            print("-" * 80)
+            print(_format_json(content))
+
+        if message.finish_reason:
+            print(f"\nFinish reason: {message.finish_reason}")
+    elif message.role == "tool":
+        content = message.content
+        print("\n[Tool execution result]")
+        print("-" * 80)
+        if isinstance(content, dict):
+            tool_name = content.get("tool_name", "unknown")
+            result = content.get("result", content)
+            print(f"Tool name: {tool_name}")
+            print("\nResult:")
+            if isinstance(result, str):
+                print(result)
+            elif isinstance(result, list):
+                for idx, item in enumerate(result, 1):
+                    if isinstance(item, dict) and item.get("type") == "image_url":
+                        print(f"  [{idx}] image (base64, omitted)")
+                    else:
+                        print(f"  [{idx}] {item}")
+            else:
+                print(_format_json(result))
+        else:
+            print(str(content) if content is not None else "(no content)")
+    elif message.role == "system":
+        print("\n[System prompt]")
+        print("-" * 80)
+        if isinstance(message.content, str):
+            print(message.content)
+        else:
+            print(_format_json(message.content))
+
+    if message.prompt_tokens is not None or message.completion_tokens is not None:
+        print("\n[Token usage]")
+        print("-" * 80)
+        if message.prompt_tokens is not None:
+            print(f"  Input tokens: {message.prompt_tokens:,}")
+        if message.completion_tokens is not None:
+            print(f"  Output tokens: {message.completion_tokens:,}")
+        if message.reasoning_tokens is not None:
+            print(f"  Reasoning tokens: {message.reasoning_tokens:,}")
+        if message.cache_creation_tokens is not None:
+            print(f"  Cache creation tokens: {message.cache_creation_tokens:,}")
+        if message.cache_read_tokens is not None:
+            print(f"  Cache read tokens: {message.cache_read_tokens:,}")
+        if message.tokens:
+            print(f"  Total tokens: {message.tokens:,}")
+
+    if message.cost is not None:
+        print(f"\n[Cost] ${message.cost:.6f}")
+
+    if message.duration_ms is not None:
+        print(f"[Duration] {message.duration_ms}ms")
+
+    print("=" * 80 + "\n")
+
+
+def _apply_prompt_placeholders(base_dir: Path, prompt: SimplePrompt):
+    """Inject the PRD file contents into the prompt placeholders."""
+    system_md_path = base_dir / "PRD" / "system.md"
+    if system_md_path.exists():
+        system_content = system_md_path.read_text(encoding="utf-8")
+        if "system" in prompt._messages and "{system}" in prompt._messages["system"]:
+            prompt._messages["system"] = prompt._messages["system"].replace("{system}", system_content)
+
+    create_process_md_path = base_dir / "PRD" / "process.md"
+    if create_process_md_path.exists():
+        create_process_content = create_process_md_path.read_text(encoding="utf-8")
+        if "system" in prompt._messages and "{process}" in prompt._messages["system"]:
+            prompt._messages["system"] = prompt._messages["system"].replace("{process}", create_process_content)
+
+    output_md_path = base_dir / "PRD" / "output.md"
+    if output_md_path.exists():
+        output_content = output_md_path.read_text(encoding="utf-8")
+        if "user" in prompt._messages and "{output}" in prompt._messages["user"]:
+            prompt._messages["user"] = prompt._messages["user"].replace("{output}", output_content)
+
+
+async def main():
+    parser = argparse.ArgumentParser(description="Task (agent mode + interactive enhancements)")
+    parser.add_argument(
+        "--trace",
+        type=str,
+        default=None,
+        help="Existing Trace ID to resume from (omit to create a new trace)",
+    )
+    args = parser.parse_args()
+
+    base_dir = Path(__file__).parent
+    prompt_path = base_dir / "content_needs_generation.prompt"
+    output_dir = base_dir / "output"
+    output_dir.mkdir(exist_ok=True)
+
+    setup_logging(level=LOG_LEVEL, file=LOG_FILE)
+
+    presets_path = base_dir / "presets.json"
+    if presets_path.exists():
+        with open(presets_path, "r", encoding="utf-8") as f:
+            project_presets = json.load(f)
+        for name, cfg in project_presets.items():
+            register_preset(name, AgentPreset(**cfg))
+
+    prompt = SimplePrompt(prompt_path)
+    _apply_prompt_placeholders(base_dir, prompt)
+
+    messages = prompt.build_messages()
+
+    model_from_prompt = prompt.config.get("model")
+    model_from_config = RUN_CONFIG.model
+    default_model = f"anthropic/{model_from_config}" if "/" not in model_from_config else model_from_config
+    model = model_from_prompt or default_model
+
+    skills_dir = str((base_dir / SKILLS_DIR).resolve()) if not Path(SKILLS_DIR).is_absolute() else SKILLS_DIR
+
+    # Verify that the skills directory exists
+    skills_path = Path(skills_dir)
+    if not skills_path.exists():
+        print(f"⚠️  警告: Skills 目录不存在: {skills_dir}")
+    else:
+        skill_files = list(skills_path.glob("*.md"))
+        print(f"✓ 找到 {len(skill_files)} 个 skill 文件: {[f.name for f in skill_files]}")
+
+    # Verify that the custom tools are registered
+    from agent.tools import get_tool_registry
+    tool_registry = get_tool_registry()
+    registered_tools = list(tool_registry._tools.keys())
+    custom_tools = [t for t in registered_tools if "hot_rank" in t.lower()]
+    if custom_tools:
+        print(f"✓ 已注册自定义工具: {custom_tools}")
+    else:
+        print("⚠️  警告: 未找到自定义工具 'hot_rank_search'")
+        print(f"   已注册的工具: {registered_tools[:10]}...")  # show only the first 10
+
+    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
+    runner = AgentRunner(
+        trace_store=store,
+        llm_call=create_openrouter_llm_call(model=model),
+        skills_dir=skills_dir,
+        debug=DEBUG,
+    )
+
+    interactive = InteractiveController(
+        runner=runner,
+        store=store,
+        enable_stdin_check=True,
+    )
+
+    task_name = RUN_CONFIG.name or base_dir.name
+    print("=" * 60)
+    print(task_name)
+    print("=" * 60)
+    print("💡 交互提示:")
+    print("   - 执行过程中输入 'p' 或 'pause' 暂停并进入交互模式")
+    print("   - 执行过程中输入 'q' 或 'quit' 停止执行")
+    print("=" * 60)
+    print()
+
+    resume_trace_id = args.trace
+    if resume_trace_id:
+        existing_trace = await store.get_trace(resume_trace_id)
+        if not existing_trace:
+            print(f"\n错误: Trace 不存在: {resume_trace_id}")
+            sys.exit(1)
+
+    final_response = ""
+    current_trace_id = resume_trace_id
+    current_sequence = 0
+    should_exit = False
+
+    try:
+        run_config = copy.deepcopy(RUN_CONFIG)
+        run_config.model = model
+        run_config.temperature = float(prompt.config.get("temperature", run_config.temperature))
+        run_config.max_iterations = int(prompt.config.get("max_iterations", run_config.max_iterations))
+
+        if resume_trace_id:
+            initial_messages = None
+            run_config.trace_id = resume_trace_id
+        else:
+            initial_messages = messages
+            run_config.name = "热点内容搜索"
+
+        while not should_exit:
+            if current_trace_id:
+                run_config.trace_id = current_trace_id
+
+            final_response = ""
+
+            if current_trace_id and initial_messages is None:
+                check_trace = await store.get_trace(current_trace_id)
+                if check_trace and check_trace.status in ("completed", "failed"):
+                    if check_trace.status == "completed":
+                        print("\n[Trace] ✅ 已完成")
+                        print(f"  - Total cost: ${check_trace.total_cost:.4f}")
+                    else:
+                        print(f"\n[Trace] ❌ 已失败: {check_trace.error_message}")
+                    current_sequence = check_trace.head_sequence
+
+                    menu_result = await interactive.show_menu(current_trace_id, current_sequence)
+                    if menu_result["action"] == "stop":
+                        break
+                    if menu_result["action"] == "continue":
+                        new_messages = menu_result.get("messages", [])
+                        if new_messages:
+                            initial_messages = new_messages
+                            run_config.after_sequence = menu_result.get("after_sequence")
+                        else:
+                            initial_messages = []
+                            run_config.after_sequence = None
+                        continue
+                    break
+
+                initial_messages = []
+
+            paused = False
+            try:
+                async for item in runner.run(messages=initial_messages, config=run_config):
+                    cmd = interactive.check_stdin()
+                    if cmd == "pause":
+                        print("\n⏸️ 正在暂停执行...")
+                        if current_trace_id:
+                            await runner.stop(current_trace_id)
+                        await asyncio.sleep(0.5)
+
+                        menu_result = await interactive.show_menu(current_trace_id, current_sequence)
+                        if menu_result["action"] == "stop":
+                            should_exit = True
+                            paused = True
+                            break
+                        if menu_result["action"] == "continue":
+                            new_messages = menu_result.get("messages", [])
+                            if new_messages:
+                                initial_messages = new_messages
+                                after_seq = menu_result.get("after_sequence")
+                                if after_seq is not None:
+                                    run_config.after_sequence = after_seq
+                            else:
+                                initial_messages = []
+                                run_config.after_sequence = None
+                            paused = True
+                            break
+
+                    elif cmd == "quit":
+                        print("\n🛑 用户请求停止...")
+                        if current_trace_id:
+                            await runner.stop(current_trace_id)
+                        should_exit = True
+                        break
+
+                    if isinstance(item, Trace):
+                        current_trace_id = item.trace_id
+                        if item.status == "completed":
+                            print("\n[Trace] ✅ 完成")
+                            print(f"  - Total messages: {item.total_messages}")
+                            print(f"  - Total cost: ${item.total_cost:.4f}")
+                        elif item.status == "failed":
+                            print(f"\n[Trace] ❌ 失败: {item.error_message}")
+                        elif item.status == "stopped":
+                            print("\n[Trace] ⏸️ 已停止")
+                    elif isinstance(item, Message):
+                        current_sequence = item.sequence
+                        _print_message_details(item)
+
+                        if item.role == "assistant":
+                            content = item.content
+                            if isinstance(content, dict):
+                                text = content.get("text", "")
+                                tool_calls = content.get("tool_calls")
+                                if text and not tool_calls:
+                                    final_response = text
+            except Exception as e:
+                print(f"\n执行出错: {e}")
+                import traceback
+                traceback.print_exc()
+
+            if paused:
+                if should_exit:
+                    break
+                continue
+
+            if should_exit:
+                break
+
+            if current_trace_id:
+                menu_result = await interactive.show_menu(current_trace_id, current_sequence)
+                if menu_result["action"] == "stop":
+                    break
+                if menu_result["action"] == "continue":
+                    new_messages = menu_result.get("messages", [])
+                    if new_messages:
+                        initial_messages = new_messages
+                        run_config.after_sequence = menu_result.get("after_sequence")
+                    else:
+                        initial_messages = []
+                        run_config.after_sequence = None
+                    continue
+            break
+
+    except KeyboardInterrupt:
+        print("\n\n用户中断 (Ctrl+C)")
+        if current_trace_id:
+            await runner.stop(current_trace_id)
+    finally:
+        if current_trace_id:
+            try:
+                html_path = store.base_path / current_trace_id / "messages.html"
+                await trace_to_html(current_trace_id, html_path, base_path=str(store.base_path))
+                print(f"\n✓ Messages 可视化已保存: {html_path}")
+            except Exception as e:
+                print(f"\n⚠ 生成 HTML 失败: {e}")
+
+    if final_response:
+        print()
+        print("=" * 60)
+        print("Agent 响应:")
+        print("=" * 60)
+        print(final_response)
+        print("=" * 60)
+        print()
+
+        output_file = output_dir / "result.txt"
+        with open(output_file, "w", encoding="utf-8") as f:
+            f.write(final_response)
+
+        print(f"✓ 结果已保存到: {output_file}")
+        print()
+
+    if current_trace_id:
+        html_path = store.base_path / current_trace_id / "messages.html"
+        print("=" * 60)
+        print("可视化:")
+        print("=" * 60)
+        print(f"1. 本地 HTML: {html_path}")
+        print()
+        print("2. API Server:")
+        print("   python3 api_server.py")
+        print("   http://localhost:8000/api/traces")
+        print()
+        print(f"3. Trace ID: {current_trace_id}")
+        print("=" * 60)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 8 - 0
examples/content_needs_generation/skills/hot_rank_search.md

@@ -0,0 +1,8 @@
+---
+name: hot_rank_search
+description: 获取今日热榜内容
+---
+
+## 获取今日热榜内容
+
+你可以通过热门选题检索工具 `hot_rank_search` 获取今日热榜内容。

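The skill above only names the tool; per the `hot_rank_search` implementation added later in this commit, a call carries two parameters with fixed constraints. A minimal sketch of those argument checks (the `validate_hot_rank_args` helper is hypothetical, for illustration only):

```python
def validate_hot_rank_args(sort_type: str = "最热", cursor: int = 0) -> None:
    # Same checks the hot_rank_search tool performs before calling the API.
    if sort_type not in ("最热", "最新"):
        raise ValueError(f"sort_type must be '最热' or '最新', got: {sort_type}")
    if cursor < 0:
        raise ValueError(f"cursor must be >= 0, got: {cursor}")
```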
+ 7 - 0
examples/content_needs_generation/tool/__init__.py

@@ -0,0 +1,7 @@
+"""
+Custom tools for the content_needs_generation example.
+"""
+
+from examples.content_needs_generation.tool.hot_rank_search import hot_rank_search
+
+__all__ = ["hot_rank_search"]

+ 151 - 0
examples/content_needs_generation/tool/hot_rank_search.py

@@ -0,0 +1,151 @@
+"""
+Hot-ranking search tool: fetch today's trending content (news category) by sort type.
+
+Lets the agent pull hot-list data on its own during execution.
+"""
+import asyncio
+import json
+import os
+from typing import Any, Dict, List, Optional
+
+import httpx
+
+from agent.tools import tool, ToolResult
+
+# Hot-ranking search API configuration
+HOT_RANK_BASE_URL = os.getenv("HOT_RANK_BASE_URL", "http://crawapi.piaoquantv.com")
+DEFAULT_TIMEOUT = 30.0
+
+
+async def _call_hot_rank_api(
+    sort_type: str = "最热",
+    cursor: int = 0,
+) -> Optional[List[Dict[str, Any]]]:
+    """调用热榜搜索 API,返回结果列表。"""
+    url = f"{HOT_RANK_BASE_URL.rstrip('/')}/crawler/jin_ri_re_bang/content_rank"
+    payload = {
+        "sort_type": sort_type,
+        "category": "news",  # 固定分类
+        "cursor": cursor,
+    }
+
+    try:
+        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+            resp = await client.post(url, json=payload)
+            resp.raise_for_status()
+            data = resp.json()
+    except httpx.HTTPStatusError as e:
+        raise RuntimeError(f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}") from e
+    except Exception as e:
+        raise RuntimeError(f"请求异常: {e}") from e
+
+    # Expected response shape: {'code': 0, 'data': {'data': [{'jump_url': ..., 'rankList': [...]}]}}
+    if isinstance(data, dict):
+        # A non-zero code signals an API-level error
+        code = data.get("code", 0)
+        if code != 0:
+            raise RuntimeError(f"API 返回错误码: {code}")
+
+        # Unwrap the nested data.data array
+        outer_data = data.get("data", {})
+        if isinstance(outer_data, dict):
+            inner_data = outer_data.get("data", [])
+        else:
+            inner_data = outer_data if isinstance(outer_data, list) else []
+
+        # Flatten every rankList into a single list of entries
+        all_items = []
+        if isinstance(inner_data, list):
+            for item in inner_data:
+                if isinstance(item, dict):
+                    rank_list = item.get("rankList", [])
+                    if isinstance(rank_list, list):
+                        all_items.extend(rank_list)
+
+        return all_items
+
+    # Fall back to responses that return a bare list
+    if isinstance(data, list):
+        return data
+
+    return []
+
+# Manual smoke test: run this module directly to print one page of raw API results.
+if __name__ == '__main__':
+    try:
+        res = asyncio.run(_call_hot_rank_api())
+        print(res)
+    except Exception as e:
+        print(f"执行出错: {e}")
+
+
+@tool(
+    description="根据排序类型获取今日热榜内容(news分类),用于创作参考。支持按最热或最新排序,可指定分页游标。",
+    display={
+        "zh": {
+            "name": "热榜搜索",
+            "params": {
+                "sort_type": "排序类型:最热 或 最新",
+                "cursor": "分页游标,从0开始",
+            },
+        },
+    },
+)
+async def hot_rank_search(
+    sort_type: str = "最热",
+    cursor: int = 0,
+) -> ToolResult:
+    """
+    根据排序类型获取今日热榜内容(固定news分类)。
+
+    Args:
+        sort_type: 排序类型,"最热" 或 "最新",默认为 "最热"
+        cursor: 分页游标,从0开始,默认为 0
+
+    Returns:
+        ToolResult: 热榜内容列表
+    """
+    # 验证排序类型
+    if sort_type not in ["最热", "最新"]:
+        return ToolResult(
+            title="热榜搜索失败",
+            output="",
+            error=f"排序类型必须是 '最热' 或 '最新',当前为: {sort_type}",
+        )
+
+    # Validate the cursor
+    if cursor < 0:
+        return ToolResult(
+            title="热榜搜索失败",
+            output="",
+            error=f"分页游标必须大于等于0,当前为: {cursor}",
+        )
+
+    try:
+        results = await _call_hot_rank_api(sort_type=sort_type, cursor=cursor)
+    except RuntimeError as e:
+        return ToolResult(
+            title="热榜搜索失败",
+            output="",
+            error=str(e),
+        )
+
+    if not results:
+        return ToolResult(
+            title="热榜搜索",
+            output=json.dumps(
+                {
+                    "message": "未找到热榜内容",
+                    "sort_type": sort_type,
+                    "category": "news",
+                    "cursor": cursor,
+                },
+                ensure_ascii=False,
+                indent=2,
+            ),
+        )
+
+    output = json.dumps(results, ensure_ascii=False, indent=2)
+    return ToolResult(
+        title=f"热榜搜索 - {sort_type} (news)",
+        output=output,
+        long_term_memory=f"检索到热榜内容,排序: {sort_type},分类: news,共 {len(results)} 条",
+    )
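The nested payload documented in `_call_hot_rank_api` — `{'code': 0, 'data': {'data': [{'rankList': [...]}]}}` — can be exercised against a fixture. This standalone sketch mirrors that unwrapping logic (the `extract_rank_items` name is hypothetical, and the fixture values are made up for illustration):

```python
def extract_rank_items(data):
    # Illustrative copy of the unwrapping logic in _call_hot_rank_api.
    if isinstance(data, dict):
        if data.get("code", 0) != 0:
            raise RuntimeError(f"API error code: {data['code']}")
        outer = data.get("data", {})
        if isinstance(outer, dict):
            inner = outer.get("data", [])
        else:
            inner = outer if isinstance(outer, list) else []
        items = []
        for entry in inner:
            if isinstance(entry, dict) and isinstance(entry.get("rankList"), list):
                items.extend(entry["rankList"])
        return items
    # Bare-list responses pass through unchanged.
    return data if isinstance(data, list) else []
```

Every `rankList` across the inner array is flattened into one list, and malformed entries are skipped rather than raising.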