supeng 10 godzin temu
rodzic
commit
177e5e0ea3

+ 5 - 1
.claude/settings.local.json

@@ -14,7 +14,11 @@
       "Bash(tree:*)",
       "Bash(xargs grep:*)",
       "Bash(npm run:*)",
-      "Bash(sed:*)"
+      "Bash(sed:*)",
+      "Bash(xargs ls:*)",
+      "Bash(.venv/bin/pip install:*)",
+      "Bash(cp:*)",
+      "Bash(OPENROUTER_API_KEY=test python:*)"
     ],
     "deny": [],
     "ask": []

+ 444 - 0
GOAL_EXPLAINED.md

@@ -0,0 +1,444 @@
+# Goal 系统详解
+
+## 什么是 Goal?
+
+**Goal(目标)** 是这个 Agent 框架中的**计划管理系统**,用于帮助 Agent 组织和追踪复杂任务的执行过程。
+
+### 核心概念
+
+```
+Goal = 执行计划中的一个目标节点
+GoalTree = 目标树,管理整个执行计划的层级结构
+```
+
+可以把 Goal 理解为:
+- **任务分解**:将大任务拆分成小目标
+- **工作记忆**:记录当前在做什么、做完了什么
+- **进度追踪**:跟踪每个目标的状态和结果
+
+## 为什么需要 Goal?
+
+### 问题场景
+
+当 Agent 执行复杂任务时,会遇到这些问题:
+
+1. **任务复杂**:一个大任务需要多个步骤
+2. **容易迷失**:执行过程中忘记原本要做什么
+3. **难以追踪**:不知道哪些完成了,哪些还没做
+4. **缺乏规划**:没有清晰的执行路径
+
+### Goal 的解决方案
+
+```
+用户任务: "帮我实现一个用户登录功能"
+
+没有 Goal:
+Agent → 直接开始写代码 → 可能遗漏测试、文档等
+
+有 Goal:
+Agent → 创建计划:
+  1. 设计数据库表结构
+  2. 实现登录接口
+    2.1 实现密码加密
+    2.2 实现 JWT 生成
+    2.3 实现登录验证
+  3. 编写单元测试
+  4. 编写 API 文档
+
+→ 按计划逐步执行 → 完整、有序、可追踪
+```
+
+## Goal 的结构
+
+### Goal 对象
+
+```python
+@dataclass
+class Goal:
+    id: str                    # 唯一 ID(如 "1", "2", "3")
+    description: str           # 目标描述(如 "实现登录接口")
+    reason: str               # 创建理由(为什么要做这个)
+    parent_id: Optional[str]  # 父目标 ID(构建层级关系)
+    status: GoalStatus        # 状态:pending/in_progress/completed/abandoned
+    summary: Optional[str]    # 完成时的总结(记录关键结论)
+
+    # 统计信息
+    self_stats: GoalStats     # 自身统计(消息数、token、成本)
+    cumulative_stats: GoalStats  # 累计统计(包含所有子目标)
+```
+
+### GoalTree 目标树
+
+```python
+@dataclass
+class GoalTree:
+    mission: str              # 总任务描述
+    goals: List[Goal]         # 所有目标的扁平列表
+    current_id: Optional[str] # 当前焦点目标 ID
+```
+
+### 层级结构示例
+
+```
+GoalTree
+├── mission: "实现用户登录功能"
+├── current_id: "2.1"
+└── goals:
+    ├── Goal(id="1", description="设计数据库表结构", status="completed")
+    ├── Goal(id="2", description="实现登录接口", status="in_progress")
+    │   ├── Goal(id="3", parent_id="2", description="实现密码加密", status="completed")
+    │   ├── Goal(id="4", parent_id="2", description="实现 JWT 生成", status="in_progress") ← 当前焦点
+    │   └── Goal(id="5", parent_id="2", description="实现登录验证", status="pending")
+    ├── Goal(id="6", description="编写单元测试", status="pending")
+    └── Goal(id="7", description="编写 API 文档", status="pending")
+```
+
+显示为:
+
+```
+[Mission] 实现用户登录功能
+
+1. [✓] 设计数据库表结构
+2. [→] 实现登录接口
+   2.1 [✓] 实现密码加密
+   2.2 [→] 实现 JWT 生成  ← 当前焦点
+   2.3 [ ] 实现登录验证
+3. [ ] 编写单元测试
+4. [ ] 编写 API 文档
+```
+
+## Goal 工具的使用
+
+Agent 通过 `goal` 工具来管理计划:
+
+### 1. 创建目标
+
+```python
+# 创建顶层目标
+goal(add="设计方案, 实现代码, 编写测试")
+
+# 创建子目标
+goal(add="实现接口, 实现逻辑", under="2")
+
+# 在某个目标后添加同级目标
+goal(add="编写文档", after="3")
+```
+
+### 2. 切换焦点
+
+```python
+# 聚焦到目标 1,开始执行
+goal(focus="1")
+
+# 聚焦到子目标
+goal(focus="2.1")
+```
+
+### 3. 完成目标
+
+```python
+# 完成当前目标,记录关键结论
+goal(done="方案确定使用 JWT 认证,安全性高")
+
+# 完成并切换到下一个
+goal(done="接口实现完成", focus="2")
+```
+
+### 4. 放弃目标
+
+```python
+# 放弃不可行的目标
+goal(abandon="方案 A 需要 Redis,但环境不支持")
+```
+
+## Goal 的工作流程
+
+### 典型执行流程
+
+```
+1. 用户提交任务
+   ↓
+2. Agent 分析任务,创建初步计划
+   goal(add="调研方案, 实现功能, 测试验证")
+   ↓
+3. 聚焦到第一个目标
+   goal(focus="1")
+   ↓
+4. 执行目标(调用工具、分析结果)
+   ↓
+5. 完成目标,记录结论
+   goal(done="确定使用 OpenPose 方案")
+   ↓
+6. 切换到下一个目标
+   goal(focus="2")
+   ↓
+7. 如果需要,创建子目标
+   goal(add="设计接口, 实现代码", under="2")
+   ↓
+8. 重复 3-7,直到所有目标完成
+```
+
+### 动态调整
+
+```
+执行过程中发现新问题:
+goal(add="修复 Bug", after="2")
+
+发现某个方案不可行:
+goal(abandon="方案 A 性能不达标")
+goal(add="尝试方案 B", after="1")
+```
+
+## Goal 的作用
+
+### 1. 任务分解
+
+将复杂任务拆分成可管理的小目标:
+
+```
+大任务: "开发一个博客系统"
+↓
+Goal Tree:
+1. 设计数据库
+   1.1 设计用户表
+   1.2 设计文章表
+   1.3 设计评论表
+2. 实现后端 API
+   2.1 用户管理
+   2.2 文章管理
+   2.3 评论管理
+3. 实现前端页面
+4. 部署上线
+```
+
+### 2. 进度追踪
+
+清楚知道当前进度:
+
+```
+[Mission] 开发博客系统
+
+1. [✓] 设计数据库
+   1.1 [✓] 设计用户表
+   1.2 [✓] 设计文章表
+   1.3 [✓] 设计评论表
+2. [→] 实现后端 API  ← 当前在这里
+   2.1 [✓] 用户管理
+   2.2 [→] 文章管理  ← 具体在这个子任务
+   2.3 [ ] 评论管理
+3. [ ] 实现前端页面
+4. [ ] 部署上线
+
+进度: 5/10 完成 (50%)
+```
+
+### 3. 工作记忆
+
+记录关键结论,避免重复工作:
+
+```
+Goal 1: 调研数据库方案
+Summary: "选择 PostgreSQL,支持 JSON 字段,性能好"
+
+Goal 2: 实现用户认证
+Summary: "使用 JWT + Redis 存储 token,过期时间 7 天"
+
+→ 后续目标可以参考这些结论
+```
+
+### 4. 上下文管理
+
+系统会定期将当前计划注入到 Agent 的上下文中:
+
+```
+System: 当前计划状态:
+[Mission] 开发博客系统
+1. [✓] 设计数据库 (已完成: 选择 PostgreSQL)
+2. [→] 实现后端 API (进行中)
+   2.1 [✓] 用户管理 (已完成: JWT 认证)
+   2.2 [→] 文章管理 (当前焦点)
+   2.3 [ ] 评论管理
+...
+
+Agent: 好的,我现在在做文章管理,已经完成了用户管理...
+```
+
+### 5. 可视化和调试
+
+通过 API Server 可以可视化查看 Goal Tree:
+
+```bash
+python api_server.py
+# 访问 http://localhost:8000/api/traces/{trace_id}
+```
+
+可以看到:
+- 目标的层级结构
+- 每个目标的状态
+- 执行时间和成本
+- 工具调用序列
+
+## 实际案例
+
+### 案例 1:简单任务(单个 Goal)
+
+```python
+任务: "将 CSV 文件转换为 JSON"
+
+Goal Tree:
+1. [→] 将 CSV 转换为 JSON
+
+执行:
+- 读取 CSV 文件
+- 解析数据
+- 转换为 JSON
+- 保存文件
+- goal(done="转换完成,输出到 output.json")
+```
+
+### 案例 2:中等任务(多个 Goal)
+
+```python
+任务: "分析销售数据并生成报告"
+
+Goal Tree:
+1. [✓] 读取销售数据
+2. [✓] 数据清洗和预处理
+3. [→] 数据分析
+   3.1 [✓] 计算总销售额
+   3.2 [→] 分析销售趋势
+   3.3 [ ] 识别热销产品
+4. [ ] 生成可视化图表
+5. [ ] 编写分析报告
+```
+
+### 案例 3:复杂任务(深层嵌套)
+
+```python
+任务: "实现一个完整的用户认证系统"
+
+Goal Tree:
+1. [✓] 需求分析和方案设计
+2. [→] 后端实现
+   2.1 [✓] 数据库设计
+       2.1.1 [✓] 设计用户表
+       2.1.2 [✓] 设计权限表
+   2.2 [→] API 实现
+       2.2.1 [✓] 注册接口
+       2.2.2 [→] 登录接口
+       2.2.3 [ ] 登出接口
+       2.2.4 [ ] 密码重置接口
+   2.3 [ ] 安全加固
+3. [ ] 前端实现
+4. [ ] 测试
+5. [ ] 部署
+```
+
+## 最佳实践
+
+### 1. 先规划再执行
+
+```python
+# ✅ 好的做法
+goal(add="调研方案, 实现功能, 测试验证")
+goal(focus="1")
+# 开始执行...
+
+# ❌ 不好的做法
+# 直接开始执行,没有计划
+```
+
+### 2. 记录关键结论
+
+```python
+# ✅ 好的做法
+goal(done="选择 PostgreSQL,支持 JSON 字段,性能优于 MySQL")
+
+# ❌ 不好的做法
+goal(done="调研完成")  # 没有信息量
+```
+
+### 3. 保持焦点在具体目标
+
+```python
+# ✅ 好的做法
+goal(add="设计接口, 实现代码", under="2")
+goal(focus="2.1")  # 聚焦到具体的子目标
+
+# ❌ 不好的做法
+goal(focus="2")  # 聚焦在父目标,太宽泛
+```
+
+### 4. 灵活调整计划
+
+```python
+# 执行中发现新问题
+goal(add="修复性能问题", after="3")
+
+# 发现方案不可行
+goal(abandon="方案 A 不支持并发")
+goal(add="尝试方案 B", after="1")
+```
+
+### 5. 简单任务不过度拆分
+
+```python
+# ✅ 简单任务
+goal(add="读取文件并统计行数")
+# 一个目标就够了
+
+# ❌ 过度拆分
+goal(add="打开文件, 读取内容, 统计行数, 关闭文件")
+# 太细碎,没必要
+```
+
+## 总结
+
+### Goal 是什么?
+
+- **计划管理系统**:帮助 Agent 组织和追踪任务执行
+- **工作记忆**:记录当前在做什么、做完了什么
+- **层级结构**:支持任务分解和嵌套
+
+### Goal 的核心价值
+
+1. **任务分解**:将复杂任务拆分成可管理的小目标
+2. **进度追踪**:清楚知道完成了什么、还剩什么
+3. **工作记忆**:记录关键结论,避免重复工作
+4. **上下文管理**:系统自动注入计划状态到 Agent
+5. **可视化调试**:通过 API Server 查看执行过程
+
+### 何时使用 Goal?
+
+- ✅ 复杂任务需要多个步骤
+- ✅ 需要清晰的执行计划
+- ✅ 需要追踪进度和结果
+- ✅ 任务可能需要动态调整
+
+- ❌ 非常简单的单步任务
+- ❌ 不需要计划的即时响应
+
+### 类比理解
+
+```
+Goal 就像是:
+
+📋 待办清单(Todo List)
+- 记录要做的事情
+- 勾选完成的项目
+- 添加新的任务
+
+🗺️ 导航地图
+- 规划路线
+- 追踪当前位置
+- 调整路径
+
+🧠 工作记忆
+- 记住当前在做什么
+- 记录重要结论
+- 避免迷失方向
+```
+
+---
+
+**Goal 让 Agent 像人类一样有计划地工作,而不是盲目执行!** 🎯

+ 370 - 0
HOW_TO_BUILD_AGENT.md

@@ -0,0 +1,370 @@
+# 如何快速实现自己的 Agent - 完整指南
+
+## 📚 文档索引
+
+1. **[QUICKSTART.md](./QUICKSTART.md)** - 快速上手指南
+   - 三步快速开始
+   - 核心组件详解
+   - 实战案例
+   - 高级功能
+   - 最佳实践
+
+2. **[examples/production_template/](./examples/production_template/)** - 生产级模板
+   - 完整的生产级代码
+   - 配置管理
+   - 错误处理
+   - 日志记录
+   - 部署指南
+
+3. **[examples/content_finder/](./examples/content_finder/)** - 实战案例
+   - 内容寻找 Agent 完整实现
+   - 工具、记忆、技能完整示例
+   - 可运行的演示代码
+
+## 🚀 快速开始(5分钟)
+
+### 1. 安装依赖
+
+```bash
+pip install -r requirements.txt
+pip install dbutils pymysql  # browser 模块依赖
+```
+
+### 2. 配置环境
+
+```bash
+cp .env.example .env
+# 编辑 .env 填入 OPENROUTER_API_KEY
+```
+
+### 3. 运行模板
+
+```bash
+python examples/production_template/run.py
+```
+
+## 📖 学习路径
+
+### 初学者
+
+1. **阅读** [QUICKSTART.md](./QUICKSTART.md) 的"三步快速开始"部分
+2. **运行** `examples/content_finder/demo.py` 查看效果
+3. **修改** demo.py 中的任务描述,尝试不同功能
+4. **创建** 第一个自定义工具
+
+### 进阶开发者
+
+1. **学习** [QUICKSTART.md](./QUICKSTART.md) 的"核心组件"和"高级功能"
+2. **研究** `examples/production_template/` 的代码结构
+3. **实现** 自己的业务工具和 Skills
+4. **部署** 到生产环境
+
+### 生产使用
+
+1. **复制** `examples/production_template/` 作为起点
+2. **替换** 示例工具为实际业务工具
+3. **编写** 业务相关的 Skills
+4. **配置** 监控、日志、告警
+5. **部署** 到生产环境
+
+## 🎯 核心概念
+
+### Agent = Runner + Tools + Skills
+
+```
+┌─────────────────────────────────────┐
+│         AgentRunner                 │
+│  ┌──────────┐  ┌──────────┐        │
+│  │   LLM    │  │  Trace   │        │
+│  │  Call    │  │  Store   │        │
+│  └──────────┘  └──────────┘        │
+└─────────────────────────────────────┘
+           │
+    ┌──────┴──────┐
+    │             │
+┌───▼───┐    ┌───▼────┐
+│ Tools │    │ Skills │
+│       │    │        │
+│ 工具   │    │ 技能   │
+│ 能力   │    │ 知识   │
+└───────┘    └────────┘
+```
+
+### 工具 (Tools)
+
+- **定义**:Agent 可以调用的函数
+- **作用**:扩展 Agent 的能力(读文件、调 API、查数据库等)
+- **实现**:使用 `@tool` 装饰器
+
+### 技能 (Skills)
+
+- **定义**:Markdown 格式的知识文档
+- **作用**:指导 Agent 如何工作(何时用什么工具、如何处理任务)
+- **实现**:创建 `.md` 文件,使用 YAML frontmatter
+
+### 执行追踪 (Trace)
+
+- **定义**:一次完整的 Agent 执行过程
+- **作用**:记录所有消息、工具调用、结果
+- **用途**:调试、回溯、续跑
+
+## 💡 最佳实践
+
+### 1. 工具设计
+
+```python
+@tool(description="简洁清晰的描述")
+async def my_tool(
+    param: str,              # 明确的参数类型
+    optional: str = "default",  # 可选参数有默认值
+    ctx: ToolContext = None,    # 上下文参数
+) -> ToolResult:
+    """
+    详细的文档字符串
+
+    Args:
+        param: 参数说明
+        optional: 可选参数说明
+    """
+    try:
+        # 业务逻辑
+        result = do_something(param)
+
+        return ToolResult(
+            title="成功",
+            output="结果描述",
+            data={"key": "value"},  # 结构化数据
+        )
+    except Exception as e:
+        logger.error(f"错误: {e}", exc_info=True)
+        return ToolResult(
+            title="失败",
+            output=str(e),
+            error=True,
+        )
+```
+
+### 2. Skill 编写
+
+```markdown
+---
+name: skill-name
+description: 简短描述
+category: 分类
+---
+
+# 技能名称
+
+## 何时使用
+
+- 明确的使用场景
+- 具体的触发条件
+
+## 工作流程
+
+1. 第一步做什么
+2. 第二步做什么
+3. 如何验证结果
+
+## 最佳实践
+
+- 实用的建议
+- 常见陷阱
+```
+
+### 3. 配置管理
+
+```python
+# 使用环境变量
+MODEL = os.getenv("MODEL", "default-model")
+
+# 验证配置
+if not API_KEY:
+    raise ValueError("API_KEY 未设置")
+
+# 创建必要的目录
+Path(output_dir).mkdir(parents=True, exist_ok=True)
+```
+
+### 4. 错误处理
+
+```python
+try:
+    result = await agent.run(task)
+except Exception as e:
+    logger.error(f"执行失败: {e}", exc_info=True)
+    # 发送告警
+    # 保存错误信息
+    # 返回友好的错误消息
+```
+
+## 🔧 常用模式
+
+### 模式1:简单任务执行
+
+```python
+runner = AgentRunner(llm_call=llm, trace_store=store)
+
+async for item in runner.run(
+    messages=[{"role": "user", "content": "任务"}],
+    config=RunConfig(model="claude-sonnet-4.5"),
+):
+    if isinstance(item, Message):
+        print(item.content)
+```
+
+### 模式2:带自定义工具
+
+```python
+# 1. 定义工具
+@tool(description="我的工具")
+async def my_tool(param: str, ctx: ToolContext = None) -> ToolResult:
+    return ToolResult(output="结果")
+
+# 2. 确保工具被导入
+import my_tools  # 触发 @tool 注册
+
+# 3. 运行
+runner = AgentRunner(llm_call=llm, trace_store=store)
+async for item in runner.run(...):
+    pass
+```
+
+### 模式3:带 Skills
+
+```python
+# 1. 创建 skills/my-skill.md
+
+# 2. 加载 skills
+runner = AgentRunner(
+    llm_call=llm,
+    trace_store=store,
+    skills_dir="./skills",
+)
+
+# 3. 指定使用的 skills
+config = RunConfig(skills=["my-skill"])
+```
+
+### 模式4:生产级封装
+
+```python
+class MyAgent:
+    def __init__(self, config):
+        self.runner = AgentRunner(...)
+
+    async def run(self, task: str) -> Dict:
+        try:
+            result = await self._execute(task)
+            self._save_result(result)
+            return result
+        except Exception as e:
+            self._handle_error(e)
+            raise
+```
+
+## 📊 性能优化
+
+### 1. 选择合适的模型
+
+```python
+# 简单任务:使用 Haiku(快+便宜)
+llm = create_openrouter_llm_call(model="anthropic/claude-haiku-4.5")
+
+# 复杂任务:使用 Sonnet(平衡)
+llm = create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5")
+
+# 极难任务:使用 Opus(最强)
+llm = create_openrouter_llm_call(model="anthropic/claude-opus-4.6")
+```
+
+### 2. 限制迭代次数
+
+```python
+config = RunConfig(
+    max_iterations=20,  # 防止无限循环
+)
+```
+
+### 3. 启用 Prompt Caching
+
+```python
+config = RunConfig(
+    enable_prompt_caching=True,  # Claude 模型支持
+)
+```
+
+### 4. 限制工具权限
+
+```python
+config = RunConfig(
+    agent_type="explore",  # 只读权限,更快
+)
+```
+
+## 🐛 调试技巧
+
+### 1. 打印所有消息
+
+```python
+async for item in runner.run(...):
+    print(f"[{type(item).__name__}] {item}")
+```
+
+### 2. 查看 Trace
+
+```python
+trace = await trace_store.get_trace(trace_id)
+for msg in trace.messages:
+    print(f"{msg.role}: {msg.content}")
+```
+
+### 3. 启动可视化
+
+```bash
+python api_server.py
+# 访问 http://localhost:8000/api/traces
+```
+
+### 4. 查看日志
+
+```python
+logging.basicConfig(level=logging.DEBUG)
+```
+
+## 📦 项目模板
+
+```
+my_agent/
+├── run.py              # 主程序
+├── tools/              # 自定义工具
+│   ├── __init__.py
+│   ├── api.py
+│   └── database.py
+├── skills/             # Skills
+│   ├── main.md
+│   └── helper.md
+├── .env                # 环境变量
+├── .env.example        # 环境变量示例
+├── requirements.txt    # 依赖
+└── README.md           # 说明文档
+```
+
+## 🎓 下一步
+
+1. ✅ 阅读 [QUICKSTART.md](./QUICKSTART.md)
+2. ✅ 运行 `examples/content_finder/demo.py`
+3. ✅ 复制 `examples/production_template/`
+4. ✅ 实现第一个自定义工具
+5. ✅ 编写第一个 Skill
+6. ✅ 部署到生产环境
+
+## 💬 获取帮助
+
+- **文档**:查看 `agent/README.md` 和 `agent/docs/`
+- **示例**:参考 `examples/` 目录
+- **问题**:检查日志和 Trace
+
+---
+
+**开始构建你的 Agent 吧!** 🚀

+ 562 - 0
QUICKSTART.md

@@ -0,0 +1,562 @@
+# 快速实现自己的 Agent - 生产使用指南
+
+## 🚀 三步快速开始
+
+### 第一步:安装依赖
+
+```bash
+# 安装基础依赖
+pip install -r requirements.txt
+
+# 安装 browser 模块依赖(如果需要浏览器工具)
+pip install dbutils pymysql
+
+# 配置环境变量
+cp .env.example .env
+# 编辑 .env 填入你的 API Key
+```
+
+### 第二步:创建你的 Agent
+
+创建 `my_agent/run.py`:
+
+```python
+import asyncio
+import sys
+from pathlib import Path
+
+# 添加项目根目录到 Python 路径
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from agent import AgentRunner, RunConfig, FileSystemTraceStore
+from agent.llm import create_openrouter_llm_call
+
+async def main():
+    # 1. 初始化存储
+    trace_store = FileSystemTraceStore(base_path=".cache/traces")
+
+    # 2. 初始化 LLM
+    llm_call = create_openrouter_llm_call(
+        model="anthropic/claude-sonnet-4.5"
+    )
+
+    # 3. 创建 Runner
+    runner = AgentRunner(
+        llm_call=llm_call,
+        trace_store=trace_store,
+    )
+
+    # 4. 运行 Agent
+    async for item in runner.run(
+        messages=[{"role": "user", "content": "你的任务描述"}],
+        config=RunConfig(
+            model="anthropic/claude-sonnet-4.5",
+            max_iterations=30,
+        ),
+    ):
+        print(item)
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
+
+### 第三步:运行
+
+```bash
+python my_agent/run.py
+```
+
+## 📦 核心组件
+
+### 1. AgentRunner - 执行引擎
+
+```python
+from agent import AgentRunner, RunConfig
+
+runner = AgentRunner(
+    llm_call=llm_call,              # LLM 调用函数
+    trace_store=trace_store,        # Trace 存储
+    memory_store=memory_store,      # 记忆存储(可选)
+    skills_dir="./skills",          # Skills 目录(可选)
+)
+```
+
+### 2. LLM Providers
+
+框架支持三种 LLM 提供商:
+
+```python
+# OpenRouter(推荐,支持多种模型)
+from agent.llm import create_openrouter_llm_call
+llm_call = create_openrouter_llm_call(
+    model="anthropic/claude-sonnet-4.5",
+    api_key="your-api-key",  # 或从环境变量读取
+)
+
+# Gemini
+from agent.llm import create_gemini_llm_call
+llm_call = create_gemini_llm_call(
+    model="gemini-2.0-flash-exp",
+    api_key="your-api-key",
+)
+
+# Yescode(内部)
+from agent.llm import create_yescode_llm_call
+llm_call = create_yescode_llm_call(
+    model="gpt-4o",
+    api_key="your-api-key",
+)
+```
+
+### 3. 自定义工具
+
+使用 `@tool` 装饰器注册工具:
+
+```python
+from agent import tool, ToolResult, ToolContext
+
+@tool(description="查询产品库存")
+async def check_inventory(
+    product_id: str,
+    warehouse: str = "default",
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    查询指定仓库的产品库存
+
+    Args:
+        product_id: 产品唯一标识符
+        warehouse: 仓库编码,默认为主仓库
+        ctx: 工具上下文(自动注入)
+    """
+    # 你的业务逻辑
+    stock = await query_database(product_id, warehouse)
+
+    return ToolResult(
+        title="库存查询成功",
+        output=f"产品 {product_id} 在 {warehouse} 仓库的库存: {stock}",
+        data={"product_id": product_id, "stock": stock},
+    )
+```
+
+**重要**:确保定义工具的模块在 `runner.run()` 之前被 import。
+
+### 4. Skills - Agent 能力定义
+
+创建 `skills/my-skill.md`:
+
+```markdown
+---
+name: my-skill
+description: 我的自定义技能
+category: custom
+---
+
+# 我的技能
+
+## 何时使用
+
+- 场景1:当需要...
+- 场景2:当遇到...
+
+## 使用指南
+
+1. 首先...
+2. 然后...
+3. 最后...
+
+## 最佳实践
+
+- 建议1
+- 建议2
+```
+
+加载 Skills:
+
+```python
+runner = AgentRunner(
+    llm_call=llm_call,
+    trace_store=trace_store,
+    skills_dir="./skills",  # 指定 skills 目录
+)
+
+# 或在 RunConfig 中指定
+config = RunConfig(
+    skills=["my-skill", "planning"],  # 只加载指定的 skills
+)
+```
+
+### 5. Agent Presets - 预设配置
+
+使用内置预设:
+
+```python
+from agent import RunConfig
+
+# 默认 Agent(全部工具权限)
+config = RunConfig(agent_type="default")
+
+# 探索型 Agent(只读权限)
+config = RunConfig(agent_type="explore")
+
+# 评估型 Agent(只读+评估)
+config = RunConfig(agent_type="evaluate")
+```
+
+自定义预设:
+
+```python
+from agent import AgentPreset
+from agent.core.presets import register_preset
+
+# 定义预设
+my_preset = AgentPreset(
+    allowed_tools=["read_file", "grep_content", "my_custom_tool"],
+    denied_tools=["bash_command"],
+    max_iterations=20,
+    skills=["my-skill"],
+    description="我的自定义 Agent",
+)
+
+# 注册预设
+register_preset("my-agent", my_preset)
+
+# 使用预设
+config = RunConfig(agent_type="my-agent")
+```
+
+## 🎯 实战案例
+
+### 案例1:内容寻找 Agent
+
+参考 `examples/content_finder/`:
+
+```python
+# 1. 定义工具
+@tool(description="从抖音搜索视频")
+async def douyin_search(keywords: str, ctx: ToolContext = None) -> ToolResult:
+    results = await call_douyin_api(keywords)
+    return ToolResult(output=f"找到 {len(results)} 条内容", data=results)
+
+# 2. 定义 Skill
+# skills/content-finder.md
+
+# 3. 运行 Agent
+runner = AgentRunner(
+    llm_call=llm_call,
+    trace_store=trace_store,
+    skills_dir="./skills",
+)
+
+async for item in runner.run(
+    messages=[{"role": "user", "content": "搜索美食类视频"}],
+    config=RunConfig(skills=["content-finder"]),
+):
+    if isinstance(item, Message):
+        print(item.content)
+```
+
+### 案例2:数据分析 Agent
+
+```python
+@tool(description="查询数据库")
+async def query_db(sql: str, ctx: ToolContext = None) -> ToolResult:
+    results = await execute_sql(sql)
+    return ToolResult(
+        title="查询成功",
+        output=f"返回 {len(results)} 条记录",
+        data=results,
+    )
+
+@tool(description="生成图表")
+async def create_chart(data: list, chart_type: str, ctx: ToolContext = None) -> ToolResult:
+    chart_url = await generate_chart(data, chart_type)
+    return ToolResult(
+        title="图表已生成",
+        output=f"图表类型: {chart_type}",
+        data={"url": chart_url},
+    )
+
+# 运行
+async for item in runner.run(
+    messages=[{"role": "user", "content": "分析最近一周的销售数据并生成图表"}],
+    config=RunConfig(
+        skills=["data-analysis"],
+        max_iterations=50,
+    ),
+):
+    pass
+```
+
+### 案例3:自动化测试 Agent
+
+```python
+@tool(description="运行测试用例")
+async def run_tests(test_path: str, ctx: ToolContext = None) -> ToolResult:
+    result = await execute_tests(test_path)
+    return ToolResult(
+        title="测试完成",
+        output=f"通过: {result.passed}, 失败: {result.failed}",
+        data=result.to_dict(),
+    )
+
+@tool(description="生成测试报告")
+async def generate_report(test_results: dict, ctx: ToolContext = None) -> ToolResult:
+    report_path = await create_html_report(test_results)
+    return ToolResult(
+        title="报告已生成",
+        output=f"报告路径: {report_path}",
+    )
+```
+
+## 🔧 高级功能
+
+### 1. 子 Agent
+
+创建子 Agent 处理子任务:
+
+```python
+from agent.tools.builtin.subagent import agent
+
+# Agent 会自动调用 agent 工具创建子 Agent
+# 在 system prompt 中说明何时使用子 Agent
+```
+
+### 2. 记忆系统
+
+使用记忆存储跨会话数据:
+
+```python
+from agent.memory.stores import MemoryMemoryStore
+
+memory_store = MemoryMemoryStore()
+
+runner = AgentRunner(
+    llm_call=llm_call,
+    trace_store=trace_store,
+    memory_store=memory_store,
+)
+```
+
+### 3. Goal Tree - 计划管理
+
+Agent 自动使用 `goal` 工具管理计划:
+
+```python
+# Agent 会自动创建和更新 Goal
+# 查看 Goal Tree
+from agent.trace import FileSystemTraceStore
+
+trace_store = FileSystemTraceStore(base_path=".cache/traces")
+trace = await trace_store.get_trace(trace_id)
+goal_tree = trace.goal_tree
+```
+
+### 4. 续跑和回溯
+
+从指定消息后继续执行:
+
+```python
+# 续跑
+async for item in runner.run(
+    messages=[],
+    config=RunConfig(
+        trace_id="existing-trace-id",
+        after_sequence=10,  # 从第10条消息后继续
+    ),
+):
+    pass
+
+# 回溯重跑
+async for item in runner.run(
+    messages=[],
+    config=RunConfig(
+        trace_id="existing-trace-id",
+        after_sequence=5,  # 回到第5条消息重新执行
+    ),
+):
+    pass
+```
+
+### 5. API Server - 可视化
+
+启动 API Server 查看执行过程:
+
+```bash
+python api_server.py
+```
+
+访问:
+- `http://localhost:8000/api/traces` - 查看所有 Traces
+- `http://localhost:8000/api/traces/{id}` - 查看 Trace 详情
+- WebSocket: `ws://localhost:8000/api/traces/{id}/watch` - 实时监控
+
+## 📝 最佳实践
+
+### 1. 工具设计原则
+
+- **单一职责**:每个工具只做一件事
+- **清晰描述**:description 要准确描述工具功能
+- **详细文档**:docstring 要包含参数说明
+- **错误处理**:捕获异常并返回友好的错误信息
+
+```python
+@tool(description="发送邮件")
+async def send_email(
+    to: str,
+    subject: str,
+    body: str,
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    发送邮件
+
+    Args:
+        to: 收件人邮箱地址
+        subject: 邮件主题
+        body: 邮件正文
+    """
+    try:
+        await email_service.send(to, subject, body)
+        return ToolResult(
+            title="邮件发送成功",
+            output=f"已发送到 {to}",
+        )
+    except Exception as e:
+        return ToolResult(
+            title="邮件发送失败",
+            output=f"错误: {str(e)}",
+            error=str(e),
+        )
+```
+
+### 2. Skill 编写原则
+
+- **明确场景**:清楚说明何时使用这个 Skill
+- **提供指南**:给出具体的操作步骤
+- **包含示例**:展示最佳实践
+- **避免冗余**:不要重复框架已有的功能
+
+### 3. 性能优化
+
+```python
+# 1. 限制迭代次数
+config = RunConfig(max_iterations=30)
+
+# 2. 使用合适的模型
+llm_call = create_openrouter_llm_call(
+    model="anthropic/claude-haiku-4.5",  # 更快更便宜
+)
+
+# 3. 启用 Prompt Caching(Claude 模型)
+config = RunConfig(enable_prompt_caching=True)
+
+# 4. 限制工具权限
+config = RunConfig(
+    agent_type="explore",  # 只读权限,更快
+)
+```
+
+### 4. 错误处理
+
+```python
+try:
+    async for item in runner.run(messages=messages, config=config):
+        if isinstance(item, Message):
+            # 处理消息
+            pass
+except Exception as e:
+    logger.error(f"Agent 执行失败: {e}")
+    # 错误处理逻辑
+```
+
+### 5. 日志和监控
+
+```python
+import logging
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+)
+
+logger = logging.getLogger(__name__)
+
+# 在关键位置添加日志
+logger.info("开始执行 Agent")
+logger.debug(f"配置: {config}")
+```
+
+## 🚨 常见问题
+
+### Q1: 如何调试 Agent?
+
+```python
+# 1. 打印所有消息
+async for item in runner.run(messages=messages, config=config):
+    print(f"[{type(item).__name__}] {item}")
+
+# 2. 查看 Trace
+trace = await trace_store.get_trace(trace_id)
+for msg in trace.messages:
+    print(f"{msg.role}: {msg.content}")
+
+# 3. 启动 API Server 可视化
+python api_server.py
+```
+
+### Q2: 工具没有被调用?
+
+检查:
+1. 工具是否正确注册(使用 `@tool` 装饰器)
+2. 工具模块是否被 import
+3. 工具描述是否清晰
+4. Agent 是否有权限使用该工具(检查 `allowed_tools`)
+
+### Q3: Agent 陷入循环?
+
+```python
+# 1. 限制迭代次数
+config = RunConfig(max_iterations=20)
+
+# 2. 在 Skill 中明确终止条件
+# 3. 检查工具返回是否有明确的成功/失败标识
+```
+
+### Q4: 如何处理长时间运行的任务?
+
+```python
+# 使用异步处理
+import asyncio
+
+async def long_running_task():
+    async for item in runner.run(messages=messages, config=config):
+        # 定期保存状态
+        if isinstance(item, Trace):
+            await save_checkpoint(item)
+
+# 支持中断和恢复
+config = RunConfig(
+    trace_id=last_trace_id,
+    after_sequence=last_sequence,
+)
+```
+
+## 📚 参考资源
+
+- **框架文档**: `agent/README.md`
+- **架构设计**: `agent/docs/architecture.md`
+- **工具系统**: `agent/docs/tools.md`
+- **Skills 指南**: `agent/docs/skills.md`
+- **示例项目**: `examples/`
+  - `examples/content_finder/` - 内容寻找 Agent
+  - `examples/how/` - 完整示例
+  - `examples/research/` - 研究型 Agent
+
+## 🎓 下一步
+
+1. **阅读示例代码**: 从 `examples/content_finder/demo.py` 开始
+2. **创建第一个工具**: 实现一个简单的业务工具
+3. **编写 Skill**: 定义 Agent 的工作方式
+4. **测试运行**: 小规模测试验证功能
+5. **生产部署**: 添加监控、日志、错误处理

+ 282 - 0
examples/content_finder/ARCHITECTURE.md

@@ -0,0 +1,282 @@
+# 内容寻找 AI Agent 架构设计
+
+## 系统架构
+
+```
+┌─────────────────────────────────────────────────────────────┐
+│                     ContentFinderAgent                       │
+│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
+│  │ AgentRunner  │  │ TraceStore   │  │ MemoryStore  │      │
+│  └──────────────┘  └──────────────┘  └──────────────┘      │
+└─────────────────────────────────────────────────────────────┘
+                              │
+        ┌─────────────────────┼─────────────────────┐
+        │                     │                     │
+        ▼                     ▼                     ▼
+┌──────────────┐      ┌──────────────┐      ┌──────────────┐
+│   工具层      │      │   记忆层      │      │   技能层      │
+├──────────────┤      ├──────────────┤      ├──────────────┤
+│ 爬虫工具      │      │ 搜索历史      │      │ content-finder│
+│ - douyin     │      │ 内容表现      │      │   skill       │
+│ - kuaishou   │      │ 运营偏好      │      └──────────────┘
+│              │      └──────────────┘
+│ 评估工具      │
+│ - evaluate   │
+│              │
+│ 反馈工具      │
+│ - feedback   │
+│ - performance│
+│ - query      │
+└──────────────┘
+```
+
+## 核心组件
+
+### 1. ContentFinderAgent(主Agent)
+
+**职责**:
+- 协调整个内容搜索流程
+- 管理搜索请求和结果
+- 收集和处理反馈
+- 优化搜索策略
+
+**关键方法**:
+- `search_content()`: 执行内容搜索
+- `collect_feedback()`: 收集运营反馈
+- `update_performance()`: 更新内容表现
+- `optimize_strategy()`: 优化搜索策略
+
+### 2. 工具层(Tools)
+
+#### 2.1 爬虫工具(crawler.py)
+- `douyin_search`: 抖音内容搜索
+- `kuaishou_search`: 快手内容搜索
+
+**特点**:
+- 支持关键词、筛选条件
+- 返回标准化的内容数据
+- 可扩展到其他平台
+
+#### 2.2 评估工具(content_eval.py)
+- `evaluate_content`: 内容质量和相关性评估
+
+**评估维度**:
+- 质量分数:播放量、互动率、新鲜度
+- 相关性分数:关键词匹配度
+
+#### 2.3 反馈工具(feedback.py)
+- `record_operator_feedback`: 记录运营反馈
+- `update_content_performance`: 更新表现数据
+- `query_historical_data`: 查询历史数据
+
+### 3. 记忆层(Memory)
+
+#### 3.1 搜索历史(search_history.py)
+**功能**:
+- 记录每次搜索的参数和结果
+- 查询相似搜索记录
+- 分析搜索模式
+
+**数据结构**:
+```json
+{
+  "timestamp": "2026-03-06T10:00:00",
+  "keywords": ["美食", "探店"],
+  "platforms": ["douyin", "kuaishou"],
+  "filters": {"min_views": 10000},
+  "results_count": 20,
+  "trace_id": "trace_123"
+}
+```
+
+#### 3.2 内容表现(content_perf.py)
+**功能**:
+- 记录内容在平台的表现数据
+- 分析表现趋势
+- 识别高表现内容
+
+**数据结构**:
+```json
+{
+  "content_id": "dy_123456",
+  "timestamp": "2026-03-06T10:00:00",
+  "platform_views": 150000,
+  "platform_likes": 8000,
+  "internal_views": 5000,
+  "internal_engagement": 0.15,
+  "conversion_rate": 0.08
+}
+```
+
+#### 3.3 运营偏好(operator_pref.py)
+**功能**:
+- 记录运营人员反馈
+- 分析运营偏好
+- 提取学习洞察
+
+**数据结构**:
+```json
+{
+  "content_id": "dy_123456",
+  "timestamp": "2026-03-06T10:00:00",
+  "rating": "excellent",
+  "notes": "内容质量很高",
+  "operator_id": "operator_001",
+  "content_features": {
+    "keywords": ["美食"],
+    "tags": ["探店"],
+    "platform": "douyin"
+  }
+}
+```
+
+### 4. 技能层(Skills)
+
+**content-finder skill**:
+- 定义Agent的工作流程
+- 提供最佳实践指南
+- 规范工具使用方式
+
+## 数据流程
+
+### 搜索流程
+
+```
+用户需求
+   ↓
+1. 构建搜索请求(SearchRequest)
+   ↓
+2. 查询相似历史搜索
+   ↓
+3. 基于历史数据优化参数
+   ↓
+4. 并行调用爬虫工具
+   │
+   ├─→ douyin_search
+   └─→ kuaishou_search
+   ↓
+5. 合并搜索结果
+   ↓
+6. 评估内容质量(evaluate_content)
+   ↓
+7. 排序并返回结果
+   ↓
+8. 保存搜索历史
+```
+
+### 反馈流程
+
+```
+运营人员反馈
+   ↓
+1. 记录反馈(record_operator_feedback)
+   ↓
+2. 保存到运营偏好数据
+   ↓
+3. 提取内容特征
+   ↓
+4. 更新偏好模型
+```
+
+### 优化流程
+
+```
+触发优化
+   ↓
+1. 获取历史数据
+   │
+   ├─→ 搜索历史
+   ├─→ 运营反馈
+   └─→ 内容表现
+   ↓
+2. 分析高表现内容特征
+   │
+   ├─→ 关键词分析
+   ├─→ 标签分析
+   ├─→ 平台权重分析
+   └─→ 筛选条件分析
+   ↓
+3. 生成优化策略
+   ↓
+4. 应用到下次搜索
+```
+
+## 学习机制
+
+### 1. 关键词优化
+- 统计高表现内容的关键词
+- 提取运营偏好的关键词
+- 自动扩展相关关键词
+
+### 2. 筛选条件优化
+- 分析高表现内容的数值特征
+- 动态调整播放量、点赞数阈值
+- 优化时间范围设置
+
+### 3. 平台权重优化
+- 统计各平台的内容表现
+- 计算平台权重系数
+- 优先搜索高权重平台
+
+### 4. 质量评估优化
+- 学习运营反馈的评分标准
+- 调整质量评估算法权重
+- 提高内容匹配准确度
+
+## 扩展性设计
+
+### 1. 新增平台
+```python
+@tool(description="从新平台搜索视频内容")
+async def new_platform_search(
+    keywords: str,
+    max_results: int = 20,
+    ctx: ToolContext = None,
+) -> ToolResult:
+    # 实现新平台的搜索逻辑
+    pass
+```
+
+### 2. 新增评估维度
+```python
+def _calculate_custom_score(item: Dict[str, Any]) -> float:
+    # 添加自定义评估逻辑
+    pass
+```
+
+### 3. 新增记忆类型
+```python
+class CustomMemoryManager:
+    """自定义记忆管理器"""
+    def __init__(self, storage_path: str):
+        # 初始化存储
+        pass
+```
+
+## 性能优化
+
+### 1. 并行搜索
+- 多个平台同时搜索
+- 减少总体响应时间
+
+### 2. 缓存机制
+- 缓存相似搜索结果
+- 减少重复爬取
+
+### 3. 增量更新
+- 只更新变化的数据
+- 减少存储开销
+
+## 安全考虑
+
+### 1. 数据隐私
+- 敏感信息脱敏
+- 访问权限控制
+
+### 2. 爬虫合规
+- 遵守平台规则
+- 控制请求频率
+
+### 3. 内容审核
+- 过滤违规内容
+- 质量把关机制

+ 55 - 0
examples/content_finder/README.md

@@ -0,0 +1,55 @@
+# 内容寻找 AI Agent
+
+## 概述
+
+内容寻找Agent是一个智能视频内容发现系统,能够根据需求自主从全网寻找相关视频内容,并通过运营反馈和内容表现数据不断优化寻找策略。
+
+## 核心功能
+
+### 1. 智能内容搜索
+- 根据需求关键词和标签自动搜索多个平台(抖音、快手等)
+- 支持多维度内容筛选(播放量、点赞数、发布时间等)
+- 自动去重和内容质量评估
+
+### 2. 运营交互系统
+- 支持运营人员标注内容质量(优质/一般/差)
+- 记录运营偏好和筛选规则
+- 可调整搜索策略和关键词权重
+
+### 3. 记忆与学习
+- 记录每次搜索的关键词、平台、筛选条件
+- 存储内容在平台的实际表现数据(播放、互动、转化)
+- 基于历史数据优化搜索策略
+
+### 4. 自适应优化
+- 分析高表现内容的共同特征
+- 自动调整搜索关键词和筛选条件
+- 学习运营偏好,提高内容匹配度
+
+## 架构设计
+
+```
+content_finder/
+├── agent.py              # ContentFinderAgent 主类
+├── tools/
+│   ├── crawler.py        # 爬虫工具(抖音、快手等)
+│   ├── content_eval.py   # 内容评估工具
+│   └── feedback.py       # 反馈收集工具
+├── memory/
+│   ├── search_history.py # 搜索历史记录
+│   ├── content_perf.py   # 内容表现数据
+│   └── operator_pref.py  # 运营偏好数据
+├── skills/
+│   └── content_finder.md # Agent技能定义
+└── run.py                # 运行入口
+```
+
+## 数据流程
+
+1. **需求输入** → Agent接收搜索需求
+2. **策略生成** → 基于记忆生成搜索策略
+3. **多平台搜索** → 调用爬虫工具搜索内容
+4. **内容评估** → 评估内容质量和相关性
+5. **运营反馈** → 收集运营人员反馈
+6. **表现追踪** → 记录内容实际表现
+7. **策略优化** → 更新搜索策略和记忆

+ 318 - 0
examples/content_finder/USAGE.md

@@ -0,0 +1,318 @@
+# 内容寻找 AI Agent - 使用指南
+
+## 快速开始
+
+### 运行演示
+
+```bash
+cd examples/content_finder
+python demo.py
+```
+
+这个演示展示了完整的工作流程:
+1. 搜索美食类视频内容(抖音+快手)
+2. 收集运营反馈
+3. 更新内容表现数据
+4. 分析运营洞察
+
+## 项目结构
+
+```
+content_finder/
+├── demo.py               # ✅ 可运行的演示脚本(独立版本)
+├── agent.py              # ContentFinderAgent 完整实现
+├── run.py                # 框架集成版本(需要完整框架)
+├── tools/                # 工具层
+│   ├── crawler.py        # 爬虫工具(抖音、快手)
+│   ├── content_eval.py   # 内容评估工具
+│   └── feedback.py       # 反馈收集工具
+├── memory/               # 记忆层
+│   ├── search_history.py # 搜索历史管理
+│   ├── content_perf.py   # 内容表现数据
+│   └── operator_pref.py  # 运营偏好数据
+├── skills/               # 技能层
+│   └── content_finder.md # Agent技能定义
+├── README.md             # 项目概述
+└── ARCHITECTURE.md       # 架构设计文档
+```
+
+## 两个版本说明
+
+### 1. demo.py - 独立演示版本 ✅
+
+**特点**:
+- 完全独立运行,不依赖框架
+- 包含所有核心逻辑
+- 使用模拟数据演示功能
+- 适合快速理解和演示
+
+**运行**:
+```bash
+python demo.py
+```
+
+### 2. run.py - 框架集成版本
+
+**特点**:
+- 完整集成 Agent 框架
+- 使用真实的 LLM 和工具系统
+- 支持 Trace、Memory、Skills
+- 适合生产环境
+
+**运行**(需要框架环境):
+```bash
+# 从项目根目录运行
+python -m examples.content_finder.run
+```
+
+## 核心功能
+
+### 1. 内容搜索
+
+```python
+from demo import ContentFinderAgent, MemoryManager, SearchRequest
+
+memory = MemoryManager(".cache/content_finder")
+agent = ContentFinderAgent(memory)
+
+request = SearchRequest(
+    keywords=["美食探店", "美食推荐"],
+    tags=["美食", "探店"],
+    platforms=["douyin", "kuaishou"],
+    filters={"min_views": 10000},
+    max_results=20,
+)
+
+results = await agent.search_content(request)
+```
+
+### 2. 收集反馈
+
+```python
+# 运营人员对内容进行评级
+agent.collect_feedback(
+    content_id="dy_123",
+    rating="excellent",  # excellent / good / poor
+    notes="内容质量很高,符合平台调性"
+)
+```
+
+### 3. 更新表现
+
+```python
+# 更新内容在平台的实际表现
+agent.update_performance(
+    content_id="dy_123",
+    internal_views=5000,
+    engagement=0.15
+)
+```
+
+### 4. 分析洞察
+
+```python
+# 分析运营反馈,获取洞察
+agent.analyze_insights()
+```
+
+## 数据存储
+
+所有数据存储在 `.cache/content_finder/` 目录:
+
+```
+.cache/content_finder/
+├── feedbacks.jsonl       # 运营反馈数据
+└── performances.jsonl    # 内容表现数据
+```
+
+### 反馈数据格式
+
+```json
+{
+  "content_id": "dy_123",
+  "rating": "excellent",
+  "notes": "内容质量很高",
+  "timestamp": "2026-03-06T10:00:00"
+}
+```
+
+### 表现数据格式
+
+```json
+{
+  "content_id": "dy_123",
+  "internal_views": 5000,
+  "engagement": 0.15,
+  "timestamp": "2026-03-06T10:00:00"
+}
+```
+
+## 扩展开发
+
+### 添加新平台
+
+在 `demo.py` 中添加新的搜索函数:
+
+```python
+async def bilibili_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
+    """B站搜索"""
+    # 实现搜索逻辑
+    pass
+```
+
+然后在 `search_content` 方法中添加平台支持:
+
+```python
+elif platform == "bilibili":
+    tasks.append(bilibili_search(keyword, request.max_results))
+```
+
+### 自定义评估算法
+
+修改 `evaluate_content` 函数中的评分逻辑:
+
+```python
+def evaluate_content(items: List[ContentItem], keywords: List[str]) -> List[ContentItem]:
+    for item in items:
+        # 自定义评分算法
+        quality_score = your_custom_scoring_logic(item)
+        item.quality_score = quality_score
+    return items
+```
+
+### 添加新的记忆类型
+
+在 `MemoryManager` 类中添加新方法:
+
+```python
+def save_custom_data(self, data: Dict):
+    """保存自定义数据"""
+    with open(self.custom_file, "a", encoding="utf-8") as f:
+        f.write(json.dumps(data, ensure_ascii=False) + "\n")
+```
+
+## 实际部署
+
+### 1. 集成真实爬虫
+
+将 `demo.py` 中的模拟函数替换为真实爬虫调用:
+
+```python
+async def douyin_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
+    """真实的抖音搜索"""
+    # 调用抖音爬虫 API
+    response = await douyin_api.search(keywords, limit=max_results)
+
+    # 解析并返回结果
+    return [
+        ContentItem(
+            content_id=item['id'],
+            platform='douyin',
+            title=item['title'],
+            # ... 其他字段
+        )
+        for item in response['data']
+    ]
+```
+
+### 2. 集成数据库
+
+将 JSONL 文件存储替换为数据库:
+
+```python
+class DatabaseMemoryManager:
+    def __init__(self, db_connection):
+        self.db = db_connection
+
+    def save_feedback(self, content_id: str, rating: str, notes: str):
+        self.db.execute(
+            "INSERT INTO feedbacks (content_id, rating, notes, created_at) VALUES (?, ?, ?, ?)",
+            (content_id, rating, notes, datetime.now())
+        )
+```
+
+### 3. 添加 API 接口
+
+使用 FastAPI 提供 HTTP 接口:
+
+```python
+from fastapi import FastAPI
+
+app = FastAPI()
+agent = ContentFinderAgent(memory)
+
+@app.post("/search")
+async def search(request: SearchRequest):
+    results = await agent.search_content(request)
+    return {"results": results}
+
+@app.post("/feedback")
+async def feedback(content_id: str, rating: str, notes: str):
+    agent.collect_feedback(content_id, rating, notes)
+    return {"status": "ok"}
+```
+
+## 性能优化
+
+### 1. 并行搜索
+
+已实现多平台并行搜索:
+
+```python
+tasks = []
+for platform in request.platforms:
+    tasks.append(search_platform(platform, keywords))
+
+results = await asyncio.gather(*tasks)
+```
+
+### 2. 结果缓存
+
+添加缓存机制避免重复搜索:
+
+```python
+from functools import lru_cache
+
+@lru_cache(maxsize=100)
+def get_cached_results(keywords: str, platform: str):
+    # 返回缓存的结果
+    pass
+```
+
+### 3. 批量处理
+
+批量更新表现数据:
+
+```python
+def batch_update_performance(self, updates: List[Dict]):
+    """批量更新表现数据"""
+    with open(self.performance_file, "a", encoding="utf-8") as f:
+        for update in updates:
+            f.write(json.dumps(update, ensure_ascii=False) + "\n")
+```
+
+## 常见问题
+
+### Q: 如何运行框架集成版本?
+
+A: 需要先解决框架依赖问题(dbutils),或者使用 demo.py 独立版本。
+
+### Q: 如何添加更多平台?
+
+A: 参考 `douyin_search` 和 `kuaishou_search` 的实现模式,添加新的搜索函数。
+
+### Q: 数据存储在哪里?
+
+A: 默认存储在 `.cache/content_finder/` 目录,可以通过修改 `MemoryManager` 的初始化参数更改。
+
+### Q: 如何自定义评分算法?
+
+A: 修改 `evaluate_content` 函数中的 `quality_score` 计算逻辑。
+
+## 下一步
+
+1. **集成真实爬虫**:替换模拟数据为真实 API 调用
+2. **优化评估算法**:基于实际数据调整评分权重
+3. **添加更多平台**:支持 B站、小红书等平台
+4. **实现学习机制**:基于反馈自动优化搜索策略
+5. **部署生产环境**:添加 API 接口、数据库、监控等

+ 267 - 0
examples/content_finder/agent.py

@@ -0,0 +1,267 @@
+"""
+内容寻找 AI Agent
+
+核心功能:
+1. 根据需求自主从全网寻找相关视频内容
+2. 支持运营人工交互,调整记忆相关数据
+3. 集成现有爬虫能力(抖音、快手等)
+4. 通过运营交互和内容表现不断优化寻找路径
+"""
+
+import asyncio
+import logging
+from dataclasses import dataclass
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace.protocols import TraceStore
+from agent.memory.protocols import MemoryStore
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class SearchRequest:
+    """搜索请求"""
+    keywords: List[str]              # 关键词列表
+    tags: List[str]                  # 标签列表
+    platforms: List[str]             # 目标平台(douyin, kuaishou等)
+    filters: Dict[str, Any]          # 筛选条件(播放量、时间范围等)
+    max_results: int = 50            # 最大结果数
+
+
+@dataclass
+class ContentItem:
+    """内容项"""
+    content_id: str                  # 内容唯一ID
+    platform: str                    # 平台名称
+    title: str                       # 标题
+    author: str                      # 作者
+    url: str                         # 链接
+    cover_url: str                   # 封面
+    description: str                 # 描述
+    stats: Dict[str, int]            # 统计数据(播放、点赞等)
+    tags: List[str]                  # 标签
+    publish_time: datetime           # 发布时间
+    crawl_time: datetime             # 抓取时间
+
+
+@dataclass
+class OperatorFeedback:
+    """运营反馈"""
+    content_id: str                  # 内容ID
+    rating: str                      # 评级(excellent/good/poor)
+    notes: str                       # 备注
+    operator_id: str                 # 运营人员ID
+    feedback_time: datetime          # 反馈时间
+
+
+@dataclass
+class ContentPerformance:
+    """内容表现数据"""
+    content_id: str                  # 内容ID
+    platform_views: int              # 平台播放量
+    platform_likes: int              # 平台点赞数
+    platform_shares: int             # 平台分享数
+    internal_views: int              # 内部平台播放量
+    internal_engagement: float       # 内部互动率
+    conversion_rate: float           # 转化率
+    update_time: datetime            # 更新时间
+
+
+class ContentFinderAgent:
+    """内容寻找 Agent"""
+
+    def __init__(
+        self,
+        runner: AgentRunner,
+        trace_store: TraceStore,
+        memory_store: Optional[MemoryStore] = None,
+    ):
+        self.runner = runner
+        self.trace_store = trace_store
+        self.memory_store = memory_store
+
+    async def search_content(
+        self,
+        request: SearchRequest,
+        config: Optional[RunConfig] = None,
+    ) -> List[ContentItem]:
+        """
+        搜索内容
+
+        Args:
+            request: 搜索请求
+            config: 运行配置
+
+        Returns:
+            内容列表
+        """
+        if config is None:
+            config = RunConfig(
+                model="gpt-4o",
+                agent_type="content_finder",
+                skills=["content-finder"],
+            )
+
+        # 构建搜索提示词
+        prompt = self._build_search_prompt(request)
+
+        # 运行Agent
+        results = []
+        async for item in self.runner.run(
+            messages=[{"role": "user", "content": prompt}],
+            config=config,
+        ):
+            # 处理返回的内容项
+            if isinstance(item, dict) and "content_items" in item:
+                results.extend(item["content_items"])
+
+        return results
+
+    def _build_search_prompt(self, request: SearchRequest) -> str:
+        """构建搜索提示词"""
+        prompt_parts = [
+            f"请帮我从以下平台搜索视频内容:{', '.join(request.platforms)}",
+            f"\n关键词:{', '.join(request.keywords)}",
+        ]
+
+        if request.tags:
+            prompt_parts.append(f"标签:{', '.join(request.tags)}")
+
+        if request.filters:
+            filter_desc = []
+            if "min_views" in request.filters:
+                filter_desc.append(f"最小播放量:{request.filters['min_views']}")
+            if "min_likes" in request.filters:
+                filter_desc.append(f"最小点赞数:{request.filters['min_likes']}")
+            if "date_range" in request.filters:
+                filter_desc.append(f"时间范围:{request.filters['date_range']}")
+            if filter_desc:
+                prompt_parts.append(f"筛选条件:{', '.join(filter_desc)}")
+
+        prompt_parts.append(f"\n最多返回 {request.max_results} 条结果")
+        prompt_parts.append("\n请使用爬虫工具搜索内容,并评估内容质量和相关性。")
+
+        return "\n".join(prompt_parts)
+
+    async def collect_feedback(
+        self,
+        content_id: str,
+        rating: str,
+        notes: str,
+        operator_id: str,
+    ) -> OperatorFeedback:
+        """
+        收集运营反馈
+
+        Args:
+            content_id: 内容ID
+            rating: 评级
+            notes: 备注
+            operator_id: 运营人员ID
+
+        Returns:
+            反馈记录
+        """
+        feedback = OperatorFeedback(
+            content_id=content_id,
+            rating=rating,
+            notes=notes,
+            operator_id=operator_id,
+            feedback_time=datetime.now(),
+        )
+
+        # 保存到记忆系统
+        if self.memory_store:
+            await self._save_feedback(feedback)
+
+        return feedback
+
+    async def update_performance(
+        self,
+        content_id: str,
+        performance: ContentPerformance,
+    ) -> None:
+        """
+        更新内容表现数据
+
+        Args:
+            content_id: 内容ID
+            performance: 表现数据
+        """
+        if self.memory_store:
+            await self._save_performance(performance)
+
+    async def optimize_strategy(self) -> Dict[str, Any]:
+        """
+        优化搜索策略
+
+        基于历史搜索记录、运营反馈和内容表现数据,
+        分析并优化搜索策略。
+
+        Returns:
+            优化后的策略配置
+        """
+        # 获取历史数据
+        search_history = await self._get_search_history()
+        feedbacks = await self._get_feedbacks()
+        performances = await self._get_performances()
+
+        # 分析高表现内容特征
+        high_perf_features = self._analyze_high_performance_content(
+            feedbacks, performances
+        )
+
+        # 生成优化策略
+        strategy = {
+            "recommended_keywords": high_perf_features.get("keywords", []),
+            "recommended_tags": high_perf_features.get("tags", []),
+            "optimal_filters": high_perf_features.get("filters", {}),
+            "platform_weights": high_perf_features.get("platform_weights", {}),
+        }
+
+        return strategy
+
+    # ===== 私有方法 =====
+
+    async def _save_feedback(self, feedback: OperatorFeedback) -> None:
+        """保存反馈到记忆系统"""
+        # 伪代码:实际实现需要调用memory_store
+        pass
+
+    async def _save_performance(self, performance: ContentPerformance) -> None:
+        """保存表现数据到记忆系统"""
+        # 伪代码:实际实现需要调用memory_store
+        pass
+
+    async def _get_search_history(self) -> List[Dict[str, Any]]:
+        """获取搜索历史"""
+        # 伪代码:从memory_store读取
+        return []
+
+    async def _get_feedbacks(self) -> List[OperatorFeedback]:
+        """获取所有反馈"""
+        # 伪代码:从memory_store读取
+        return []
+
+    async def _get_performances(self) -> List[ContentPerformance]:
+        """获取所有表现数据"""
+        # 伪代码:从memory_store读取
+        return []
+
+    def _analyze_high_performance_content(
+        self,
+        feedbacks: List[OperatorFeedback],
+        performances: List[ContentPerformance],
+    ) -> Dict[str, Any]:
+        """分析高表现内容的共同特征"""
+        # 伪代码:实际实现需要数据分析逻辑
+        return {
+            "keywords": [],
+            "tags": [],
+            "filters": {},
+            "platform_weights": {},
+        }
+

+ 298 - 0
examples/content_finder/demo.py

@@ -0,0 +1,298 @@
+"""
+内容寻找 Agent - 简化运行示例
+
+这是一个独立的演示脚本,展示核心逻辑,不依赖完整的框架。
+"""
+
+import asyncio
+import json
+from pathlib import Path
+from datetime import datetime
+from typing import List, Dict, Any
+
+
+# ===== 数据模型 =====
+
+class SearchRequest:
+    """搜索请求"""
+    def __init__(self, keywords: List[str], tags: List[str], platforms: List[str],
+                 filters: Dict[str, Any], max_results: int = 50):
+        self.keywords = keywords
+        self.tags = tags
+        self.platforms = platforms
+        self.filters = filters
+        self.max_results = max_results
+
+
+class ContentItem:
+    """内容项"""
+    def __init__(self, content_id: str, platform: str, title: str, author: str,
+                 url: str, stats: Dict[str, int], tags: List[str]):
+        self.content_id = content_id
+        self.platform = platform
+        self.title = title
+        self.author = author
+        self.url = url
+        self.stats = stats
+        self.tags = tags
+
+
+# ===== 模拟爬虫工具 =====
+
+async def douyin_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
+    """模拟抖音搜索"""
+    print(f"🔍 正在抖音搜索: {keywords}")
+    await asyncio.sleep(0.5)  # 模拟网络延迟
+
+    # 返回模拟数据
+    return [
+        ContentItem(
+            content_id=f"dy_{i}",
+            platform="douyin",
+            title=f"【{keywords}】精彩视频 #{i}",
+            author=f"作者{i}",
+            url=f"https://douyin.com/video/{i}",
+            stats={"views": 100000 + i * 10000, "likes": 5000 + i * 500, "comments": 200 + i * 20},
+            tags=[keywords, "热门", "推荐"]
+        )
+        for i in range(1, min(max_results, 6))
+    ]
+
+
+async def kuaishou_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
+    """模拟快手搜索"""
+    print(f"🔍 正在快手搜索: {keywords}")
+    await asyncio.sleep(0.5)
+
+    return [
+        ContentItem(
+            content_id=f"ks_{i}",
+            platform="kuaishou",
+            title=f"【{keywords}】优质内容 #{i}",
+            author=f"创作者{i}",
+            url=f"https://kuaishou.com/video/{i}",
+            stats={"views": 80000 + i * 8000, "likes": 4000 + i * 400, "comments": 150 + i * 15},
+            tags=[keywords, "精选"]
+        )
+        for i in range(1, min(max_results, 6))
+    ]
+
+
+# ===== 内容评估 =====
+
+def evaluate_content(items: List[ContentItem], keywords: List[str]) -> List[ContentItem]:
+    """评估内容质量"""
+    print(f"📊 正在评估 {len(items)} 条内容...")
+
+    for item in items:
+        # 计算质量分数
+        stats = item.stats
+        views = stats.get("views", 0)
+        likes = stats.get("likes", 0)
+        comments = stats.get("comments", 0)
+
+        engagement_rate = (likes + comments) / max(views, 1)
+        quality_score = (views / 10000) * 0.5 + engagement_rate * 50
+
+        item.quality_score = round(quality_score, 2)
+
+        # 计算相关性分数
+        relevance = sum(1 for kw in keywords if kw in item.title) / len(keywords) * 100
+        item.relevance_score = round(relevance, 2)
+
+    # 按质量分数排序
+    items.sort(key=lambda x: x.quality_score, reverse=True)
+    return items
+
+
+# ===== 记忆管理 =====
+
+class MemoryManager:
+    """简化的记忆管理器"""
+
+    def __init__(self, storage_path: str):
+        self.storage_path = Path(storage_path)
+        self.storage_path.mkdir(parents=True, exist_ok=True)
+        self.feedback_file = self.storage_path / "feedbacks.jsonl"
+        self.performance_file = self.storage_path / "performances.jsonl"
+
+    def save_feedback(self, content_id: str, rating: str, notes: str):
+        """保存反馈"""
+        record = {
+            "content_id": content_id,
+            "rating": rating,
+            "notes": notes,
+            "timestamp": datetime.now().isoformat(),
+        }
+        with open(self.feedback_file, "a", encoding="utf-8") as f:
+            f.write(json.dumps(record, ensure_ascii=False) + "\n")
+        print(f"✅ 反馈已保存: {rating}")
+
+    def save_performance(self, content_id: str, internal_views: int, engagement: float):
+        """保存表现数据"""
+        record = {
+            "content_id": content_id,
+            "internal_views": internal_views,
+            "engagement": engagement,
+            "timestamp": datetime.now().isoformat(),
+        }
+        with open(self.performance_file, "a", encoding="utf-8") as f:
+            f.write(json.dumps(record, ensure_ascii=False) + "\n")
+        print(f"✅ 表现数据已保存")
+
+    def get_feedbacks(self, rating_filter: str = None) -> List[Dict]:
+        """获取反馈"""
+        if not self.feedback_file.exists():
+            return []
+
+        feedbacks = []
+        with open(self.feedback_file, "r", encoding="utf-8") as f:
+            for line in f:
+                if line.strip():
+                    record = json.loads(line)
+                    if rating_filter is None or record.get("rating") == rating_filter:
+                        feedbacks.append(record)
+        return feedbacks
+
+
+# ===== 主Agent =====
+
+class ContentFinderAgent:
+    """内容寻找Agent"""
+
+    def __init__(self, memory: MemoryManager):
+        self.memory = memory
+
+    async def search_content(self, request: SearchRequest) -> List[ContentItem]:
+        """搜索内容"""
+        print(f"\n{'='*60}")
+        print(f"🎯 开始搜索内容")
+        print(f"关键词: {', '.join(request.keywords)}")
+        print(f"平台: {', '.join(request.platforms)}")
+        print(f"{'='*60}\n")
+
+        # 并行搜索多个平台
+        tasks = []
+        for platform in request.platforms:
+            for keyword in request.keywords:
+                if platform == "douyin":
+                    tasks.append(douyin_search(keyword, request.max_results // len(request.keywords)))
+                elif platform == "kuaishou":
+                    tasks.append(kuaishou_search(keyword, request.max_results // len(request.keywords)))
+
+        results_list = await asyncio.gather(*tasks)
+
+        # 合并结果
+        all_items = []
+        for results in results_list:
+            all_items.extend(results)
+
+        # 去重
+        seen = set()
+        unique_items = []
+        for item in all_items:
+            if item.content_id not in seen:
+                seen.add(item.content_id)
+                unique_items.append(item)
+
+        # 评估内容
+        evaluated_items = evaluate_content(unique_items, request.keywords)
+
+        print(f"\n✨ 搜索完成,找到 {len(evaluated_items)} 条内容\n")
+        return evaluated_items
+
+    def collect_feedback(self, content_id: str, rating: str, notes: str):
+        """收集反馈"""
+        self.memory.save_feedback(content_id, rating, notes)
+
+    def update_performance(self, content_id: str, internal_views: int, engagement: float):
+        """更新表现"""
+        self.memory.save_performance(content_id, internal_views, engagement)
+
+    def analyze_insights(self):
+        """分析洞察"""
+        print(f"\n{'='*60}")
+        print("📈 分析运营洞察")
+        print(f"{'='*60}\n")
+
+        excellent = self.memory.get_feedbacks("excellent")
+        good = self.memory.get_feedbacks("good")
+        poor = self.memory.get_feedbacks("poor")
+
+        total = len(excellent) + len(good) + len(poor)
+        if total == 0:
+            print("暂无反馈数据")
+            return
+
+        print(f"总反馈数: {total}")
+        print(f"  - 优质 (excellent): {len(excellent)} ({len(excellent)/total*100:.1f}%)")
+        print(f"  - 良好 (good): {len(good)} ({len(good)/total*100:.1f}%)")
+        print(f"  - 较差 (poor): {len(poor)} ({len(poor)/total*100:.1f}%)")
+        print()
+
+
+# ===== 主函数 =====
+
+async def main():
+    """演示完整流程"""
+
+    # 初始化
+    memory = MemoryManager(".cache/content_finder")
+    agent = ContentFinderAgent(memory)
+
+    # 示例1: 搜索美食内容
+    print("\n" + "="*60)
+    print("示例1: 搜索美食类视频内容")
+    print("="*60)
+
+    request = SearchRequest(
+        keywords=["美食探店", "美食推荐"],
+        tags=["美食", "探店"],
+        platforms=["douyin", "kuaishou"],
+        filters={"min_views": 10000},
+        max_results=20,
+    )
+
+    results = await agent.search_content(request)
+
+    # 显示前5条结果
+    print("🏆 Top 5 内容:")
+    for i, item in enumerate(results[:5], 1):
+        print(f"\n{i}. [{item.platform}] {item.title}")
+        print(f"   作者: {item.author}")
+        print(f"   播放: {item.stats['views']:,} | 点赞: {item.stats['likes']:,}")
+        print(f"   质量分数: {item.quality_score} | 相关性: {item.relevance_score}")
+
+    # 示例2: 收集反馈
+    print("\n" + "="*60)
+    print("示例2: 收集运营反馈")
+    print("="*60 + "\n")
+
+    if results:
+        agent.collect_feedback(results[0].content_id, "excellent", "内容质量很高")
+        agent.collect_feedback(results[1].content_id, "good", "内容不错")
+        agent.collect_feedback(results[2].content_id, "excellent", "非常符合需求")
+
+    # 示例3: 更新表现
+    print("\n" + "="*60)
+    print("示例3: 更新内容表现")
+    print("="*60 + "\n")
+
+    if results:
+        agent.update_performance(results[0].content_id, internal_views=5000, engagement=0.15)
+        agent.update_performance(results[1].content_id, internal_views=3000, engagement=0.12)
+
+    # 示例4: 分析洞察
+    print("\n" + "="*60)
+    print("示例4: 分析运营洞察")
+    print("="*60)
+
+    agent.analyze_insights()
+
+    print("\n" + "="*60)
+    print("✅ 演示完成!")
+    print("="*60 + "\n")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 13 - 0
examples/content_finder/memory/__init__.py

@@ -0,0 +1,13 @@
+"""
+记忆系统模块
+"""
+
+from .search_history import SearchHistoryManager
+from .content_perf import ContentPerformanceManager
+from .operator_pref import OperatorPreferenceManager
+
+__all__ = [
+    "SearchHistoryManager",
+    "ContentPerformanceManager",
+    "OperatorPreferenceManager",
+]

+ 100 - 0
examples/content_finder/memory/content_perf.py

@@ -0,0 +1,100 @@
+"""
+内容表现数据管理
+"""
+
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+import json
+from pathlib import Path
+
+
+class ContentPerformanceManager:
+    """内容表现数据管理器"""
+
+    def __init__(self, storage_path: str):
+        self.storage_path = Path(storage_path)
+        self.storage_path.mkdir(parents=True, exist_ok=True)
+        self.perf_file = self.storage_path / "content_performance.jsonl"
+
+    async def save_performance(
+        self,
+        content_id: str,
+        platform_views: int,
+        platform_likes: int,
+        platform_shares: int,
+        internal_views: int,
+        internal_engagement: float,
+        conversion_rate: float,
+    ) -> None:
+        """保存内容表现数据"""
+        record = {
+            "content_id": content_id,
+            "timestamp": datetime.now().isoformat(),
+            "platform_views": platform_views,
+            "platform_likes": platform_likes,
+            "platform_shares": platform_shares,
+            "internal_views": internal_views,
+            "internal_engagement": internal_engagement,
+            "conversion_rate": conversion_rate,
+        }
+
+        with open(self.perf_file, "a", encoding="utf-8") as f:
+            f.write(json.dumps(record, ensure_ascii=False) + "\n")
+
+    async def get_performance(
+        self,
+        content_id: Optional[str] = None,
+        limit: int = 100,
+    ) -> List[Dict[str, Any]]:
+        """获取内容表现数据"""
+        if not self.perf_file.exists():
+            return []
+
+        records = []
+        with open(self.perf_file, "r", encoding="utf-8") as f:
+            for line in f:
+                if line.strip():
+                    record = json.loads(line)
+                    if content_id is None or record.get("content_id") == content_id:
+                        records.append(record)
+
+        return records[-limit:]
+
+    async def get_top_performers(
+        self,
+        metric: str = "internal_engagement",
+        limit: int = 20,
+    ) -> List[Dict[str, Any]]:
+        """获取表现最好的内容"""
+        all_records = await self.get_performance(limit=1000)
+
+        # 按指定指标排序
+        sorted_records = sorted(
+            all_records,
+            key=lambda x: x.get(metric, 0),
+            reverse=True,
+        )
+
+        return sorted_records[:limit]
+
+    async def analyze_trends(self) -> Dict[str, Any]:
+        """分析内容表现趋势"""
+        all_records = await self.get_performance(limit=1000)
+
+        if not all_records:
+            return {
+                "avg_internal_engagement": 0,
+                "avg_conversion_rate": 0,
+                "total_internal_views": 0,
+            }
+
+        total_engagement = sum(r.get("internal_engagement", 0) for r in all_records)
+        total_conversion = sum(r.get("conversion_rate", 0) for r in all_records)
+        total_views = sum(r.get("internal_views", 0) for r in all_records)
+
+        return {
+            "avg_internal_engagement": total_engagement / len(all_records),
+            "avg_conversion_rate": total_conversion / len(all_records),
+            "total_internal_views": total_views,
+            "sample_size": len(all_records),
+        }

+ 185 - 0
examples/content_finder/memory/operator_pref.py

@@ -0,0 +1,185 @@
+"""
+运营偏好数据管理
+"""
+
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+import json
+from pathlib import Path
+from collections import Counter
+
+
+class OperatorPreferenceManager:
+    """运营偏好数据管理器"""
+
+    def __init__(self, storage_path: str):
+        self.storage_path = Path(storage_path)
+        self.storage_path.mkdir(parents=True, exist_ok=True)
+        self.feedback_file = self.storage_path / "operator_feedback.jsonl"
+
+    async def save_feedback(
+        self,
+        content_id: str,
+        rating: str,
+        notes: str,
+        operator_id: str,
+        content_features: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """保存运营反馈"""
+        record = {
+            "content_id": content_id,
+            "timestamp": datetime.now().isoformat(),
+            "rating": rating,
+            "notes": notes,
+            "operator_id": operator_id,
+            "content_features": content_features or {},
+        }
+
+        with open(self.feedback_file, "a", encoding="utf-8") as f:
+            f.write(json.dumps(record, ensure_ascii=False) + "\n")
+
+    async def get_feedbacks(
+        self,
+        content_id: Optional[str] = None,
+        rating_filter: Optional[str] = None,
+        operator_id: Optional[str] = None,
+        limit: int = 100,
+    ) -> List[Dict[str, Any]]:
+        """获取反馈记录"""
+        if not self.feedback_file.exists():
+            return []
+
+        records = []
+        with open(self.feedback_file, "r", encoding="utf-8") as f:
+            for line in f:
+                if line.strip():
+                    record = json.loads(line)
+
+                    # 应用筛选条件
+                    if content_id and record.get("content_id") != content_id:
+                        continue
+                    if rating_filter and record.get("rating") != rating_filter:
+                        continue
+                    if operator_id and record.get("operator_id") != operator_id:
+                        continue
+
+                    records.append(record)
+
+        return records[-limit:]
+
+    async def analyze_preferences(
+        self,
+        operator_id: Optional[str] = None,
+    ) -> Dict[str, Any]:
+        """分析运营偏好"""
+        feedbacks = await self.get_feedbacks(
+            operator_id=operator_id,
+            limit=1000,
+        )
+
+        if not feedbacks:
+            return {
+                "rating_distribution": {},
+                "preferred_keywords": [],
+                "preferred_tags": [],
+                "preferred_platforms": [],
+            }
+
+        # 评级分布
+        ratings = [f.get("rating") for f in feedbacks]
+        rating_dist = dict(Counter(ratings))
+
+        # 提取优质内容的特征
+        excellent_feedbacks = [f for f in feedbacks if f.get("rating") == "excellent"]
+
+        keywords = []
+        tags = []
+        platforms = []
+
+        for feedback in excellent_feedbacks:
+            features = feedback.get("content_features", {})
+            keywords.extend(features.get("keywords", []))
+            tags.extend(features.get("tags", []))
+            if "platform" in features:
+                platforms.append(features["platform"])
+
+        # 统计频率
+        keyword_freq = Counter(keywords)
+        tag_freq = Counter(tags)
+        platform_freq = Counter(platforms)
+
+        return {
+            "rating_distribution": rating_dist,
+            "preferred_keywords": [k for k, _ in keyword_freq.most_common(10)],
+            "preferred_tags": [t for t, _ in tag_freq.most_common(10)],
+            "preferred_platforms": [p for p, _ in platform_freq.most_common()],
+            "total_feedbacks": len(feedbacks),
+            "excellent_count": len(excellent_feedbacks),
+        }
+
+    async def get_learning_insights(self) -> Dict[str, Any]:
+        """获取学习洞察"""
+        all_feedbacks = await self.get_feedbacks(limit=1000)
+
+        if not all_feedbacks:
+            return {"insights": []}
+
+        # 分析优质内容的共同特征
+        excellent = [f for f in all_feedbacks if f.get("rating") == "excellent"]
+        good = [f for f in all_feedbacks if f.get("rating") == "good"]
+        poor = [f for f in all_feedbacks if f.get("rating") == "poor"]
+
+        insights = []
+
+        # 洞察1:评级分布
+        insights.append({
+            "type": "rating_distribution",
+            "message": f"优质内容占比:{len(excellent) / len(all_feedbacks) * 100:.1f}%",
+            "data": {
+                "excellent": len(excellent),
+                "good": len(good),
+                "poor": len(poor),
+            },
+        })
+
+        # 洞察2:优质内容特征
+        if excellent:
+            excellent_features = self._extract_common_features(excellent)
+            insights.append({
+                "type": "excellent_features",
+                "message": "优质内容的共同特征",
+                "data": excellent_features,
+            })
+
+        # 洞察3:需要避免的特征
+        if poor:
+            poor_features = self._extract_common_features(poor)
+            insights.append({
+                "type": "poor_features",
+                "message": "低质内容的共同特征(需避免)",
+                "data": poor_features,
+            })
+
+        return {"insights": insights}
+
+    def _extract_common_features(
+        self,
+        feedbacks: List[Dict[str, Any]],
+    ) -> Dict[str, Any]:
+        """提取共同特征"""
+        all_keywords = []
+        all_tags = []
+        all_platforms = []
+
+        for feedback in feedbacks:
+            features = feedback.get("content_features", {})
+            all_keywords.extend(features.get("keywords", []))
+            all_tags.extend(features.get("tags", []))
+            if "platform" in features:
+                all_platforms.append(features["platform"])
+
+        return {
+            "top_keywords": [k for k, _ in Counter(all_keywords).most_common(5)],
+            "top_tags": [t for t, _ in Counter(all_tags).most_common(5)],
+            "top_platforms": [p for p, _ in Counter(all_platforms).most_common(3)],
+        }

+ 83 - 0
examples/content_finder/memory/search_history.py

@@ -0,0 +1,83 @@
+"""
+搜索历史记录管理
+"""
+
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+import json
+from pathlib import Path
+
+
+class SearchHistoryManager:
+    """搜索历史记录管理器"""
+
+    def __init__(self, storage_path: str):
+        self.storage_path = Path(storage_path)
+        self.storage_path.mkdir(parents=True, exist_ok=True)
+        self.history_file = self.storage_path / "search_history.jsonl"
+
+    async def save_search(
+        self,
+        keywords: List[str],
+        platforms: List[str],
+        filters: Dict[str, Any],
+        results_count: int,
+        trace_id: str,
+    ) -> None:
+        """保存搜索记录"""
+        record = {
+            "timestamp": datetime.now().isoformat(),
+            "keywords": keywords,
+            "platforms": platforms,
+            "filters": filters,
+            "results_count": results_count,
+            "trace_id": trace_id,
+        }
+
+        with open(self.history_file, "a", encoding="utf-8") as f:
+            f.write(json.dumps(record, ensure_ascii=False) + "\n")
+
+    async def get_recent_searches(
+        self,
+        limit: int = 50,
+        keyword_filter: Optional[str] = None,
+    ) -> List[Dict[str, Any]]:
+        """获取最近的搜索记录"""
+        if not self.history_file.exists():
+            return []
+
+        records = []
+        with open(self.history_file, "r", encoding="utf-8") as f:
+            for line in f:
+                if line.strip():
+                    record = json.loads(line)
+                    if keyword_filter:
+                        if keyword_filter in record.get("keywords", []):
+                            records.append(record)
+                    else:
+                        records.append(record)
+
+        # 返回最近的记录
+        return records[-limit:]
+
+    async def get_similar_searches(
+        self,
+        keywords: List[str],
+        limit: int = 10,
+    ) -> List[Dict[str, Any]]:
+        """获取相似的搜索记录"""
+        all_searches = await self.get_recent_searches(limit=1000)
+
+        # 计算相似度
+        scored_searches = []
+        for search in all_searches:
+            search_keywords = set(search.get("keywords", []))
+            input_keywords = set(keywords)
+            similarity = len(search_keywords & input_keywords) / len(search_keywords | input_keywords)
+            if similarity > 0:
+                scored_searches.append((similarity, search))
+
+        # 按相似度排序
+        scored_searches.sort(key=lambda x: x[0], reverse=True)
+
+        return [search for _, search in scored_searches[:limit]]

+ 110 - 0
examples/content_finder/run.py

@@ -0,0 +1,110 @@
+"""
+内容寻找 Agent 运行入口
+
+使用示例:
+    python run.py
+"""
+
+import asyncio
+import logging
+from pathlib import Path
+
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace.store import FileSystemTraceStore
+from agent.llm import create_openrouter_llm_call
+from agent.memory.stores import FileSystemMemoryStore
+
+from agent import SearchRequest, ContentFinderAgent
+
+# 导入工具(确保工具被注册)
+from tools import crawler, content_eval, feedback
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def main():
+    """主函数"""
+    # 初始化存储
+    base_path = Path(__file__).parent / ".cache"
+    trace_store = FileSystemTraceStore(base_path=str(base_path / "traces"))
+    memory_store = FileSystemMemoryStore(base_path=str(base_path / "memory"))
+
+    # 初始化LLM
+    llm_call = create_openrouter_llm_call(
+        model="anthropic/claude-sonnet-4.5"
+    )
+
+    # 初始化Runner
+    runner = AgentRunner(
+        llm_call=llm_call,
+        trace_store=trace_store,
+        memory_store=memory_store,
+        skills_dir=str(Path(__file__).parent / "skills"),
+    )
+
+    # 初始化ContentFinderAgent
+    agent = ContentFinderAgent(
+        runner=runner,
+        trace_store=trace_store,
+        memory_store=memory_store,
+    )
+
+    # 示例1:搜索内容
+    logger.info("=== 示例1:搜索美食类视频内容 ===")
+    search_request = SearchRequest(
+        keywords=["美食", "探店", "美食推荐"],
+        tags=["美食", "探店"],
+        platforms=["douyin", "kuaishou"],
+        filters={
+            "min_views": 10000,
+            "min_likes": 500,
+        },
+        max_results=20,
+    )
+
+    results = await agent.search_content(search_request)
+    logger.info(f"找到 {len(results)} 条内容")
+
+    # 示例2:收集运营反馈
+    if results:
+        logger.info("\n=== 示例2:收集运营反馈 ===")
+        first_content = results[0]
+        feedback = await agent.collect_feedback(
+            content_id=first_content.content_id,
+            rating="excellent",
+            notes="内容质量很高,符合平台调性",
+            operator_id="operator_001",
+        )
+        logger.info(f"反馈已记录:{feedback}")
+
+    # 示例3:更新内容表现
+    if results:
+        logger.info("\n=== 示例3:更新内容表现 ===")
+        from agent import ContentPerformance
+        from datetime import datetime
+
+        performance = ContentPerformance(
+            content_id=first_content.content_id,
+            platform_views=150000,
+            platform_likes=8000,
+            platform_shares=500,
+            internal_views=5000,
+            internal_engagement=0.15,
+            conversion_rate=0.08,
+            update_time=datetime.now(),
+        )
+        await agent.update_performance(
+            content_id=first_content.content_id,
+            performance=performance,
+        )
+        logger.info("表现数据已更新")
+
+    # 示例4:优化搜索策略
+    logger.info("\n=== 示例4:优化搜索策略 ===")
+    strategy = await agent.optimize_strategy()
+    logger.info(f"优化后的策略:{strategy}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 76 - 0
examples/content_finder/skills/content_finder.md

@@ -0,0 +1,76 @@
+---
+name: content-finder
+description: 内容寻找专家 - 从全网寻找优质视频内容
+---
+
+# 内容寻找 Agent
+
+你是一个专业的内容寻找专家,负责从全网(抖音、快手等平台)寻找符合需求的优质视频内容。
+
+## 核心能力
+
+### 1. 智能搜索
+- 使用 `douyin_search` 和 `kuaishou_search` 工具搜索内容
+- 根据关键词、标签、筛选条件精准定位内容
+- 支持多平台并行搜索,提高效率
+
+### 2. 内容评估
+- 使用 `evaluate_content` 工具评估内容质量和相关性
+- 综合考虑播放量、互动率、新鲜度等因素
+- 优先推荐高质量、高相关性的内容
+
+### 3. 记忆学习
+- 记录每次搜索的关键词、平台、结果
+- 分析运营反馈,学习内容偏好
+- 基于历史表现数据优化搜索策略
+
+### 4. 反馈收集
+- 使用 `record_operator_feedback` 收集运营反馈
+- 使用 `update_content_performance` 更新内容表现
+- 使用 `query_historical_data` 查询历史数据
+
+## 工作流程
+
+### 搜索内容
+1. 理解需求:分析关键词、标签、筛选条件
+2. 查询历史:检查是否有相似搜索的历史数据
+3. 制定策略:基于历史数据优化搜索参数
+4. 并行搜索:同时在多个平台搜索
+5. 评估筛选:评估内容质量,筛选最佳结果
+6. 返回结果:按质量分数排序返回
+
+### 优化策略
+1. 分析高表现内容的共同特征
+2. 提取有效的关键词和标签
+3. 调整筛选条件(播放量、点赞数等)
+4. 优化平台权重(哪个平台效果更好)
+
+## 最佳实践
+
+### 关键词优化
+- 使用多个相关关键词组合搜索
+- 尝试同义词和相关词扩展
+- 根据反馈调整关键词权重
+
+### 筛选条件
+- 播放量:根据内容类型设置合理阈值
+- 互动率:优先选择高互动内容
+- 发布时间:平衡新鲜度和热度
+
+### 平台选择
+- 抖音:适合短视频、娱乐、生活类内容
+- 快手:适合接地气、真实、情感类内容
+- 根据内容类型选择最合适的平台
+
+### 质量评估
+- 不仅看绝对数值(播放量),更要看互动率
+- 关注内容的完播率和转化效果
+- 综合考虑运营反馈和实际表现
+
+## 注意事项
+
+1. **多样性**:不要只关注头部内容,也要发现潜力内容
+2. **时效性**:优先推荐新鲜、热门的内容
+3. **相关性**:确保内容与需求高度相关
+4. **合规性**:避免推荐违规、低俗内容
+5. **持续学习**:不断从反馈中学习,优化策略

+ 20 - 0
examples/content_finder/tools/__init__.py

@@ -0,0 +1,20 @@
+"""
+内容寻找工具包
+"""
+
+from .crawler import douyin_search, kuaishou_search
+from .content_eval import evaluate_content
+from .feedback import (
+    record_operator_feedback,
+    update_content_performance,
+    query_historical_data,
+)
+
+__all__ = [
+    "douyin_search",
+    "kuaishou_search",
+    "evaluate_content",
+    "record_operator_feedback",
+    "update_content_performance",
+    "query_historical_data",
+]

+ 105 - 0
examples/content_finder/tools/content_eval.py

@@ -0,0 +1,105 @@
+"""
+内容评估工具 - 评估内容质量和相关性
+"""
+
+from typing import Dict, Any, List
+from agent.tools import tool, ToolResult, ToolContext
+
+
+@tool(description="评估内容质量和相关性")
+async def evaluate_content(
+    content_items: List[Dict[str, Any]],
+    keywords: List[str],
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    评估内容质量和相关性
+
+    Args:
+        content_items: 内容列表
+        keywords: 搜索关键词
+        ctx: 工具上下文
+    """
+    evaluated_items = []
+
+    for item in content_items:
+        score = _calculate_quality_score(item, keywords)
+        item["quality_score"] = score
+        item["relevance_score"] = _calculate_relevance_score(item, keywords)
+        evaluated_items.append(item)
+
+    # 按质量分数排序
+    evaluated_items.sort(key=lambda x: x["quality_score"], reverse=True)
+
+    return ToolResult(
+        title="内容评估完成",
+        output=f"已评估 {len(evaluated_items)} 条内容",
+        data={"evaluated_items": evaluated_items},
+    )
+
+
+def _calculate_quality_score(item: Dict[str, Any], keywords: List[str]) -> float:
+    """
+    计算内容质量分数
+
+    考虑因素:
+    1. 播放量、点赞数、评论数、分享数
+    2. 互动率(点赞/播放、评论/播放)
+    3. 发布时间(新鲜度)
+    """
+    stats = item.get("stats", {})
+    views = stats.get("views", 0)
+    likes = stats.get("likes", 0)
+    comments = stats.get("comments", 0)
+    shares = stats.get("shares", 0)
+
+    # 基础分数:基于绝对数值
+    base_score = (
+        (views / 10000) * 0.3 +
+        (likes / 1000) * 0.3 +
+        (comments / 100) * 0.2 +
+        (shares / 100) * 0.2
+    )
+
+    # 互动率加成
+    engagement_rate = (likes + comments + shares) / max(views, 1)
+    engagement_bonus = engagement_rate * 20
+
+    # 总分
+    total_score = min(base_score + engagement_bonus, 100)
+
+    return round(total_score, 2)
+
+
+def _calculate_relevance_score(item: Dict[str, Any], keywords: List[str]) -> float:
+    """
+    计算内容相关性分数
+
+    考虑因素:
+    1. 标题中关键词匹配度
+    2. 描述中关键词匹配度
+    3. 标签匹配度
+    """
+    title = item.get("title", "").lower()
+    description = item.get("description", "").lower()
+    tags = [t.lower() for t in item.get("tags", [])]
+
+    score = 0.0
+    keyword_count = len(keywords)
+
+    for keyword in keywords:
+        keyword_lower = keyword.lower()
+
+        # 标题匹配(权重最高)
+        if keyword_lower in title:
+            score += 40 / keyword_count
+
+        # 描述匹配
+        if keyword_lower in description:
+            score += 30 / keyword_count
+
+        # 标签匹配
+        if keyword_lower in tags:
+            score += 30 / keyword_count
+
+    return min(score, 100)

+ 152 - 0
examples/content_finder/tools/crawler.py

@@ -0,0 +1,152 @@
+"""
+爬虫工具 - 集成抖音、快手等平台的爬虫能力
+"""
+
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+from agent.tools import tool, ToolResult, ToolContext
+
+
+@tool(description="从抖音搜索视频内容")
+async def douyin_search(
+    keywords: str,
+    max_results: int = 20,
+    min_views: Optional[int] = None,
+    min_likes: Optional[int] = None,
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    从抖音搜索视频内容
+
+    Args:
+        keywords: 搜索关键词
+        max_results: 最大结果数
+        min_views: 最小播放量
+        min_likes: 最小点赞数
+        ctx: 工具上下文
+    """
+    # 伪代码:实际实现需要调用抖音爬虫API
+    results = await _call_douyin_crawler(
+        keywords=keywords,
+        max_results=max_results,
+        min_views=min_views,
+        min_likes=min_likes,
+    )
+
+    return ToolResult(
+        title=f"抖音搜索结果",
+        output=f"找到 {len(results)} 条内容",
+        data={"items": results},
+    )
+
+
+@tool(description="从快手搜索视频内容")
+async def kuaishou_search(
+    keywords: str,
+    max_results: int = 20,
+    min_views: Optional[int] = None,
+    min_likes: Optional[int] = None,
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    从快手搜索视频内容
+
+    Args:
+        keywords: 搜索关键词
+        max_results: 最大结果数
+        min_views: 最小播放量
+        min_likes: 最小点赞数
+        ctx: 工具上下文
+    """
+    # 伪代码:实际实现需要调用快手爬虫API
+    results = await _call_kuaishou_crawler(
+        keywords=keywords,
+        max_results=max_results,
+        min_views=min_views,
+        min_likes=min_likes,
+    )
+
+    return ToolResult(
+        title=f"快手搜索结果",
+        output=f"找到 {len(results)} 条内容",
+        data={"items": results},
+    )
+
+
+# ===== 爬虫实现(伪代码)=====
+
+async def _call_douyin_crawler(
+    keywords: str,
+    max_results: int,
+    min_views: Optional[int],
+    min_likes: Optional[int],
+) -> List[Dict[str, Any]]:
+    """
+    调用抖音爬虫
+
+    实际实现需要:
+    1. 调用抖音API或爬虫服务
+    2. 解析返回数据
+    3. 应用筛选条件
+    4. 格式化为标准ContentItem格式
+    """
+    # 伪代码示例
+    results = [
+        {
+            "content_id": "dy_123456",
+            "platform": "douyin",
+            "title": "示例视频标题",
+            "author": "作者名",
+            "url": "https://douyin.com/video/123456",
+            "cover_url": "https://douyin.com/cover/123456.jpg",
+            "description": "视频描述",
+            "stats": {
+                "views": 100000,
+                "likes": 5000,
+                "comments": 200,
+                "shares": 300,
+            },
+            "tags": ["标签1", "标签2"],
+            "publish_time": datetime.now().isoformat(),
+        }
+    ]
+    return results
+
+
+async def _call_kuaishou_crawler(
+    keywords: str,
+    max_results: int,
+    min_views: Optional[int],
+    min_likes: Optional[int],
+) -> List[Dict[str, Any]]:
+    """
+    调用快手爬虫
+
+    实际实现需要:
+    1. 调用快手API或爬虫服务
+    2. 解析返回数据
+    3. 应用筛选条件
+    4. 格式化为标准ContentItem格式
+    """
+    # 伪代码示例
+    results = [
+        {
+            "content_id": "ks_789012",
+            "platform": "kuaishou",
+            "title": "示例视频标题",
+            "author": "作者名",
+            "url": "https://kuaishou.com/video/789012",
+            "cover_url": "https://kuaishou.com/cover/789012.jpg",
+            "description": "视频描述",
+            "stats": {
+                "views": 80000,
+                "likes": 4000,
+                "comments": 150,
+                "shares": 250,
+            },
+            "tags": ["标签1", "标签2"],
+            "publish_time": datetime.now().isoformat(),
+        }
+    ]
+    return results
+

+ 163 - 0
examples/content_finder/tools/feedback.py

@@ -0,0 +1,163 @@
+"""
+反馈收集工具 - 收集运营反馈和内容表现数据
+"""
+
+from typing import Dict, Any, Optional
+from datetime import datetime
+from agent.tools import tool, ToolResult, ToolContext
+
+
+@tool(description="记录运营人员对内容的反馈")
+async def record_operator_feedback(
+    content_id: str,
+    rating: str,
+    notes: str = "",
+    operator_id: str = "default",
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    记录运营人员对内容的反馈
+
+    Args:
+        content_id: 内容ID
+        rating: 评级(excellent/good/poor)
+        notes: 备注说明
+        operator_id: 运营人员ID
+        ctx: 工具上下文
+    """
+    feedback = {
+        "content_id": content_id,
+        "rating": rating,
+        "notes": notes,
+        "operator_id": operator_id,
+        "feedback_time": datetime.now().isoformat(),
+    }
+
+    # 保存反馈(伪代码)
+    await _save_feedback_to_memory(feedback, ctx)
+
+    return ToolResult(
+        title="反馈已记录",
+        output=f"已记录对内容 {content_id} 的反馈:{rating}",
+        data=feedback,
+    )
+
+
+@tool(description="更新内容在平台的表现数据")
+async def update_content_performance(
+    content_id: str,
+    platform_views: int = 0,
+    platform_likes: int = 0,
+    platform_shares: int = 0,
+    internal_views: int = 0,
+    internal_engagement: float = 0.0,
+    conversion_rate: float = 0.0,
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    更新内容在平台的表现数据
+
+    Args:
+        content_id: 内容ID
+        platform_views: 平台播放量
+        platform_likes: 平台点赞数
+        platform_shares: 平台分享数
+        internal_views: 内部平台播放量
+        internal_engagement: 内部互动率
+        conversion_rate: 转化率
+        ctx: 工具上下文
+    """
+    performance = {
+        "content_id": content_id,
+        "platform_views": platform_views,
+        "platform_likes": platform_likes,
+        "platform_shares": platform_shares,
+        "internal_views": internal_views,
+        "internal_engagement": internal_engagement,
+        "conversion_rate": conversion_rate,
+        "update_time": datetime.now().isoformat(),
+    }
+
+    # 保存表现数据(伪代码)
+    await _save_performance_to_memory(performance, ctx)
+
+    return ToolResult(
+        title="表现数据已更新",
+        output=f"已更新内容 {content_id} 的表现数据",
+        data=performance,
+    )
+
+
+@tool(description="查询历史反馈和表现数据")
+async def query_historical_data(
+    content_id: Optional[str] = None,
+    rating_filter: Optional[str] = None,
+    limit: int = 50,
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    查询历史反馈和表现数据
+
+    Args:
+        content_id: 内容ID(可选,不指定则查询所有)
+        rating_filter: 评级筛选(excellent/good/poor)
+        limit: 返回数量限制
+        ctx: 工具上下文
+    """
+    # 查询数据(伪代码)
+    feedbacks = await _query_feedbacks_from_memory(
+        content_id=content_id,
+        rating_filter=rating_filter,
+        limit=limit,
+        ctx=ctx,
+    )
+
+    performances = await _query_performances_from_memory(
+        content_id=content_id,
+        limit=limit,
+        ctx=ctx,
+    )
+
+    return ToolResult(
+        title="历史数据查询完成",
+        output=f"找到 {len(feedbacks)} 条反馈,{len(performances)} 条表现数据",
+        data={
+            "feedbacks": feedbacks,
+            "performances": performances,
+        },
+    )
+
+
+# ===== 私有辅助函数(伪代码)=====
+
+async def _save_feedback_to_memory(feedback: Dict[str, Any], ctx: ToolContext) -> None:
+    """保存反馈到记忆系统"""
+    # 实际实现需要调用memory_store或knowledge系统
+    pass
+
+
+async def _save_performance_to_memory(performance: Dict[str, Any], ctx: ToolContext) -> None:
+    """保存表现数据到记忆系统"""
+    # 实际实现需要调用memory_store或knowledge系统
+    pass
+
+
+async def _query_feedbacks_from_memory(
+    content_id: Optional[str],
+    rating_filter: Optional[str],
+    limit: int,
+    ctx: ToolContext,
+) -> list:
+    """从记忆系统查询反馈"""
+    # 实际实现需要调用memory_store或knowledge系统
+    return []
+
+
+async def _query_performances_from_memory(
+    content_id: Optional[str],
+    limit: int,
+    ctx: ToolContext,
+) -> list:
+    """从记忆系统查询表现数据"""
+    # 实际实现需要调用memory_store或knowledge系统
+    return []

+ 14 - 0
examples/production_template/.env.example

@@ -0,0 +1,14 @@
+# LLM 配置
+OPENROUTER_API_KEY=your-api-key-here
+MODEL=anthropic/claude-sonnet-4.5
+TEMPERATURE=0.3
+MAX_ITERATIONS=30
+
+# 存储配置
+TRACE_DIR=.cache/traces
+OUTPUT_DIR=.cache/output
+
+# Skills 配置
+SKILLS_DIR=./skills
+# 留空则加载所有 skills,指定则只加载指定的 skills
+ENABLED_SKILLS=

+ 288 - 0
examples/production_template/README.md

@@ -0,0 +1,288 @@
+# 生产级 Agent 模板
+
+这是一个可直接用于生产环境的 Agent 模板项目。
+
+## 特性
+
+- ✅ 完整的错误处理
+- ✅ 日志记录(文件+控制台)
+- ✅ 环境变量配置
+- ✅ 工具注册示例
+- ✅ Skills 加载
+- ✅ 结果持久化
+- ✅ 生产级代码结构
+
+## 快速开始
+
+### 0. 安装依赖
+
+```bash
+# 从项目根目录安装
+pip install -r requirements.txt
+pip install dbutils pymysql  # browser 模块依赖
+```
+
+### 1. 配置环境变量
+
+创建 `.env` 文件:
+
+```bash
+# LLM 配置
+OPENROUTER_API_KEY=your-api-key-here
+MODEL=anthropic/claude-sonnet-4.5
+TEMPERATURE=0.3
+MAX_ITERATIONS=30
+
+# 存储配置
+TRACE_DIR=.cache/traces
+OUTPUT_DIR=.cache/output
+
+# Skills 配置
+SKILLS_DIR=./skills
+ENABLED_SKILLS=planning,research
+```
+
+### 2. 运行
+
+```bash
+# 从项目根目录运行
+python examples/production_template/run.py
+
+# 或进入目录运行
+cd examples/production_template
+python run.py
+```
+
+## 项目结构
+
+```
+production_template/
+├── run.py              # 主程序
+├── .env                # 环境变量配置
+├── skills/             # Skills 目录
+│   └── custom.md       # 自定义 Skill
+└── .cache/             # 缓存目录
+    ├── traces/         # Trace 存储
+    ├── output/         # 结果输出
+    └── agent.log       # 日志文件
+```
+
+## 核心组件
+
+### 1. AgentConfig - 配置管理
+
+统一管理所有配置项,从环境变量读取:
+
+```python
+config = AgentConfig()
+print(config.model)  # anthropic/claude-sonnet-4.5
+```
+
+### 2. 自定义工具
+
+使用 `@tool` 装饰器定义工具:
+
+```python
+@tool(description="你的工具描述")
+async def your_tool(param: str, ctx: ToolContext = None) -> ToolResult:
+    # 你的业务逻辑
+    return ToolResult(title="成功", output="结果")
+```
+
+### 3. ProductionAgent - Agent 封装
+
+封装了 AgentRunner,提供简洁的 API:
+
+```python
+agent = ProductionAgent(config)
+result = await agent.run("你的任务")
+```
+
+### 4. 日志记录
+
+自动记录到文件和控制台:
+
+```python
+logger.info("信息日志")
+logger.error("错误日志", exc_info=True)
+```
+
+## 自定义开发
+
+### 添加新工具
+
+在 `run.py` 中添加:
+
+```python
+@tool(description="你的新工具")
+async def new_tool(param: str, ctx: ToolContext = None) -> ToolResult:
+    try:
+        # 业务逻辑
+        result = do_something(param)
+
+        return ToolResult(
+            title="成功",
+            output=result,
+            data={"param": param, "result": result},
+        )
+    except Exception as e:
+        logger.error(f"工具执行失败: {e}", exc_info=True)
+        return ToolResult(
+            title="失败",
+            output=str(e),
+            error=True,
+        )
+```
+
+### 添加 Skill
+
+创建 `skills/your-skill.md`:
+
+```markdown
+---
+name: your-skill
+description: 你的技能描述
+---
+
+# 你的技能
+
+## 何时使用
+
+- 场景1
+- 场景2
+
+## 使用指南
+
+1. 步骤1
+2. 步骤2
+```
+
+在 `.env` 中启用:
+
+```bash
+ENABLED_SKILLS=planning,research,your-skill
+```
+
+### 修改任务
+
+在 `main()` 函数中修改 `task` 变量:
+
+```python
+task = """
+你的任务描述
+"""
+
+result = await agent.run(task)
+```
+
+## 生产部署
+
+### 1. Docker 部署
+
+创建 `Dockerfile`:
+
+```dockerfile
+FROM python:3.11-slim
+
+WORKDIR /app
+
+COPY requirements.txt .
+RUN pip install -r requirements.txt
+
+COPY . .
+
+CMD ["python", "run.py"]
+```
+
+构建和运行:
+
+```bash
+docker build -t my-agent .
+docker run --env-file .env my-agent
+```
+
+### 2. 监控和告警
+
+添加监控:
+
+```python
+# 在 ProductionAgent 中添加
+def _send_alert(self, message: str):
+    """发送告警"""
+    # 集成你的告警系统(钉钉、企业微信等)
+    pass
+
+# 在异常处理中调用
+except Exception as e:
+    self._send_alert(f"Agent 执行失败: {e}")
+```
+
+### 3. 性能优化
+
+```python
+# 1. 使用更快的模型
+MODEL=anthropic/claude-haiku-4.5
+
+# 2. 限制迭代次数
+MAX_ITERATIONS=20
+
+# 3. 启用 Prompt Caching
+config = RunConfig(enable_prompt_caching=True)
+```
+
+### 4. 错误重试
+
+```python
+async def run_with_retry(self, task: str, max_retries: int = 3):
+    """带重试的运行"""
+    for attempt in range(max_retries):
+        try:
+            return await self.run(task)
+        except Exception as e:
+            if attempt == max_retries - 1:
+                raise
+            logger.warning(f"重试 {attempt + 1}/{max_retries}: {e}")
+            await asyncio.sleep(2 ** attempt)  # 指数退避
+```
+
+## 最佳实践
+
+1. **环境变量管理**:所有配置通过环境变量管理,不要硬编码
+2. **日志记录**:关键操作都要记录日志
+3. **错误处理**:所有工具都要有 try-except
+4. **结果持久化**:重要结果要保存到文件或数据库
+5. **监控告警**:生产环境要有监控和告警机制
+
+## 常见问题
+
+### Q: 如何调试?
+
+```bash
+# 设置日志级别为 DEBUG
+logging.basicConfig(level=logging.DEBUG)
+```
+
+### Q: 如何查看执行过程?
+
+```bash
+# 启动 API Server
+python api_server.py
+
+# 访问
+http://localhost:8000/api/traces
+```
+
+### Q: 如何处理长时间任务?
+
+```python
+# 增加最大迭代次数
+MAX_ITERATIONS=100
+
+# 或使用异步任务队列(Celery、RQ 等)
+```
+
+## 参考
+
+- [快速上手指南](../../QUICKSTART.md)
+- [框架文档](../../agent/README.md)
+- [示例项目](../content_finder/)

+ 324 - 0
examples/production_template/run.py

@@ -0,0 +1,324 @@
+"""
+生产级 Agent 模板
+
+这是一个可直接用于生产环境的 Agent 模板,包含:
+- 完整的错误处理
+- 日志记录
+- 配置管理
+- 工具注册
+- Skills 加载
+- 结果输出
+"""
+
+import asyncio
+import logging
+import sys
+import os
+from pathlib import Path
+from typing import Optional, List, Dict, Any
+from datetime import datetime
+
+# 添加项目根目录到 Python 路径
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from agent import (
+    AgentRunner,
+    RunConfig,
+    FileSystemTraceStore,
+    Trace,
+    Message,
+    tool,
+    ToolResult,
+    ToolContext,
+)
+from agent.llm import create_openrouter_llm_call
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler('.cache/agent.log'),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+
+# ===== 配置管理 =====
+
+class AgentConfig:
+    """Agent 配置"""
+
+    def __init__(self):
+        # LLM 配置
+        self.model = os.getenv("MODEL", "anthropic/claude-sonnet-4.5")
+        self.api_key = os.getenv("OPENROUTER_API_KEY")
+        self.temperature = float(os.getenv("TEMPERATURE", "0.3"))
+        self.max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
+
+        # 存储配置
+        self.trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
+        self.output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
+
+        # Skills 配置
+        self.skills_dir = os.getenv("SKILLS_DIR", "./skills")
+        self.enabled_skills = os.getenv("ENABLED_SKILLS", "").split(",") if os.getenv("ENABLED_SKILLS") else None
+
+        # 验证配置
+        self._validate()
+
+    def _validate(self):
+        """验证配置"""
+        if not self.api_key:
+            raise ValueError("OPENROUTER_API_KEY 未设置,请在 .env 文件中配置")
+
+        # 创建必要的目录
+        Path(self.trace_dir).mkdir(parents=True, exist_ok=True)
+        Path(self.output_dir).mkdir(parents=True, exist_ok=True)
+
+
+# ===== 自定义工具 =====
+
+@tool(description="示例工具:处理数据")
+async def process_data(
+    data: str,
+    operation: str = "transform",
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    处理数据的示例工具
+
+    Args:
+        data: 要处理的数据
+        operation: 操作类型(transform/validate/analyze)
+        ctx: 工具上下文
+    """
+    try:
+        logger.info(f"处理数据: operation={operation}, data_length={len(data)}")
+
+        # 你的业务逻辑
+        if operation == "transform":
+            result = data.upper()
+        elif operation == "validate":
+            result = f"数据有效: {len(data)} 字符"
+        elif operation == "analyze":
+            result = f"分析结果: 包含 {len(data.split())} 个单词"
+        else:
+            return ToolResult(
+                title="操作失败",
+                output=f"不支持的操作: {operation}",
+                error=True,
+            )
+
+        return ToolResult(
+            title="处理成功",
+            output=result,
+            data={"operation": operation, "result": result},
+        )
+
+    except Exception as e:
+        logger.error(f"处理数据失败: {e}", exc_info=True)
+        return ToolResult(
+            title="处理失败",
+            output=str(e),
+            error=True,
+        )
+
+
+@tool(description="示例工具:查询外部 API")
+async def query_api(
+    endpoint: str,
+    params: Optional[Dict[str, Any]] = None,
+    ctx: ToolContext = None,
+) -> ToolResult:
+    """
+    查询外部 API 的示例工具
+
+    Args:
+        endpoint: API 端点
+        params: 查询参数
+        ctx: 工具上下文
+    """
+    try:
+        logger.info(f"查询 API: endpoint={endpoint}, params={params}")
+
+        # 模拟 API 调用
+        # 实际使用时替换为真实的 API 调用
+        await asyncio.sleep(0.5)
+
+        result = {
+            "endpoint": endpoint,
+            "params": params,
+            "data": {"status": "success", "message": "API 调用成功"},
+        }
+
+        return ToolResult(
+            title="API 查询成功",
+            output=f"成功查询 {endpoint}",
+            data=result,
+        )
+
+    except Exception as e:
+        logger.error(f"API 查询失败: {e}", exc_info=True)
+        return ToolResult(
+            title="API 查询失败",
+            output=str(e),
+            error=True,
+        )
+
+
+# ===== Agent 类 =====
+
+class ProductionAgent:
+    """生产级 Agent"""
+
+    def __init__(self, config: AgentConfig):
+        self.config = config
+        self.runner = self._create_runner()
+
+    def _create_runner(self) -> AgentRunner:
+        """创建 AgentRunner"""
+        # 初始化存储
+        trace_store = FileSystemTraceStore(base_path=self.config.trace_dir)
+
+        # 初始化 LLM
+        llm_call = create_openrouter_llm_call(
+            model=self.config.model,
+            api_key=self.config.api_key,
+        )
+
+        # 创建 Runner
+        runner = AgentRunner(
+            llm_call=llm_call,
+            trace_store=trace_store,
+            skills_dir=self.config.skills_dir if Path(self.config.skills_dir).exists() else None,
+        )
+
+        logger.info(f"AgentRunner 已创建: model={self.config.model}")
+        return runner
+
+    async def run(
+        self,
+        task: str,
+        trace_id: Optional[str] = None,
+    ) -> Dict[str, Any]:
+        """
+        运行 Agent
+
+        Args:
+            task: 任务描述
+            trace_id: Trace ID(用于续跑)
+
+        Returns:
+            执行结果
+        """
+        logger.info(f"开始执行任务: {task[:100]}...")
+
+        # 构建运行配置
+        run_config = RunConfig(
+            model=self.config.model,
+            temperature=self.config.temperature,
+            max_iterations=self.config.max_iterations,
+            trace_id=trace_id,
+            skills=self.config.enabled_skills,
+        )
+
+        # 执行 Agent
+        final_response = None
+        current_trace_id = None
+        messages_count = 0
+
+        try:
+            async for item in self.runner.run(
+                messages=[{"role": "user", "content": task}],
+                config=run_config,
+            ):
+                if isinstance(item, Trace):
+                    current_trace_id = item.trace_id
+                    logger.info(f"Trace ID: {current_trace_id}")
+
+                elif isinstance(item, Message):
+                    messages_count += 1
+                    if item.role == "assistant" and item.content:
+                        final_response = item.content
+                        logger.info(f"收到 Assistant 消息 #{messages_count}")
+
+            logger.info(f"任务执行完成: trace_id={current_trace_id}, messages={messages_count}")
+
+            # 保存结果
+            result = {
+                "trace_id": current_trace_id,
+                "task": task,
+                "response": final_response,
+                "messages_count": messages_count,
+                "timestamp": datetime.now().isoformat(),
+            }
+
+            self._save_result(result)
+
+            return result
+
+        except Exception as e:
+            logger.error(f"任务执行失败: {e}", exc_info=True)
+            return {
+                "error": str(e),
+                "trace_id": current_trace_id,
+                "task": task,
+            }
+
+    def _save_result(self, result: Dict[str, Any]):
+        """保存执行结果"""
+        output_file = Path(self.config.output_dir) / f"{result['trace_id']}.txt"
+
+        with open(output_file, 'w', encoding='utf-8') as f:
+            f.write(f"Trace ID: {result['trace_id']}\n")
+            f.write(f"时间: {result['timestamp']}\n")
+            f.write(f"任务: {result['task']}\n")
+            f.write(f"\n{'='*60}\n\n")
+            f.write(f"结果:\n{result['response']}\n")
+
+        logger.info(f"结果已保存: {output_file}")
+
+
+# ===== 主函数 =====
+
+async def main():
+    """主函数"""
+    try:
+        # 加载配置
+        config = AgentConfig()
+
+        # 创建 Agent
+        agent = ProductionAgent(config)
+
+        # 执行任务
+        task = """
+        请帮我完成以下任务:
+        1. 使用 process_data 工具处理文本 "hello world"
+        2. 使用 query_api 工具查询 /api/users 端点
+        3. 总结执行结果
+        """
+
+        result = await agent.run(task)
+
+        # 输出结果
+        print("\n" + "="*60)
+        print("执行结果:")
+        print("="*60)
+        print(f"Trace ID: {result.get('trace_id')}")
+        print(f"消息数: {result.get('messages_count')}")
+        print("\n响应:")
+        print(result.get('response', result.get('error')))
+        print("="*60)
+
+    except Exception as e:
+        logger.error(f"程序执行失败: {e}", exc_info=True)
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 90 - 0
examples/production_template/skills/custom.md

@@ -0,0 +1,90 @@
+---
+name: custom-skill
+description: 自定义技能示例
+category: custom
+---
+
+# 自定义技能
+
+这是一个自定义技能的示例,展示如何编写 Skill。
+
+## 何时使用
+
+- 当需要执行特定业务逻辑时
+- 当需要遵循特定工作流程时
+- 当需要应用领域知识时
+
+## 工作流程
+
+### 1. 理解需求
+
+首先,仔细分析用户的需求:
+- 明确任务目标
+- 识别关键信息
+- 确定所需工具
+
+### 2. 制定计划
+
+使用 `goal` 工具创建执行计划:
+- 分解任务为子目标
+- 确定执行顺序
+- 预估所需资源
+
+### 3. 执行任务
+
+按计划执行:
+- 调用相应的工具
+- 处理工具返回结果
+- 记录执行过程
+
+### 4. 验证结果
+
+确保任务完成:
+- 检查输出是否符合预期
+- 验证数据完整性
+- 确认无遗漏
+
+## 使用指南
+
+### 工具调用
+
+调用工具时要:
+- 提供完整的参数
+- 处理可能的错误
+- 记录调用结果
+
+示例:
+```
+使用 process_data 工具处理数据:
+- data: "要处理的数据"
+- operation: "transform"
+```
+
+### 错误处理
+
+遇到错误时:
+1. 记录错误信息
+2. 尝试替代方案
+3. 如果无法解决,向用户说明
+
+### 结果输出
+
+输出结果时:
+- 使用清晰的格式
+- 包含关键信息
+- 提供必要的说明
+
+## 最佳实践
+
+1. **明确目标**:开始前确保理解任务目标
+2. **分步执行**:将复杂任务分解为简单步骤
+3. **及时反馈**:执行过程中提供进度反馈
+4. **验证结果**:完成后验证输出正确性
+5. **记录过程**:保存重要的中间结果
+
+## 注意事项
+
+- 不要跳过必要的验证步骤
+- 遇到不确定的情况要询问用户
+- 保持输出的一致性和可读性
+- 合理使用工具,避免不必要的调用