Sfoglia il codice sorgente

内容寻找Agent工程化

supeng 15 ore fa
parent
commit
cffac1b641

+ 14 - 0
examples/content_finder/.env.example

@@ -12,3 +12,17 @@ OUTPUT_DIR=.cache/output
 SKILLS_DIR=./skills
 # 留空则加载所有 skills,指定则只加载指定的 skills
 ENABLED_SKILLS=
+
+# 服务配置
+PORT=8000
+
+# 定时任务配置
+# 外部 API 地址(用于获取 query)
+SCHEDULE_QUERY_API=http://your-api.com/content-finder/get-query
+# 外部 API 认证(可选)
+SCHEDULE_QUERY_API_KEY=your-api-key
+# 外部 API 超时时间(秒)
+SCHEDULE_QUERY_API_TIMEOUT=10.0
+
+# 并发控制
+MAX_CONCURRENT_TASKS=3

+ 180 - 132
examples/content_finder/README.md

@@ -1,13 +1,6 @@
 # 内容寻找 Agent
 
-基于 AI Agent 的抖音内容寻找工具,根据用户需求智能搜索和筛选符合目标受众的视频内容。
-
-## 核心功能
-
-1. **智能搜索**:根据用户需求解析关键词,调用抖音搜索 API
-2. **画像筛选**:基于热点宝画像数据,分析内容受众特征
-3. **深度挖掘**:对优质作者进行深度挖掘,获取更多相关作品
-4. **综合评估**:多维度评估内容质量和受众匹配度
+基于 AI Agent 的抖音内容寻找工具,根据用户需求智能搜索和筛选符合目标受众的视频内容。支持命令行和 HTTP 服务两种运行方式。
 
 ## 平台背景
 
@@ -16,166 +9,221 @@
 - **增长方式**:微信分享裂变
 - **核心指标**:分享率、DAU
 
-## 工具列表
-
-### 1. douyin_search
-通过关键词搜索抖音视频内容
-
-**参数**:
-- `keyword` (必需): 搜索关键词
-- `content_type`: 内容类型,默认 "视频"
-- `sort_type`: 排序方式,默认 "综合排序"
-- `publish_time`: 发布时间范围,默认 "不限"
-- `cursor`: 分页游标,默认 "0"
-- `account_id`: 账号ID(可选)
-- `timeout`: 超时时间(秒),默认 60
-
-**返回**:搜索结果 JSON
-
-### 2. douyin_user_videos
-获取抖音账号的历史作品列表
-
-**参数**:
-- `account_id` (必需): 抖音账号ID
-- `sort_type`: 排序方式,默认 "最新"
-- `cursor`: 分页游标,默认 ""
-- `timeout`: 超时时间(秒),默认 60
-
-**返回**:作品列表 JSON
-
-### 3. get_account_fans_portrait
-获取抖音账号的粉丝画像(热点宝)
-
-**参数**:
-- `account_id` (必需): 抖音账号ID
-- `need_province`: 是否获取省份分布,默认 False
-- `need_city`: 是否获取城市分布,默认 False
-- `need_city_level`: 是否获取城市等级分布,默认 False
-- `need_gender`: 是否获取性别分布,默认 False
-- `need_age`: 是否获取年龄分布,默认 True
-- `need_phone_brand`: 是否获取手机品牌分布,默认 False
-- `need_phone_price`: 是否获取手机价格分布,默认 False
-- `timeout`: 超时时间(秒),默认 60
-
-**返回**:粉丝画像 JSON(包含占比和偏好度)
-
-### 4. get_content_fans_portrait
-获取抖音内容的点赞用户画像(热点宝)
-
-**参数**:
-- `content_id` (必需): 抖音内容ID
-- `need_province`: 是否获取省份分布,默认 False
-- `need_city`: 是否获取城市分布,默认 False
-- `need_city_level`: 是否获取城市等级分布,默认 False
-- `need_gender`: 是否获取性别分布,默认 False
-- `need_age`: 是否获取年龄分布,默认 True
-- `need_phone_brand`: 是否获取手机品牌分布,默认 False
-- `need_phone_price`: 是否获取手机价格分布,默认 False
-- `timeout`: 超时时间(秒),默认 60
-
-**返回**:点赞用户画像 JSON(包含占比和偏好度)
+## 核心功能
 
-## Skills 策略
+1. **智能搜索**:解析用户需求,提取关键词,调用抖音搜索 API
+2. **画像筛选**:基于热点宝画像数据,分析内容受众年龄分布和偏好度(TGI)
+3. **深度挖掘**:对优质账号(目标人群占比 > 60% 且 TGI > 120)获取更多作品
+4. **分层推荐**:按强烈推荐 / 推荐 / 可选三档输出,附完整链接和数据来源
 
-### 1. content_finding_strategy(内容寻找方法论)
-教授如何系统化地寻找符合任意需求的内容:
-- 需求拆解技巧
-- 搜索策略制定
-- 迭代优化方法
+## 项目结构
 
-### 2. content_filtering_strategy(内容筛选方法论)
-教授如何评估内容是否符合要求:
-- 从需求中提取评估标准
-- 多维度评估框架
-- 分层推荐策略
+```
+content_finder/
+├── run.py                         # 命令行入口(流式输出)
+├── server.py                      # HTTP 服务入口(FastAPI + APScheduler)
+├── core.py                        # 共享 Agent 执行逻辑
+├── content_finder.prompt          # System Prompt + User Prompt 模板
+├── .env.example                   # 环境变量模板
+├── SERVICE.md                     # 服务模式详细说明
+├── tools/                         # 自定义工具
+│   ├── __init__.py
+│   ├── douyin_search.py           # 抖音关键词搜索
+│   ├── douyin_user_videos.py      # 账号作品列表
+│   └── hotspot_profile.py         # 热点宝画像数据
+├── skills/                        # Agent 方法论(注入 System Prompt)
+│   ├── content_finding_strategy.md    # 内容寻找 5 步流程
+│   └── content_filtering_strategy.md  # 内容筛选分阶段策略
+└── .cache/                        # 运行时目录(gitignore)
+    ├── traces/                    # Trace 存储
+    ├── agent.log                  # 命令行模式日志
+    └── server.log                 # 服务模式日志
+```
 
 ## 快速开始
 
 ### 1. 安装依赖
 
 ```bash
-pip install python-dotenv httpx
+pip install -r requirements.txt
 ```
 
 ### 2. 配置环境变量
 
-复制 `.env.example` 为 `.env` 并配置:
+```bash
+cp examples/content_finder/.env.example examples/content_finder/.env
+```
+
+编辑 `.env`,至少填写:
 
 ```bash
 OPEN_ROUTER_API_KEY=your_api_key_here
-MODEL=anthropic/claude-sonnet-4.6
-TEMPERATURE=0.3
-MAX_ITERATIONS=30
 ```
 
-### 3. 配置需求
+### 3. 运行
 
-编辑 `content_finder.prompt` 文件的 `$user$` 段,填写你的内容需求。
+**命令行模式**(交互式,流式输出):
 
-### 4. 运行
+```bash
+# 在项目根目录执行,trace 存储在根目录 .trace/
+python examples/content_finder/run.py
+```
+
+**服务模式**(HTTP API + 定时调度):
 
 ```bash
-cd examples/content_finder
-python run.py
+python examples/content_finder/server.py
 ```
 
-## 使用示例
+## 环境变量说明
+
+| 变量 | 默认值 | 说明 |
+|------|--------|------|
+| `OPEN_ROUTER_API_KEY` | 必填 | OpenRouter API Key |
+| `MODEL` | `anthropic/claude-sonnet-4.6` | 使用的模型 |
+| `TEMPERATURE` | `0.3` | 模型温度 |
+| `MAX_ITERATIONS` | `30` | Agent 最大迭代轮数 |
+| `TRACE_DIR` | `.cache/traces` | Trace 存储目录 |
+| `PORT` | `8080` | 服务端口(服务模式) |
+| `MAX_CONCURRENT_TASKS` | `3` | 最大并发任务数(服务模式) |
+| `SCHEDULE_QUERY_API` | 空 | 定时任务外部 API 地址(留空则不启动定时任务) |
+| `SCHEDULE_QUERY_API_KEY` | 空 | 定时任务外部 API 认证 Key |
+| `SCHEDULE_QUERY_API_TIMEOUT` | `10.0` | 定时任务外部 API 超时(秒) |
+
+## 服务模式 API
 
-**需求示例**(在 `content_finder.prompt` 中配置):
+服务启动后监听 `PORT`(默认 8080)。
+
+### POST /api/tasks — 创建任务
+
+```bash
+curl -X POST http://localhost:8080/api/tasks \
+  -H "Content-Type: application/json" \
+  -d '{"query": "找15个和广场舞相关的视频,热度要高"}'
 ```
-孩子军抗日,让人感动。找这样的视频。
 
-要求:
-- 内容要有情感共鸣
-- 适合老年人观看
-- 热度要高,质量要好
+响应:
+
+```json
+{
+  "trace_id": "20260317_103046_xyz789",
+  "status": "started",
+  "query": "找15个和广场舞相关的视频,热度要高",
+  "message": "任务已启动,结果将保存到 .cache/traces/20260317_103046_xyz789/"
+}
 ```
 
-**执行流程**:
-1. Agent 解析需求,提取关键词和评估标准
-2. 使用 `douyin_search` 搜索相关内容
-3. 使用 `get_content_fans_portrait` 获取内容画像
-4. 根据年龄分布和偏好度筛选符合老年人群体的内容
-5. 对优质内容作者使用 `douyin_user_videos` 获取更多作品
-6. 使用 `get_account_fans_portrait` 验证作者粉丝画像
-7. 综合评估并推荐最合适的内容
+`query` 不传则使用默认需求(养老服务与政策扶持相关内容)。
 
-## 项目结构
+### GET /health — 健康检查
 
+```bash
+curl http://localhost:8080/health
 ```
-content_finder/
-├── .env                           # 环境变量配置
-├── run.py                         # 主程序入口
-├── content_finder.prompt          # Prompt 配置文件
-├── README.md                      # 项目文档
-├── tools/                         # 工具包
-│   ├── __init__.py
-│   ├── douyin_search.py           # 抖音搜索
-│   ├── douyin_user_videos.py      # 用户作品列表
-│   └── hotspot_profile.py         # 热点宝画像数据
-├── skills/                        # Skills 策略
-│   ├── content_finding_strategy.md    # 内容寻找方法论
-│   └── content_filtering_strategy.md  # 内容筛选方法论
-└── .cache/                        # 缓存目录
-    ├── traces/                    # Trace 存储
-    └── agent.log                  # 日志文件
+
+响应包含当前并发数、定时任务状态和累计统计。
+
+### 定时任务
+
+配置 `SCHEDULE_QUERY_API` 后,服务每 10 分钟自动调用该接口获取 query 并执行任务。外部接口规范:
+
+```
+GET {SCHEDULE_QUERY_API}
+Authorization: Bearer {SCHEDULE_QUERY_API_KEY}
+
+# 有任务时返回:
+{"query": "找10个和健康养生相关的视频"}
+
+# 无任务时返回:
+{"query": null}
 ```
 
-## API 配置
+## 工具说明
 
-工具调用的爬虫服务 API 地址:
-- 抖音搜索:`http://crawapi.piaoquantv.com/crawler/dou_yin/keyword`
-- 账号作品:`http://crawapi.piaoquantv.com/crawler/dou_yin/blogger`
-- 账号粉丝画像:`http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/account_fans_portrait`
-- 内容点赞画像:`http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/video_like_portrait`
+Agent 只允许调用以下 4 个工具,其他工具(包括浏览器工具)均被禁止:
 
-## 注意事项
+### douyin_search
+
+通过关键词搜索抖音视频。
+
+| 参数 | 必填 | 默认值 | 说明 |
+|------|------|--------|------|
+| `keyword` | ✅ | — | 搜索关键词 |
+| `content_type` | | `视频` | 内容类型 |
+| `sort_type` | | `综合排序` | 排序方式 |
+| `publish_time` | | `不限` | 发布时间范围 |
+| `cursor` | | `0` | 分页游标 |
+| `timeout` | | `60` | 超时秒数 |
+
+结果通过 `metadata.search_results` 获取结构化数据。
+
+### douyin_user_videos
+
+获取账号历史作品列表。
+
+| 参数 | 必填 | 默认值 | 说明 |
+|------|------|--------|------|
+| `account_id` | ✅ | — | 账号 sec_uid(约 80 字符) |
+| `sort_type` | | `最新` | 排序方式 |
+| `cursor` | | `""` | 分页游标 |
+| `timeout` | | `60` | 超时秒数 |
+
+结果通过 `metadata.user_videos` 获取,格式与 `search_results` 一致。
+
+### get_content_fans_portrait
 
-1. **API 密钥**:需要配置有效的 OPEN_ROUTER_API_KEY
-2. **爬虫服务**:确保爬虫服务 API 可访问
-3. **超时设置**:默认 60 秒,可根据网络情况调整
-4. **画像数据**:默认只获取年龄分布,需要其他维度时设置对应参数为 True
+获取视频点赞用户画像(热点宝)。
 
-## License
+| 参数 | 必填 | 默认值 | 说明 |
+|------|------|--------|------|
+| `content_id` | ✅ | — | 视频 aweme_id |
+| `need_age` | | `True` | 是否获取年龄分布 |
+| `need_gender` | | `False` | 是否获取性别分布 |
+| `need_province` | | `False` | 省份分布 |
+| `timeout` | | `60` | 超时秒数 |
+
+通过 `metadata.has_portrait` 判断是否有有效画像,数据从 `metadata.portrait_data` 获取。
+
+### get_account_fans_portrait
+
+获取账号粉丝画像(热点宝),作为内容画像缺失时的兜底。
+
+参数与 `get_content_fans_portrait` 相同,`content_id` 替换为 `account_id`(传入 sec_uid)。
+
+## Skills 策略
+
+### content_finding_strategy — 内容寻找 5 步流程
+
+1. **需求分析**:提取关键词,确定目标数量 M
+2. **串行搜索**:每次搜索 N = M × 2 条,够了立即停止
+3. **分阶段筛选**:基础质量(热度 + 相关性)→ 画像匹配 → 优质账号扩展
+4. **结果评估**:符合数量 C ≥ M 则完成,否则换关键词补充
+5. **去重排序**:按 aweme_id 去重,按画像匹配度 × 热度综合排序
+
+### content_filtering_strategy — 内容筛选分阶段策略
+
+- **阶段一**:热度筛选(1000+ 一般 / 5000+ 较高 / 10000+ 高 / 50000+ 爆款)
+- **阶段二**:画像匹配(优先内容点赞画像,缺失时用账号粉丝画像兜底)
+- **阶段三**:优质账号扩展(占比 > 60% 且 TGI > 120,获取 5-10 条作品)
+- **阶段四**:去重排序(画像匹配度优先,其次热度,其次数据来源可靠性)
+- **阶段五**:分层输出(强烈推荐 / 推荐 / 可选)
+
+## 输出格式
+
+每条推荐内容包含:
+
+- 内容链接:`https://www.douyin.com/video/{aweme_id}`
+- 作者链接:`https://www.douyin.com/user/{author.sec_uid}`(完整 sec_uid,约 80 字符)
+- 热度数据:点赞 / 评论 / 分享(来自 `metadata.statistics`)
+- 画像数据:50 岁以上占比 + TGI(来自 `metadata.portrait_data`)
+- 画像链接:
+  - 内容点赞画像:`https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}`
+  - 账号粉丝画像:`https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}`
+- 数据来源标注:"内容点赞画像" 或 "账号粉丝画像" 或 "无画像数据"
+
+## 注意事项
 
-MIT
+- **数据真实性**:所有字段必须来自 `metadata`,禁止从 output 文本解析,禁止编造任何数据
+- **sec_uid 完整性**:`author.sec_uid` 约 80 字符,必须完整复制,格式以 `MS4wLjABAAAA` 开头
+- **工具限制**:只允许调用上述 4 个工具,浏览器工具已在 Prompt 中明确禁止
+- **Token 控制**:搜索上限 N = M × 2,画像获取上限 M × 1.5,避免超出上下文

+ 5 - 6
examples/content_finder/content_finder.prompt

@@ -11,6 +11,9 @@ $system$
 - 只使用 douyin_search、douyin_user_videos、get_content_fans_portrait、get_account_fans_portrait 这4个工具
 - 不要使用浏览器工具、文件操作工具、或其他平台的搜索工具
 
+**严格禁止调用以下浏览器工具**(调用任何一个都是错误行为):
+browser_get_live_url、browser_navigate_to_url、browser_search_web、browser_go_back、browser_wait、browser_click_element、browser_input_text、browser_send_keys、browser_upload_file、browser_scroll_page、browser_find_text、browser_screenshot、browser_switch_tab、browser_close_tab、browser_get_dropdown_options、browser_select_dropdown_option、browser_extract_content、browser_read_long_content、browser_download_direct_url、browser_get_page_html、browser_get_visual_selector_map、browser_evaluate、browser_ensure_login_with_cookies、browser_done、browser_export_cookies、browser_load_cookies
+
 平台背景(仅供参考):
 - 平台载体:微信小程序
 - 核心用户群:95% 是 50 岁以上中老年人
@@ -141,7 +144,7 @@ sec_uid = item["author"]["sec_uid"]  # 完整复制,约80字符
 - 输出完整的推荐结果后,任务会自动进行反思和知识保存
 - 反思完成后,输出简短的完成确认:✅ 任务完成!已为您找到 [数量] 条视频,并保存了执行经验
 
-请按照 content_finding_strategy_v2 和 content_filtering_strategy_v2 中的方法论执行任务。
+请按照 content_finding_strategy 和 content_filtering_strategy 中的方法论执行任务。
 
 **关键提醒**:
 - 不要陷入”一直获取画像”的循环
@@ -150,10 +153,6 @@ sec_uid = item["author"]["sec_uid"]  # 完整复制,约80字符
 - 所有数据必须来自 metadata,禁止编造
 
 $user$
-找10个和”养老服务与政策扶持”相关的,老年人感兴趣的视频。
-
-要求:
-- 适合老年人分享观看
-- 热度要高,质量要好
+%query%
 
 请开始执行内容寻找任务。记住要多步推理,每次只执行一小步,然后思考下一步该做什么。

+ 201 - 0
examples/content_finder/core.py

@@ -0,0 +1,201 @@
+"""
+内容寻找 Agent - 核心执行逻辑
+
+提供可复用的 agent 执行函数,供 run.py 和 server.py 调用。
+"""
+
+import asyncio
+import logging
+import sys
+import os
+from pathlib import Path
+from typing import Optional, Dict, Any
+
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from agent import (
+    AgentRunner,
+    RunConfig,
+    FileSystemTraceStore,
+    Trace,
+    Message,
+)
+from agent.llm import create_openrouter_llm_call
+from agent.llm.prompts import SimplePrompt
+from agent.tools.builtin.knowledge import KnowledgeConfig
+
+# 导入工具(确保工具被注册)
+from tools import (
+    douyin_search,
+    douyin_user_videos,
+    get_content_fans_portrait,
+    get_account_fans_portrait,
+)
+
+logger = logging.getLogger(__name__)
+
+# 默认 query
+DEFAULT_QUERY = """找10个和"养老服务与政策扶持"相关的,老年人感兴趣的视频。
+
+要求:
+- 适合老年人分享观看
+- 热度要高,质量要好"""
+
+
+async def run_agent(query: Optional[str] = None, stream_output: bool = True) -> Dict[str, Any]:
+    """
+    执行 agent 任务
+
+    Args:
+        query: 查询内容,None 则使用默认值
+        stream_output: 是否流式输出到 stdout(run.py 需要,server.py 不需要)
+
+    Returns:
+        {
+            "trace_id": "20260317_103046_xyz789",
+            "status": "completed" | "failed",
+            "error": "错误信息"  # 失败时
+        }
+    """
+    query = query or DEFAULT_QUERY
+
+    # 加载 prompt
+    prompt_path = Path(__file__).parent / "content_finder.prompt"
+    prompt = SimplePrompt(prompt_path)
+
+    # 构建消息(替换 %query%)
+    messages = prompt.build_messages(query=query)
+
+    # 初始化配置
+    api_key = os.getenv("OPEN_ROUTER_API_KEY")
+    if not api_key:
+        raise ValueError("OPEN_ROUTER_API_KEY 未设置")
+
+    model_name = prompt.config.get("model", "sonnet-4.6")
+    model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
+    temperature = float(prompt.config.get("temperature", 0.3))
+    max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
+    trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
+    skills_dir = str(Path(__file__).parent / "skills")
+
+    Path(trace_dir).mkdir(parents=True, exist_ok=True)
+
+    store = FileSystemTraceStore(base_path=trace_dir)
+
+    allowed_tools = [
+        "douyin_search",
+        "douyin_user_videos",
+        "get_content_fans_portrait",
+        "get_account_fans_portrait",
+    ]
+
+    runner = AgentRunner(
+        llm_call=create_openrouter_llm_call(model=model),
+        trace_store=store,
+        skills_dir=skills_dir,
+    )
+
+    config = RunConfig(
+        name="内容寻找",
+        model=model,
+        temperature=temperature,
+        max_iterations=max_iterations,
+        tools=allowed_tools,
+        extra_llm_params={"max_tokens": 8192},
+        knowledge=KnowledgeConfig(
+            enable_extraction=True,
+            enable_completion_extraction=True,
+            enable_injection=True,
+            owner="content_finder_agent",
+            default_tags={"project": "content_finder"},
+            default_scopes=["com.piaoquantv.supply"],
+            default_search_types=["tool", "usecase", "definition"],
+            default_search_owner="content_finder_agent"
+        )
+    )
+
+    # 执行
+    trace_id = None
+
+    try:
+        async for item in runner.run(messages=messages, config=config):
+            if isinstance(item, Trace):
+                trace_id = item.trace_id
+
+                if item.status == "completed":
+                    logger.info(f"Agent 执行完成: trace_id={trace_id}")
+                    return {
+                        "trace_id": trace_id,
+                        "status": "completed"
+                    }
+                elif item.status == "failed":
+                    logger.error(f"Agent 执行失败: {item.error_message}")
+                    return {
+                        "trace_id": trace_id,
+                        "status": "failed",
+                        "error": item.error_message
+                    }
+
+            elif isinstance(item, Message) and stream_output:
+                # 流式输出(仅 run.py 需要)
+                if item.role == "assistant":
+                    content = item.content
+                    if isinstance(content, dict):
+                        text = content.get("text", "")
+                        tool_calls = content.get("tool_calls", [])
+
+                        if text:
+                            # 如果有推荐结果,完整输出
+                            if len(text) > 500 and ("推荐结果" in text or "推荐内容" in text or "🎯" in text):
+                                print(f"\n{text}")
+                            # 如果有工具调用且文本较短,只输出摘要
+                            elif tool_calls and len(text) > 100:
+                                print(f"[思考] {text[:100]}...")
+                            # 其他情况输出完整文本
+                            else:
+                                print(f"\n{text}")
+
+                        # 输出工具调用信息
+                        if tool_calls:
+                            for tc in tool_calls:
+                                tool_name = tc.get("function", {}).get("name", "unknown")
+                                # 跳过 goal 工具的输出,减少噪音
+                                if tool_name != "goal":
+                                    print(f"[工具] {tool_name}")
+                    elif isinstance(content, str) and content:
+                        print(f"\n{content}")
+
+                elif item.role == "tool":
+                    content = item.content
+                    if isinstance(content, dict):
+                        tool_name = content.get("tool_name", "unknown")
+                        print(f"[结果] {tool_name} ✓")
+
+        # 如果循环结束但没有返回,说明异常退出
+        return {
+            "trace_id": trace_id,
+            "status": "failed",
+            "error": "Agent 异常退出"
+        }
+
+    except KeyboardInterrupt:
+        logger.info("用户中断")
+        if stream_output:
+            print("\n用户中断")
+        return {
+            "trace_id": trace_id,
+            "status": "failed",
+            "error": "用户中断"
+        }
+    except Exception as e:
+        logger.error(f"Agent 执行异常: {e}", exc_info=True)
+        if stream_output:
+            print(f"\n执行失败: {e}")
+        return {
+            "trace_id": trace_id,
+            "status": "failed",
+            "error": str(e)
+        }

+ 10 - 327
examples/content_finder/run.py

@@ -8,33 +8,14 @@
 import asyncio
 import logging
 import sys
-import os
 from pathlib import Path
-from agent.tools.builtin.knowledge import KnowledgeConfig
 
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
 from dotenv import load_dotenv
-
 load_dotenv()
 
-from agent import (
-    AgentRunner,
-    RunConfig,
-    FileSystemTraceStore,
-    Trace,
-    Message,
-)
-from agent.llm import create_openrouter_llm_call
-from agent.llm.prompts import SimplePrompt
-
-# 导入工具(确保工具被注册)
-from tools import (
-    douyin_search,
-    douyin_user_videos,
-    get_content_fans_portrait,
-    get_account_fans_portrait,
-)
+import core
 
 # 配置日志
 log_dir = Path(__file__).parent / '.cache'
@@ -51,315 +32,17 @@ logging.basicConfig(
 logger = logging.getLogger(__name__)
 
 
-async def generate_fallback_output(store: FileSystemTraceStore, trace_id: str):
-    """
-    当任务未正常输出时,从 trace 中提取数据并生成兜底输出
-    """
-    try:
-        # 读取所有消息
-        messages_dir = Path(store.base_path) / trace_id / "messages"
-        if not messages_dir.exists():
-            print("无法生成摘要:找不到消息目录")
-            return
-
-        # 提取搜索结果和画像数据
-        search_results = []
-        portrait_data = {}
-
-        import json
-        import re
-
-        for msg_file in sorted(messages_dir.glob("*.json")):
-            with open(msg_file, 'r', encoding='utf-8') as f:
-                msg = json.load(f)
-
-            # 提取搜索结果(从文本结果中解析)
-            if msg.get("role") == "tool" and msg.get("content", {}).get("tool_name") == "douyin_search":
-                result_text = msg.get("content", {}).get("result", "")
-
-                # 解析每条搜索结果
-                lines = result_text.split("\n")
-                current_item = {}
-
-                for line in lines:
-                    line = line.strip()
-                    if not line:
-                        if current_item.get("aweme_id"):
-                            if current_item["aweme_id"] not in [r["aweme_id"] for r in search_results]:
-                                search_results.append(current_item)
-                            current_item = {}
-                        continue
-
-                    # 解析标题行(以数字开头)
-                    if re.match(r'^\d+\.', line):
-                        current_item["desc"] = line.split(".", 1)[1].strip()[:100]
-                    # 解析 ID
-                    elif line.startswith("ID:"):
-                        current_item["aweme_id"] = line.split("ID:")[1].strip()
-                    # 解析作者
-                    elif line.startswith("作者:"):
-                        author_name = line.split("作者:")[1].strip()
-                        current_item["author"] = {"nickname": author_name}
-                    # 解析 sec_uid
-                    elif line.startswith("sec_uid:"):
-                        sec_uid = line.split("sec_uid:")[1].strip()
-                        if "author" not in current_item:
-                            current_item["author"] = {}
-                        current_item["author"]["sec_uid"] = sec_uid
-                    # 解析数据
-                    elif line.startswith("数据:"):
-                        stats_text = line.split("数据:")[1].strip()
-                        stats = {}
-                        # 解析点赞数
-                        if "点赞" in stats_text:
-                            digg_match = re.search(r'点赞\s+([\d,]+)', stats_text)
-                            if digg_match:
-                                stats["digg_count"] = int(digg_match.group(1).replace(",", ""))
-                        # 解析评论数
-                        if "评论" in stats_text:
-                            comment_match = re.search(r'评论\s+([\d,]+)', stats_text)
-                            if comment_match:
-                                stats["comment_count"] = int(comment_match.group(1).replace(",", ""))
-                        # 解析分享数
-                        if "分享" in stats_text:
-                            share_match = re.search(r'分享\s+([\d,]+)', stats_text)
-                            if share_match:
-                                stats["share_count"] = int(share_match.group(1).replace(",", ""))
-                        current_item["statistics"] = stats
-
-                # 添加最后一条
-                if current_item.get("aweme_id"):
-                    if current_item["aweme_id"] not in [r["aweme_id"] for r in search_results]:
-                        search_results.append(current_item)
-
-            # 提取画像数据
-            elif msg.get("role") == "tool":
-                tool_name = msg.get("content", {}).get("tool_name", "")
-                result_text = msg.get("content", {}).get("result", "")
-
-                if tool_name in ["get_content_fans_portrait", "get_account_fans_portrait"]:
-                    # 解析画像数据
-                    content_id = None
-                    age_50_plus = None
-                    tgi = None
-
-                    # 从结果文本中提取 ID
-                    if "内容 " in result_text:
-                        parts = result_text.split("内容 ")[1].split(" ")[0]
-                        content_id = parts
-                    elif "账号 " in result_text:
-                        parts = result_text.split("账号 ")[1].split(" ")[0]
-                        content_id = parts
-
-                    # 提取50岁以上数据(格式:50-: 48.35% (偏好度: 210.05))
-                    if "【年龄】分布" in result_text:
-                        lines = result_text.split("\n")
-                        for line in lines:
-                            if "50-:" in line:
-                                # 解析:  50-: 48.35% (偏好度: 210.05)
-                                parts = line.split("50-:")[1].strip()
-                                if "%" in parts:
-                                    age_50_plus = parts.split("%")[0].strip()
-                                if "偏好度:" in parts:
-                                    tgi_part = parts.split("偏好度:")[1].strip()
-                                    tgi = tgi_part.replace(")", "").strip()
-                                break
-
-                    if content_id and age_50_plus:
-                        portrait_data[content_id] = {
-                            "age_50_plus": age_50_plus,
-                            "tgi": tgi,
-                            "source": "内容点赞画像" if tool_name == "get_content_fans_portrait" else "账号粉丝画像"
-                        }
-
-        # 生成输出
-        print("\n" + "=" * 60)
-        print("📊 任务执行摘要(兜底输出)")
-        print("=" * 60)
-        print(f"\n搜索情况:找到 {len(search_results)} 条候选内容")
-        print(f"画像获取:获取了 {len(portrait_data)} 条画像数据")
-
-        # 筛选有画像且符合要求的内容
-        matched_results = []
-        for result in search_results:
-            aweme_id = result["aweme_id"]
-            author_id = result["author"].get("sec_uid", "")
-
-            # 查找画像数据(优先内容画像,其次账号画像)
-            portrait = portrait_data.get(aweme_id) or portrait_data.get(author_id)
-
-            if portrait and portrait.get("age_50_plus"):
-                try:
-                    age_ratio = float(portrait["age_50_plus"])
-                    if age_ratio >= 20:  # 50岁以上占比>=20%
-                        matched_results.append({
-                            **result,
-                            "portrait": portrait
-                        })
-                except:
-                    pass
-
-        # 按50岁以上占比排序
-        matched_results.sort(key=lambda x: float(x["portrait"]["age_50_plus"]), reverse=True)
-
-        # 输出推荐结果
-        print(f"\n符合要求:{len(matched_results)} 条内容(50岁以上占比>=20%)")
-        print("\n" + "=" * 60)
-        print("🎯 推荐结果")
-        print("=" * 60)
-
-        for i, result in enumerate(matched_results[:10], 1):
-            aweme_id = result["aweme_id"]
-            desc = result["desc"]
-            author = result["author"]
-            stats = result["statistics"]
-            portrait = result["portrait"]
-
-            print(f"\n{i}. {desc}")
-            print(f"   链接: https://www.douyin.com/video/{aweme_id}")
-            print(f"   作者: {author.get('nickname', '未知')}")
-            print(
-                f"   热度: 👍 {stats.get('digg_count', 0):,} | 💬 {stats.get('comment_count', 0):,} | 🔄 {stats.get('share_count', 0):,}")
-            print(f"   画像: 50岁以上 {portrait['age_50_plus']}% (tgi: {portrait['tgi']}) - {portrait['source']}")
-
-        print("\n" + "=" * 60)
-        print(f"✅ 已为您找到 {min(len(matched_results), 10)} 条推荐视频")
-        print("=" * 60)
-
-    except Exception as e:
-        logger.error(f"生成兜底输出失败: {e}", exc_info=True)
-        print(f"\n生成摘要失败: {e}")
-
-
 async def main():
-    print("\n" + "=" * 60)
-    print("内容寻找 Agent")
-    print("=" * 60)
-    print("开始执行...\n")
-
-    # 加载 prompt
-    prompt_path = Path(__file__).parent / "content_finder.prompt"
-    prompt = SimplePrompt(prompt_path)
-
-    # 构建消息
-    messages = prompt.build_messages()
-
-    # 初始化
-    api_key = os.getenv("OPEN_ROUTER_API_KEY")
-    if not api_key:
-        raise ValueError("OPEN_ROUTER_API_KEY 未设置,请在 .env 文件中配置")
-
-    model = os.getenv("MODEL", f"anthropic/claude-{prompt.config.get('model', 'sonnet-4.6')}")
-    temperature = float(prompt.config.get("temperature", 0.3))
-    max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
-    trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
-    skills_dir = str(Path(__file__).parent / "skills")
-
-    Path(trace_dir).mkdir(parents=True, exist_ok=True)
-
-    store = FileSystemTraceStore(base_path=trace_dir)
-
-    # 限制工具范围:只使用抖音相关的4个工具
-    allowed_tools = [
-        "douyin_search",
-        "douyin_user_videos",
-        "get_content_fans_portrait",
-        "get_account_fans_portrait",
-    ]
-
-    runner = AgentRunner(
-        llm_call=create_openrouter_llm_call(model=model),
-        trace_store=store,
-        skills_dir=skills_dir,
-    )
-
-    config = RunConfig(
-        name="内容寻找",
-        model=model,
-        temperature=temperature,
-        max_iterations=max_iterations,
-        tools=allowed_tools,  # 限制工具范围
-        extra_llm_params={"max_tokens": 8192},  # 增加输出 token 限制,避免被截断
-        knowledge=KnowledgeConfig(
-            # 压缩时提取(消息量超阈值触发压缩时,用完整 history 反思)
-            enable_extraction=True,
-            reflect_prompt="",  # 自定义反思 prompt;空则使用默认,见 agent/core/prompts/knowledge.py:REFLECT_PROMPT
-
-            # agent运行完成后提取(不代表任务完成,agent 可能中途退出等待人工评估)
-            enable_completion_extraction=True,
-            completion_reflect_prompt="",
-            # 自定义复盘 prompt;空则使用默认,见 agent/core/prompts/knowledge.py:COMPLETION_REFLECT_PROMPT
-
-            # 知识注入(agent切换当前工作的goal时,自动注入相关知识)
-            enable_injection=True,
-
-            # 默认字段(保存/搜索时自动注入)
-            owner="content_finder_agent",  # 所有者(空则尝试从 git config user.email 获取,再空则用 agent:{agent_id})
-            default_tags={"project": "content_finder"},  # 默认 tags(会与工具调用参数合并)
-            default_scopes=["com.piaoquantv.supply"],  # 默认 scopes
-            default_search_types=["tool", "usecase", "definition"],  # 默认搜索类型过滤
-            default_search_owner="content_finder_agent"  # 默认搜索 owner 过滤(空则不过滤)
-        )
-    )
-
-    # 执行
-    trace_id = None
-    has_final_output = False
-
+    """主函数"""
     try:
-        async for item in runner.run(messages=messages, config=config):
-            if isinstance(item, Trace):
-                trace_id = item.trace_id
-
-                if item.status == "completed":
-                    print(f"\n[完成] trace_id={item.trace_id}")
-
-                    # 检查是否有最终输出
-                    if not has_final_output:
-                        print("\n⚠️ 检测到任务未完整输出,正在生成摘要...")
-                        await generate_fallback_output(store, item.trace_id)
-
-                elif item.status == "failed":
-                    print(f"\n[失败] {item.error_message}")
-
-            elif isinstance(item, Message):
-                if item.role == "assistant":
-                    content = item.content
-                    if isinstance(content, dict):
-                        text = content.get("text", "")
-                        tool_calls = content.get("tool_calls")
-
-                        # 输出文本内容
-                        if text:
-                            # 检测是否包含最终推荐结果
-                            if "推荐结果" in text or "推荐内容" in text or "🎯" in text:
-                                has_final_output = True
-
-                            # 如果文本很长(>500字符)且包含推荐结果标记,输出完整内容
-                            if len(text) > 500 and ("推荐结果" in text or "推荐内容" in text or "🎯" in text):
-                                print(f"\n{text}")
-                            # 如果有工具调用且文本较短,只输出摘要
-                            elif tool_calls and len(text) > 100:
-                                print(f"[思考] {text[:100]}...")
-                            # 其他情况输出完整文本
-                            else:
-                                print(f"\n{text}")
-
-                        # 输出工具调用信息
-                        if tool_calls:
-                            for tc in tool_calls:
-                                tool_name = tc.get("function", {}).get("name", "unknown")
-                                # 跳过 goal 工具的输出,减少噪音
-                                if tool_name != "goal":
-                                    print(f"[工具] {tool_name}")
-                    elif isinstance(content, str) and content:
-                        print(f"\n{content}")
-
-                elif item.role == "tool":
-                    content = item.content
-                    if isinstance(content, dict):
-                        tool_name = content.get("tool_name", "unknown")
-                        print(f"[结果] {tool_name} ✓")
+        # 使用 core.py 的共享逻辑,启用流式输出
+        result = await core.run_agent(query=None, stream_output=True)
+
+        if result["status"] == "completed":
+            print(f"\n[完成] trace_id={result['trace_id']}")
+        else:
+            print(f"\n[失败] trace_id={result.get('trace_id')}, 错误: {result.get('error')}")
+            sys.exit(1)
 
     except KeyboardInterrupt:
         print("\n用户中断")

+ 368 - 0
examples/content_finder/server.py

@@ -0,0 +1,368 @@
+"""
+内容寻找服务
+
+提供:
+1. API 接口:POST /api/tasks - 触发内容寻找任务
+2. 定时调度:每 10 分钟调用外部 API 获取 query 并执行任务
+3. 并发控制:限制最大并发任务数
+"""
+
+import asyncio
+import logging
+import os
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+import sys
+
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+import httpx
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from dotenv import load_dotenv
+
+load_dotenv()
+
+import core
+
+# 配置日志
+log_dir = Path(__file__).parent / '.cache'
+log_dir.mkdir(exist_ok=True)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler(log_dir / 'server.log'),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+# FastAPI 应用
+app = FastAPI(
+    title="内容寻找服务",
+    version="1.0.0",
+    description="抖音内容寻找 Agent 服务"
+)
+
+# 定时调度器
+scheduler = AsyncIOScheduler()
+
+# 并发控制
+MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "3"))
+task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
+
+# 统计信息
+stats = {
+    "total_tasks": 0,
+    "completed_tasks": 0,
+    "failed_tasks": 0,
+    "scheduled_tasks": 0
+}
+
+
+# ============ 数据模型 ============
+
+class TaskRequest(BaseModel):
+    query: Optional[str] = None
+
+
+class TaskResponse(BaseModel):
+    trace_id: str
+    status: str
+    query: str
+    message: str
+
+
+# ============ 核心函数 ============
+
+async def execute_task(query: str, task_type: str = "api"):
+    """
+    执行任务(带并发控制)
+
+    Args:
+        query: 查询内容
+        task_type: 任务类型("api" 或 "scheduled")
+    """
+    async with task_semaphore:
+        current_concurrent = MAX_CONCURRENT_TASKS - task_semaphore._value + 1
+        logger.info(f"任务开始 [{task_type}]: query={query[:50]}..., 当前并发={current_concurrent}/{MAX_CONCURRENT_TASKS}")
+
+        start_time = datetime.now()
+        stats["total_tasks"] += 1
+        if task_type == "scheduled":
+            stats["scheduled_tasks"] += 1
+
+        try:
+            # 执行 agent(不流式输出)
+            result = await core.run_agent(query, stream_output=False)
+
+            duration = (datetime.now() - start_time).total_seconds()
+
+            if result["status"] == "completed":
+                stats["completed_tasks"] += 1
+                logger.info(f"任务完成 [{task_type}]: trace_id={result['trace_id']}, 耗时={duration:.1f}s")
+            else:
+                stats["failed_tasks"] += 1
+                logger.error(f"任务失败 [{task_type}]: trace_id={result.get('trace_id')}, 错误={result.get('error')}, 耗时={duration:.1f}s")
+
+        except Exception as e:
+            stats["failed_tasks"] += 1
+            duration = (datetime.now() - start_time).total_seconds()
+            logger.error(f"任务异常 [{task_type}]: {e}, 耗时={duration:.1f}s", exc_info=True)
+
+
+async def scheduled_task():
+    """
+    定时任务:每 10 分钟执行一次
+
+    流程:
+    1. 调用外部 API 获取 query
+    2. 如果成功,执行任务
+    3. 如果失败,跳过本次执行(不使用兜底)
+    """
+    logger.info("定时任务触发")
+
+    try:
+        # 1. 调用外部 API 获取 query
+        query_api = os.getenv("SCHEDULE_QUERY_API")
+        if not query_api:
+            logger.warning("未配置 SCHEDULE_QUERY_API,跳过定时任务")
+            return
+
+        api_key = os.getenv("SCHEDULE_QUERY_API_KEY", "")
+        timeout = float(os.getenv("SCHEDULE_QUERY_API_TIMEOUT", "10.0"))
+
+        async with httpx.AsyncClient() as client:
+            headers = {}
+            if api_key:
+                headers["Authorization"] = f"Bearer {api_key}"
+
+            logger.info(f"调用外部 API: {query_api}")
+            response = await client.get(
+                query_api,
+                headers=headers,
+                timeout=timeout
+            )
+            response.raise_for_status()
+            data = response.json()
+
+        # 2. 提取 query
+        query = data.get("query")
+        if not query:
+            logger.info("定时任务跳过:外部 API 返回的 query 为空")
+            return
+
+        # 3. 执行任务
+        logger.info(f"定时任务启动: query={query[:50]}...")
+        asyncio.create_task(execute_task(query, task_type="scheduled"))
+
+    except httpx.HTTPStatusError as e:
+        logger.error(f"定时任务失败:外部 API 返回错误 {e.response.status_code}: {e.response.text}")
+    except httpx.RequestError as e:
+        logger.error(f"定时任务失败:外部 API 请求失败: {e}")
+    except httpx.TimeoutException:
+        logger.error(f"定时任务失败:外部 API 请求超时")
+    except Exception as e:
+        logger.error(f"定时任务失败:未知错误: {e}", exc_info=True)
+
+
+# ============ API 接口 ============
+
+@app.post("/api/tasks", response_model=TaskResponse)
+async def create_task(request: TaskRequest):
+    """
+    创建内容寻找任务
+
+    Args:
+        request.query: 查询内容(可选,不传则使用默认值)
+
+    Returns:
+        {
+            "trace_id": "20260317_103046_xyz789",
+            "status": "started",
+            "query": "...",
+            "message": "任务已启动,结果将保存到 .cache/traces/xxx/"
+        }
+    """
+    # 获取 query
+    query = request.query or core.DEFAULT_QUERY
+
+    # 用 Event 等待 trace_id
+    trace_id_ready = asyncio.Event()
+    trace_id_holder = {"id": None}
+
+    async def run_and_capture():
+        try:
+            # 获取第一个 Trace 对象来获取 trace_id
+            from agent import Trace
+
+            async with task_semaphore:
+                # 重新构建 runner 来获取 trace_id
+                from agent import AgentRunner, RunConfig, FileSystemTraceStore
+                from agent.llm import create_openrouter_llm_call
+                from agent.llm.prompts import SimplePrompt
+                from agent.tools.builtin.knowledge import KnowledgeConfig
+
+                prompt_path = Path(__file__).parent / "content_finder.prompt"
+                prompt = SimplePrompt(prompt_path)
+                messages = prompt.build_messages(query=query)
+
+                api_key = os.getenv("OPEN_ROUTER_API_KEY")
+                model_name = prompt.config.get("model", "sonnet-4.6")
+                model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
+                temperature = float(prompt.config.get("temperature", 0.3))
+                max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
+                trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
+                skills_dir = str(Path(__file__).parent / "skills")
+
+                Path(trace_dir).mkdir(parents=True, exist_ok=True)
+                store = FileSystemTraceStore(base_path=trace_dir)
+
+                allowed_tools = [
+                    "douyin_search",
+                    "douyin_user_videos",
+                    "get_content_fans_portrait",
+                    "get_account_fans_portrait",
+                ]
+
+                runner = AgentRunner(
+                    llm_call=create_openrouter_llm_call(model=model),
+                    trace_store=store,
+                    skills_dir=skills_dir,
+                )
+
+                config = RunConfig(
+                    name="内容寻找",
+                    model=model,
+                    temperature=temperature,
+                    max_iterations=max_iterations,
+                    tools=allowed_tools,
+                    extra_llm_params={"max_tokens": 8192},
+                    knowledge=KnowledgeConfig(
+                        enable_extraction=True,
+                        enable_completion_extraction=True,
+                        enable_injection=True,
+                        owner="content_finder_agent",
+                        default_tags={"project": "content_finder"},
+                        default_scopes=["com.piaoquantv.supply"],
+                        default_search_types=["tool", "usecase", "definition"],
+                        default_search_owner="content_finder_agent"
+                    )
+                )
+
+                async for item in runner.run(messages=messages, config=config):
+                    if isinstance(item, Trace):
+                        if not trace_id_holder["id"]:
+                            trace_id_holder["id"] = item.trace_id
+                            trace_id_ready.set()
+                            logger.info(f"任务启动 [api]: trace_id={item.trace_id}")
+
+                        if item.status == "completed":
+                            stats["completed_tasks"] += 1
+                            logger.info(f"任务完成 [api]: trace_id={item.trace_id}")
+                            break
+                        elif item.status == "failed":
+                            stats["failed_tasks"] += 1
+                            logger.error(f"任务失败 [api]: trace_id={item.trace_id}, 错误={item.error_message}")
+                            break
+
+        except Exception as e:
+            stats["failed_tasks"] += 1
+            logger.error(f"任务异常 [api]: {e}", exc_info=True)
+            if not trace_id_holder["id"]:
+                trace_id_holder["id"] = f"error_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+                trace_id_ready.set()
+
+    # 启动后台任务
+    stats["total_tasks"] += 1
+    asyncio.create_task(run_and_capture())
+
+    # 等待 trace_id(最多 5 秒)
+    try:
+        await asyncio.wait_for(trace_id_ready.wait(), timeout=5.0)
+    except asyncio.TimeoutError:
+        logger.error("获取 trace_id 超时")
+        raise HTTPException(status_code=500, detail="任务启动超时")
+
+    trace_id = trace_id_holder["id"]
+
+    return TaskResponse(
+        trace_id=trace_id,
+        status="started",
+        query=query,
+        message=f"任务已启动,结果将保存到 .cache/traces/{trace_id}/"
+    )
+
+
+@app.get("/health")
+async def health_check():
+    """健康检查"""
+    return {
+        "status": "ok",
+        "max_concurrent_tasks": MAX_CONCURRENT_TASKS,
+        "current_tasks": MAX_CONCURRENT_TASKS - task_semaphore._value,
+        "scheduler_running": scheduler.running,
+        "stats": stats
+    }
+
+
+@app.get("/")
+async def root():
+    """根路径"""
+    return {
+        "service": "内容寻找服务",
+        "version": "1.0.0",
+        "endpoints": {
+            "create_task": "POST /api/tasks",
+            "health": "GET /health"
+        }
+    }
+
+
+# ============ 启动事件 ============
+
+@app.on_event("startup")
+async def startup():
+    """服务启动时初始化"""
+    logger.info("=" * 60)
+    logger.info("内容寻找服务启动中...")
+    logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
+
+    # 配置定时任务
+    query_api = os.getenv("SCHEDULE_QUERY_API")
+    if query_api:
+        # 每 10 分钟执行一次
+        scheduler.add_job(scheduled_task, "cron", minute="*/10")
+        scheduler.start()
+        logger.info(f"定时任务已启动:每 10 分钟执行一次")
+        logger.info(f"外部 API: {query_api}")
+    else:
+        logger.info("未配置 SCHEDULE_QUERY_API,定时任务未启动")
+
+    logger.info("服务启动完成")
+    logger.info("=" * 60)
+
+
+@app.on_event("shutdown")
+async def shutdown():
+    """服务关闭时清理"""
+    logger.info("服务关闭中...")
+    if scheduler.running:
+        scheduler.shutdown()
+    logger.info("服务已关闭")
+
+
+# ============ 主函数 ============
+
+if __name__ == "__main__":
+    import uvicorn
+
+    port = int(os.getenv("PORT", "8080"))
+    host = os.getenv("HOST", "0.0.0.0")
+
+    logger.info(f"启动服务: http://{host}:{port}")
+    uvicorn.run(app, host=host, port=port)

+ 186 - 50
examples/content_finder/skills/content_filtering_strategy.md

@@ -1,74 +1,210 @@
 # 内容筛选方法论
 
-## 核心方法:从需求中提取标准 → 按标准评估 → 分层输出
+## 核心方法:分阶段筛选 → 画像兜底 → 分层输出
 
-### 第一步:从需求中提取评估标准
+本方案采用结构化的筛选流程,明确每个阶段的评估标准和执行顺序。
 
-用户的要求即评估标准,不要引入用户没有提到的标准。
+---
 
-**如何提取**:
+## 完整筛选流程
+
+### 阶段一:基础质量筛选
+
+在获取画像数据前,先进行快速的基础筛选,减少不必要的 API 调用。
+
+#### 热度评估
+
+**量化标准**(可根据需求调整):
+- 1000+: 一般热度
+- 5000+: 较高热度
+- 10000+: 高热度
+- 50000+: 爆款
+
+**评估维度**:
+- 点赞数(digg_count)
+- 评论数(comment_count)
+- 分享数(share_count)
+
+**筛选策略**:
+- 根据用户需求设定最低热度门槛
+- 如果用户未明确要求,保持宽松标准
+
+#### 相关性评估
+
+**评估依据**:
+- 内容描述(desc)是否包含关键词
+- 内容主题是否与需求相关
+- 明显不相关的内容直接过滤
+
+**输出**:保留通过基础筛选的候选内容列表
+
+---
+
+### 阶段二:画像匹配筛选
+
+对通过基础筛选的内容,进行精细的画像匹配评估。
+
+#### 画像数据获取策略
+
+**优先级1:内容点赞用户画像**
+- 调用 `get_content_fans_portrait(content_id=aweme_id)`
+- 检查返回的 metadata.has_portrait 字段
+- 如果 has_portrait 为 True:
+  - 从 metadata.portrait_data 中获取结构化画像数据
+  - 评估是否符合目标人群
+  - 在结果中标注"数据来源:内容点赞画像"
+
+**优先级2:账号粉丝画像(兜底)**
+- 如果 metadata.has_portrait 为 False(画像数据缺失)
+- 调用 `get_account_fans_portrait(account_id=author.sec_uid)`
+- 检查返回的 metadata.has_portrait 字段
+- 如果 has_portrait 为 True:
+  - 从 metadata.portrait_data 中获取结构化画像数据
+  - 评估是否符合目标人群
+  - 在结果中标注"数据来源:账号粉丝画像(内容点赞画像缺失)"
+
+**优先级3:无画像数据**
+- 如果两种画像的 has_portrait 都为 False
+- 仅基于热度和相关性评估
+- 在结果中标注"数据来源:无画像数据"
+
+#### 画像评估标准
+
+**从需求中提取目标人群**:
 - 用户明确说的条件,作为硬性标准(必须满足)
 - 用户模糊描述的,作为软性标准(尽量满足)
-- 用户没提的,不要主动引入
+
+**量化评估**:
+- **占比(ratio/percentage)**:目标人群在总体中的占比
+- **偏好度(tgi/preference)**:> 100 表示该人群偏好高于平均水平,= 100 表示平均,< 100 表示低于平均
 
 **举例**:
-- 用户说「热度高」→ 评估热度数据,其他维度作参考
-- 用户说「感人」→ 评估情感共鸣信号(标题、描述等),而不只看数字
-- 用户说「适合XX人群」→ 评估画像数据中该人群的占比和偏好度
-
-### 第二步:评估每条内容
-
-**分阶段筛选**:
-- 阶段一:先进行基础质量筛选(热度、相关性),快速过滤明显不符合的内容
-- 阶段二:对通过基础筛选的内容,分批获取画像数据进行精细评估
-- 分批策略:先处理 10 条,不足再继续下一批
-- 目的:减少不必要的画像 API 调用,提高效率,避免 token 超限
-
-**画像数据获取策略**:
-- 优先使用内容点赞用户画像(get_content_fans_portrait)
-- 当点赞画像数据缺失时(返回只有标题无画像内容,或包含"暂无画像数据"),使用该内容作者的账号粉丝画像(get_account_fans_portrait)作为兜底
-- 在结果中明确标注数据来源("内容点赞画像"或"账号粉丝画像")
-
-**优质账号扩展**:
-- 如果某个账号的粉丝画像非常符合要求(如目标人群占比 > 60% 且 tgi > 120)
-- 可以获取该账号的其他作品作为补充(5-10条)
-- 对扩展作品仅做基础筛选,不再递归获取画像
-
-**输出链接要求**:
-每条推荐内容必须包含:
-- 内容链接:https://www.douyin.com/video/{aweme_id}
-- 作者链接:https://www.douyin.com/user/{author.sec_uid}
-- 画像链接(如果有):
-  - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
-  - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
-
-**量化指标**:有具体数字的直接比较(点赞量、分享率、年龄占比等)。
-
-**质性判断**:无法量化时,寻找间接信号:
+- 需求:"适合50岁以上老年人"
+- 评估:年龄分布中"50岁以上"的占比和 tgi
+- 判断:占比 > 40% 且 tgi > 100 为符合
+
+**质性判断**:
+- 无法量化时,寻找间接信号
 - 「感人」→ 看评论中是否有情感表达
 - 「有价值」→ 看收藏率是否高于平均水平
-- 「适合XX」→ 看画像偏好度是否大于 100(tgi > 100 表示高于平均)
 
-**相对比较**:在候选集中排序,而不是套用固定门槛。同一批结果里,谁更好就推谁。
+**输出**:筛选出符合目标人群的内容
+
+---
+
+### 阶段三:优质账号扩展
+
+识别优质账号并扩展其作品,作为补充内容。
+
+#### 优质账号识别标准
 
-**去重机制**:
-- 按 aweme_id 去重,避免重复推荐相同内容
-- 不同来源(不同关键词、账号扩展)可能搜到相同内容
-- 保留第一次出现的版本,或合并多个来源的标签
+**量化标准**:
+- 账号粉丝画像中,目标人群占比 > 60%
+- 目标人群偏好度(tgi)> 120
 
-### 第三步:分层输出
+**判断时机**:
+- 在阶段二中,如果某个账号的粉丝画像非常符合要求
+- 标记为"优质账号"
 
-不是非此即彼,而是按匹配度分层:
+#### 扩展策略
 
-- **强烈推荐**:核心标准全部满足,其他维度也好
-- **推荐**:核心标准满足,有小瑕疵
-- **可选**:核心标准基本满足,作为补充
-- **不推荐**:核心标准不满足,明确说明原因
+**获取账号作品**:
+- 调用 `douyin_user_videos(account_id=author.sec_uid)`
+- 限制数量:5-10 条近期作品
+- 从返回的 metadata.user_videos 中获取结构化数据
+
+**筛选扩展作品**:
+- **仅执行阶段一筛选**(热度、相关性)
+- **不再递归获取画像**,避免无限展开
+- 假设该账号的其他作品也符合目标人群
+
+**加入候选池**:
+- 将通过筛选的扩展作品加入候选池
+- 标注"来源:优质账号扩展"
+
+---
+
+### 阶段四:去重与排序
+
+#### 去重机制
+
+**去重依据**:
+- 按 aweme_id 去重
+- 不同关键词或不同来源可能搜到相同内容
+
+**去重策略**:
+- 保留第一次出现的版本
+- 合并多个来源的标签(如"关键词A + 优质账号扩展")
+
+#### 排序策略
+
+**综合排序**:
+- 优先级1:画像匹配度(目标人群占比 × tgi)
+- 优先级2:热度(点赞、评论、分享综合)
+- 优先级3:数据来源(内容点赞画像 > 账号粉丝画像 > 无画像)
+
+**相对比较**:
+- 在候选集中排序,而不是套用固定门槛
+- 同一批结果里,谁更好就推谁
+
+---
+
+### 阶段五:分层输出
+
+不是非此即彼,而是按匹配度分层。
+
+#### 输出分层
+
+**强烈推荐**:
+- 核心标准全部满足(画像匹配 + 热度高)
+- 数据来源可靠(内容点赞画像或账号粉丝画像)
+
+**推荐**:
+- 核心标准满足,有小瑕疵
+- 如:画像匹配但热度一般,或热度高但画像数据缺失
+
+**可选**:
+- 核心标准基本满足,作为补充
+- 如:优质账号扩展的作品
+
+**不推荐**:
+- 核心标准不满足,明确说明原因
+- 如:画像不匹配、热度过低
+
+#### 输出内容
+
+**每条内容包含**:
+- 内容基本信息(ID、描述、作者)
+  - **数据来源**:必须从 metadata.search_results 或 metadata.user_videos 中获取
+  - **sec_uid 要求**:必须完整复制(约80字符),不能截断
+- 热度数据(点赞、评论、分享)
+  - **数据来源**:必须从 metadata 中的 statistics 字段获取
+- 画像数据(目标人群占比、tgi)
+  - **数据来源**:从 metadata.portrait_data 中获取
+  - **有效性判断**:通过 metadata.has_portrait 字段判断
+- 数据来源标注
+- 推荐理由
+- **必须包含链接**:
+  - 内容链接:https://www.douyin.com/video/{aweme_id}
+  - 作者链接:https://www.douyin.com/user/{author.sec_uid}
+  - 画像链接(如果 has_portrait 为 True):
+    - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
+    - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
+
+**说明评估逻辑**:
+- 让用户理解为什么这条内容被推荐或排除
+- 透明展示评估过程
+
+---
 
 ## 关键原则
 
 **标准来自需求**:评估维度随需求变化,不固化。
 
+**分阶段筛选**:先快速过滤,再精细评估,提高效率。
+
+**画像兜底策略**:优先内容画像,缺失时用账号画像,确保数据覆盖。
+
 **说明评估逻辑**:让用户理解为什么这条内容被推荐或排除。
 
 **承认不确定性**:数据不足以判断时,如实说明,而不是强行打分。

+ 0 - 212
examples/content_finder/skills/content_filtering_strategy_v2.md

@@ -1,212 +0,0 @@
-# 内容筛选方法论 V2(优化版)
-
-## 核心方法:分阶段筛选 → 画像兜底 → 分层输出
-
-本方案采用结构化的筛选流程,明确每个阶段的评估标准和执行顺序。
-
----
-
-## 完整筛选流程
-
-### 阶段一:基础质量筛选
-
-在获取画像数据前,先进行快速的基础筛选,减少不必要的 API 调用。
-
-#### 热度评估
-
-**量化标准**(可根据需求调整):
-- 1000+: 一般热度
-- 5000+: 较高热度
-- 10000+: 高热度
-- 50000+: 爆款
-
-**评估维度**:
-- 点赞数(digg_count)
-- 评论数(comment_count)
-- 分享数(share_count)
-
-**筛选策略**:
-- 根据用户需求设定最低热度门槛
-- 如果用户未明确要求,保持宽松标准
-
-#### 相关性评估
-
-**评估依据**:
-- 内容描述(desc)是否包含关键词
-- 内容主题是否与需求相关
-- 明显不相关的内容直接过滤
-
-**输出**:保留通过基础筛选的候选内容列表
-
----
-
-### 阶段二:画像匹配筛选
-
-对通过基础筛选的内容,进行精细的画像匹配评估。
-
-#### 画像数据获取策略
-
-**优先级1:内容点赞用户画像**
-- 调用 `get_content_fans_portrait(content_id=aweme_id)`
-- 检查返回的 metadata.has_portrait 字段
-- 如果 has_portrait 为 True:
-  - 从 metadata.portrait_data 中获取结构化画像数据
-  - 评估是否符合目标人群
-  - 在结果中标注"数据来源:内容点赞画像"
-
-**优先级2:账号粉丝画像(兜底)**
-- 如果 metadata.has_portrait 为 False(画像数据缺失)
-- 调用 `get_account_fans_portrait(account_id=author.sec_uid)`
-- 检查返回的 metadata.has_portrait 字段
-- 如果 has_portrait 为 True:
-  - 从 metadata.portrait_data 中获取结构化画像数据
-  - 评估是否符合目标人群
-  - 在结果中标注"数据来源:账号粉丝画像(内容点赞画像缺失)"
-
-**优先级3:无画像数据**
-- 如果两种画像的 has_portrait 都为 False
-- 仅基于热度和相关性评估
-- 在结果中标注"数据来源:无画像数据"
-
-#### 画像评估标准
-
-**从需求中提取目标人群**:
-- 用户明确说的条件,作为硬性标准(必须满足)
-- 用户模糊描述的,作为软性标准(尽量满足)
-
-**量化评估**:
-- **占比(ratio/percentage)**:目标人群在总体中的占比
-- **偏好度(tgi/preference)**:> 100 表示该人群偏好高于平均水平,= 100 表示平均,< 100 表示低于平均
-
-**举例**:
-- 需求:"适合50岁以上老年人"
-- 评估:年龄分布中"50岁以上"的占比和 tgi
-- 判断:占比 > 40% 且 tgi > 100 为符合
-
-**质性判断**:
-- 无法量化时,寻找间接信号
-- 「感人」→ 看评论中是否有情感表达
-- 「有价值」→ 看收藏率是否高于平均水平
-
-**输出**:筛选出符合目标人群的内容
-
----
-
-### 阶段三:优质账号扩展
-
-识别优质账号并扩展其作品,作为补充内容。
-
-#### 优质账号识别标准
-
-**量化标准**:
-- 账号粉丝画像中,目标人群占比 > 60%
-- 目标人群偏好度(tgi)> 120
-
-**判断时机**:
-- 在阶段二中,如果某个账号的粉丝画像非常符合要求
-- 标记为"优质账号"
-
-#### 扩展策略
-
-**获取账号作品**:
-- 调用 `douyin_user_videos(account_id=author.sec_uid)`
-- 限制数量:5-10 条近期作品
-- 从返回的 metadata.user_videos 中获取结构化数据
-
-**筛选扩展作品**:
-- **仅执行阶段一筛选**(热度、相关性)
-- **不再递归获取画像**,避免无限展开
-- 假设该账号的其他作品也符合目标人群
-
-**加入候选池**:
-- 将通过筛选的扩展作品加入候选池
-- 标注"来源:优质账号扩展"
-
----
-
-### 阶段四:去重与排序
-
-#### 去重机制
-
-**去重依据**:
-- 按 aweme_id 去重
-- 不同关键词或不同来源可能搜到相同内容
-
-**去重策略**:
-- 保留第一次出现的版本
-- 合并多个来源的标签(如"关键词A + 优质账号扩展")
-
-#### 排序策略
-
-**综合排序**:
-- 优先级1:画像匹配度(目标人群占比 × tgi)
-- 优先级2:热度(点赞、评论、分享综合)
-- 优先级3:数据来源(内容点赞画像 > 账号粉丝画像 > 无画像)
-
-**相对比较**:
-- 在候选集中排序,而不是套用固定门槛
-- 同一批结果里,谁更好就推谁
-
----
-
-### 阶段五:分层输出
-
-不是非此即彼,而是按匹配度分层。
-
-#### 输出分层
-
-**强烈推荐**:
-- 核心标准全部满足(画像匹配 + 热度高)
-- 数据来源可靠(内容点赞画像或账号粉丝画像)
-
-**推荐**:
-- 核心标准满足,有小瑕疵
-- 如:画像匹配但热度一般,或热度高但画像数据缺失
-
-**可选**:
-- 核心标准基本满足,作为补充
-- 如:优质账号扩展的作品
-
-**不推荐**:
-- 核心标准不满足,明确说明原因
-- 如:画像不匹配、热度过低
-
-#### 输出内容
-
-**每条内容包含**:
-- 内容基本信息(ID、描述、作者)
-  - **数据来源**:必须从 metadata.search_results 或 metadata.user_videos 中获取
-  - **sec_uid 要求**:必须完整复制(约80字符),不能截断
-- 热度数据(点赞、评论、分享)
-  - **数据来源**:必须从 metadata 中的 statistics 字段获取
-- 画像数据(目标人群占比、tgi)
-  - **数据来源**:从 metadata.portrait_data 中获取
-  - **有效性判断**:通过 metadata.has_portrait 字段判断
-- 数据来源标注
-- 推荐理由
-- **必须包含链接**:
-  - 内容链接:https://www.douyin.com/video/{aweme_id}
-  - 作者链接:https://www.douyin.com/user/{author.sec_uid}
-  - 画像链接(如果 has_portrait 为 True):
-    - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
-    - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
-
-**说明评估逻辑**:
-- 让用户理解为什么这条内容被推荐或排除
-- 透明展示评估过程
-
----
-
-## 关键原则
-
-**标准来自需求**:评估维度随需求变化,不固化。
-
-**分阶段筛选**:先快速过滤,再精细评估,提高效率。
-
-**画像兜底策略**:优先内容画像,缺失时用账号画像,确保数据覆盖。
-
-**说明评估逻辑**:让用户理解为什么这条内容被推荐或排除。
-
-**承认不确定性**:数据不足以判断时,如实说明,而不是强行打分。
-
-**让用户决定**:提供充分信息,最终选择权在用户。

+ 152 - 41
examples/content_finder/skills/content_finding_strategy.md

@@ -1,91 +1,202 @@
 # 内容寻找方法论
 
-## 核心方法:需求拆解 → 搜索执行 → 结果验证
+## 核心方法:串行搜索 → 分阶段筛选 → 按需补充
 
-### 第一步:拆解用户需求
+本方案采用更结构化的执行流程,提供明确的数量控制和资源优化策略。
 
-在搜索前,先理解需求的结构:
+---
 
-**主题维度**:用户要找什么内容?提取核心关键词。
+## 完整执行流程
 
-**约束维度**:有哪些限制条件?(热度要求、受众特征、时间范围等)
+### 第一步:需求分析与关键词提取
 
-**优先级**:哪个维度是最重要的?用户明确说的 > 用户暗示的 > 你的默认假设。
-
-### 第二步:制定搜索策略
-
-**关键词选择**:
+**提取多个搜索关键词**:
+- 从用户需求中提取核心关键词和扩展关键词
 - 优先使用用户原话中的关键词
 - 必要时补充同义词或相关词
-- 不要替换用户的核心意图
 
-**参数设置**:
-- 根据约束条件设置搜索参数
-- 不确定时使用宽松参数,宁可多搜再筛选
+**关键词排序**:
+- 按相关性排序:核心关键词优先,扩展关键词其次
+- 优先级:用户明确说的 > 用户暗示的 > 推测的
+
+**确定目标数量 M**:
+- M = 用户要求的内容数量(如"找10条内容",则 M = 10)
+
+---
+
+### 第二步:串行关键词搜索
+
+**选择关键词**:
+- 从关键词列表中选择优先级最高的未使用关键词
 
 **搜索数量控制**:
-- 如果用户要求找 M 条内容,只搜索 N = M × 2 条,不要超过
+- 只搜索 N 条内容,其中 **N = M × 2,不要超过**
 - 如果第一次搜索返回超过 N 条,只保留前 N 条处理
 - 目的:控制 token 消耗,确保筛选后有足够余量
-- 示例:用户要10条,只搜索20条候选
 
 **分页策略**:
 - 第一次搜索使用默认 cursor("0" 或 "")
-- 如果结果不够,从返回数据中提取 cursor 值继续获取下一页
+- 如果需要更多结果,使用返回的 cursor 值继续获取
 - 示例:`douyin_search(keyword="...", cursor="返回的cursor值")`
 
-**迭代策略**:
-- 第一轮搜索结果不够好时,调整关键词或参数再搜
-- 不要在一次失败后就放弃
+---
+
+### 第三步:分阶段内容筛选
+
+#### 阶段一:基础质量筛选
 
-**优质账号扩展**:
-- 如果发现某个账号的粉丝画像非常符合目标人群(如目标人群占比 > 60% 且 tgi > 120)
-- 可以获取该账号的其他作品作为补充:`douyin_user_videos(account_id=author.sec_uid)`
-- 建议限制数量(如5-10条),避免过度依赖单一账号
-- 对扩展作品进行基础筛选(热度、相关性),不需要再次获取画像
+**热度筛选**:
+- 根据点赞、评论、分享数据过滤低质量内容
+- 参考标准(可根据需求调整):
+  - 1000+: 一般热度
+  - 5000+: 较高热度
+  - 10000+: 高热度
+
+**相关性筛选**:
+- 根据内容描述过滤明显不相关的内容
+- 保留候选内容列表
+
+#### 阶段二:画像匹配筛选
 
 **分批处理策略**:
 - 不要一次性处理所有候选内容
-- 先处理前 10 条,筛选后如果符合要求的内容 >= M,停止处理
+- 先处理前 10 条候选内容
+- 筛选后如果符合要求的内容 >= M,停止处理
 - 如果不足,继续处理下一批 10 条
 - 目的:避免一次性调用过多工具导致 token 超限
 
-**工具调用建议**:
+**画像获取完成标准(重要)**:
+- 当已获取画像的内容数量 >= M × 1.5 时,立即停止获取画像
+- 示例:用户要10条,获取15条画像后立即进入筛选和输出阶段
+- 不要无限循环获取画像,避免陷入"一直获取画像"的状态
+
+**工具调用限制**:
 - 每次最多并行调用 3 个画像工具
 - 避免一次性调用过多工具导致响应被截断
 
-### 第三步:验证结果
+**对每条候选内容**:
+
+1. **从 metadata.search_results 或 metadata.user_videos 中获取基础信息**:
+   - aweme_id、desc、author.nickname、author.sec_uid、statistics
+   - 这些数据将用于最终输出,必须完整保留
+   - 特别注意:author.sec_uid 约80字符,必须完整复制
+
+2. **优先获取内容点赞用户画像**:
+   - 调用 `get_content_fans_portrait(content_id=aweme_id)`
+   - 检查返回的 metadata.has_portrait 字段
+   - 如果 has_portrait 为 True:评估是否符合目标人群,标注"内容点赞画像"
+
+3. **画像缺失时的兜底策略**:
+   - 如果 metadata.has_portrait 为 False(无画像数据)
+   - 获取该内容作者的账号粉丝画像:`get_account_fans_portrait(account_id=author.sec_uid)`
+   - 检查账号画像的 metadata.has_portrait
+   - 如果有画像:评估是否符合目标人群,标注"账号粉丝画像"
+
+4. **画像评估**:
+   - 从 metadata.portrait_data 中获取结构化的画像数据
+   - 根据目标人群的占比和偏好度(tgi)判断
+   - 偏好度 > 100 表示该人群偏好高于平均水平,= 100 表示平均,< 100 表示低于平均
+   - 筛选出符合要求的内容
+
+#### 阶段三:优质账号扩展(可选)
 
-拿到搜索结果后,对照需求逐一验证:
+**识别优质账号**:
+- 标准:账号粉丝画像匹配度高
+- 量化指标:目标人群占比 > 60% 且 tgi > 120
 
-- 每个结果是否满足用户的核心要求?
-- 约束条件是否达标?
-- 有无更好的候选?
+**扩展策略**:
+- 对优质账号,获取其近期作品:`douyin_user_videos(account_id=author.sec_uid)`
+- 限制数量:5-10 条
+- 从返回的 metadata.user_videos 中获取结构化数据
+- 对这些作品**仅执行阶段一筛选**(热度、相关性)
+- **不再递归获取画像**,避免无限展开
+- 作为补充内容加入候选池
 
-**如果结果不满足要求**:调整搜索策略,再次尝试,而不是凑合推荐。
+**必须说明(重要)**:
+- 在输出推荐结果时,必须明确说明优质账号扩展情况
+- 如果发现优质账号:说明"发现 X 个优质账号(账号名,目标人群占比 Y%,tgi Z),已扩展其作品"
+- 如果未发现优质账号:说明"未发现符合扩展条件的优质账号(需要目标人群占比 > 60% 且 tgi > 120)"
+- 让用户清楚知道是否执行了扩展,以及扩展的结果
 
-**如果工具返回错误**:
+---
+
+### 第四步:结果评估与补充
+
+**统计当前符合要求的内容数量 C**:
+- C = 通过所有筛选阶段的内容数量
+
+**判断是否需要补充**:
+- 如果 **C >= M**:完成,进入第五步
+- 如果 **C < M × 0.8**:内容不足,选择下一个关键词,回到第二步
+- 如果 **M × 0.8 <= C < M**:接近目标,可选择继续补充或直接输出
+
+---
+
+### 第五步:去重与排序
+
+**去重机制**:
+- 按 aweme_id 去重(不同关键词可能搜到相同内容)
+- 保留第一次出现的版本
+
+**排序策略**:
+- 按匹配度和热度综合排序
+- 优先推荐匹配度高且热度高的内容
+
+**输出结果**:
+- 按分层输出:强烈推荐、推荐、可选
+- 说明每条内容的推荐理由和数据来源
+- **数据来源要求**:
+  - 所有基础信息(aweme_id、作者名、sec_uid、热度数据)必须来自 metadata.search_results
+  - 不能使用 output 文本中的数据
+  - 不能编造任何字段
+- **必须包含链接**:
+  - 内容链接:https://www.douyin.com/video/{aweme_id}
+  - 作者链接:https://www.douyin.com/user/{author.sec_uid}(完整复制,不截断)
+  - 画像链接(如果有):
+    - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
+    - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
+
+---
+
+## 错误处理
 
 **服务级错误(HTTP 502/503/504)**:
-- 这是服务暂时不可用,不是你的参数问题
-- 不要重复尝试相同的调用(最多重试1次即可)
-- 不要尝试换关键词(关键词不是问题)
+- 这是服务暂时不可用,不是参数问题
+- 工具会返回详细的错误信息,包含 HTTP 状态码
+- 不要重复尝试相同的调用(最多重试1次
 - 直接告知用户"服务暂时不可用,请稍后再试"
 - 不要切换到其他平台或工具
 
 **参数错误(HTTP 400/404)**:
 - 检查参数格式是否正确
-- 检查 ID 是否存在
 - 调整参数后重试
 
-**网络错误(Timeout/Connection)**:
+**超时错误(Timeout)**:
+- 工具会返回明确的超时错误信息
+- 可以重试1次,如果仍然超时则告知用户
+
+**网络错误(Connection/Network)**:
 - 可以重试1-2次
 - 如果持续失败,告知用户网络问题
 
+---
+
+## 关键参数
+
+- **搜索倍数**:N = M × 2
+- **不足阈值**:C < M × 0.8
+- **优质账号标准**:目标人群占比 > 60% 且 tgi > 120
+- **账号扩展数量**:5-10 条
+- **TGI 说明**:tgi > 100 表示高于平均,= 100 表示平均,< 100 表示低于平均
+
+---
+
 ## 关键原则
 
-**忠实需求**:用户要什么就找什么,不要基于自己的判断替换用户意图。
+**结构化执行**:严格按照5步流程执行,确保可控性。
+
+**资源优化**:串行搜索,够了就停,避免浪费。
 
 **透明过程**:说明为什么选择这些关键词,用了什么筛选逻辑。
 
-**承认局限**:如果真的找不到符合要求的内容,如实说明,而不是推荐不符合要求的内容。
+**承认局限**:如果真的找不到符合要求的内容,如实说明。

+ 0 - 202
examples/content_finder/skills/content_finding_strategy_v2.md

@@ -1,202 +0,0 @@
-# 内容寻找方法论 V2(优化版)
-
-## 核心方法:串行搜索 → 分阶段筛选 → 按需补充
-
-本方案采用更结构化的执行流程,提供明确的数量控制和资源优化策略。
-
----
-
-## 完整执行流程
-
-### 第一步:需求分析与关键词提取
-
-**提取多个搜索关键词**:
-- 从用户需求中提取核心关键词和扩展关键词
-- 优先使用用户原话中的关键词
-- 必要时补充同义词或相关词
-
-**关键词排序**:
-- 按相关性排序:核心关键词优先,扩展关键词其次
-- 优先级:用户明确说的 > 用户暗示的 > 推测的
-
-**确定目标数量 M**:
-- M = 用户要求的内容数量(如"找10条内容",则 M = 10)
-
----
-
-### 第二步:串行关键词搜索
-
-**选择关键词**:
-- 从关键词列表中选择优先级最高的未使用关键词
-
-**搜索数量控制**:
-- 只搜索 N 条内容,其中 **N = M × 2,不要超过**
-- 如果第一次搜索返回超过 N 条,只保留前 N 条处理
-- 目的:控制 token 消耗,确保筛选后有足够余量
-
-**分页策略**:
-- 第一次搜索使用默认 cursor("0" 或 "")
-- 如果需要更多结果,使用返回的 cursor 值继续获取
-- 示例:`douyin_search(keyword="...", cursor="返回的cursor值")`
-
----
-
-### 第三步:分阶段内容筛选
-
-#### 阶段一:基础质量筛选
-
-**热度筛选**:
-- 根据点赞、评论、分享数据过滤低质量内容
-- 参考标准(可根据需求调整):
-  - 1000+: 一般热度
-  - 5000+: 较高热度
-  - 10000+: 高热度
-
-**相关性筛选**:
-- 根据内容描述过滤明显不相关的内容
-- 保留候选内容列表
-
-#### 阶段二:画像匹配筛选
-
-**分批处理策略**:
-- 不要一次性处理所有候选内容
-- 先处理前 10 条候选内容
-- 筛选后如果符合要求的内容 >= M,停止处理
-- 如果不足,继续处理下一批 10 条
-- 目的:避免一次性调用过多工具导致 token 超限
-
-**画像获取完成标准(重要)**:
-- 当已获取画像的内容数量 >= M × 1.5 时,立即停止获取画像
-- 示例:用户要10条,获取15条画像后立即进入筛选和输出阶段
-- 不要无限循环获取画像,避免陷入"一直获取画像"的状态
-
-**工具调用限制**:
-- 每次最多并行调用 3 个画像工具
-- 避免一次性调用过多工具导致响应被截断
-
-**对每条候选内容**:
-
-1. **从 metadata.search_results 或 metadata.user_videos 中获取基础信息**:
-   - aweme_id、desc、author.nickname、author.sec_uid、statistics
-   - 这些数据将用于最终输出,必须完整保留
-   - 特别注意:author.sec_uid 约80字符,必须完整复制
-
-2. **优先获取内容点赞用户画像**:
-   - 调用 `get_content_fans_portrait(content_id=aweme_id)`
-   - 检查返回的 metadata.has_portrait 字段
-   - 如果 has_portrait 为 True:评估是否符合目标人群,标注"内容点赞画像"
-
-3. **画像缺失时的兜底策略**:
-   - 如果 metadata.has_portrait 为 False(无画像数据)
-   - 获取该内容作者的账号粉丝画像:`get_account_fans_portrait(account_id=author.sec_uid)`
-   - 检查账号画像的 metadata.has_portrait
-   - 如果有画像:评估是否符合目标人群,标注"账号粉丝画像"
-
-4. **画像评估**:
-   - 从 metadata.portrait_data 中获取结构化的画像数据
-   - 根据目标人群的占比和偏好度(tgi)判断
-   - 偏好度 > 100 表示该人群偏好高于平均水平,= 100 表示平均,< 100 表示低于平均
-   - 筛选出符合要求的内容
-
-#### 阶段三:优质账号扩展(可选)
-
-**识别优质账号**:
-- 标准:账号粉丝画像匹配度高
-- 量化指标:目标人群占比 > 60% 且 tgi > 120
-
-**扩展策略**:
-- 对优质账号,获取其近期作品:`douyin_user_videos(account_id=author.sec_uid)`
-- 限制数量:5-10 条
-- 从返回的 metadata.user_videos 中获取结构化数据
-- 对这些作品**仅执行阶段一筛选**(热度、相关性)
-- **不再递归获取画像**,避免无限展开
-- 作为补充内容加入候选池
-
-**必须说明(重要)**:
-- 在输出推荐结果时,必须明确说明优质账号扩展情况
-- 如果发现优质账号:说明"发现 X 个优质账号(账号名,目标人群占比 Y%,tgi Z),已扩展其作品"
-- 如果未发现优质账号:说明"未发现符合扩展条件的优质账号(需要目标人群占比 > 60% 且 tgi > 120)"
-- 让用户清楚知道是否执行了扩展,以及扩展的结果
-
----
-
-### 第四步:结果评估与补充
-
-**统计当前符合要求的内容数量 C**:
-- C = 通过所有筛选阶段的内容数量
-
-**判断是否需要补充**:
-- 如果 **C >= M**:完成,进入第五步
-- 如果 **C < M × 0.8**:内容不足,选择下一个关键词,回到第二步
-- 如果 **M × 0.8 <= C < M**:接近目标,可选择继续补充或直接输出
-
----
-
-### 第五步:去重与排序
-
-**去重机制**:
-- 按 aweme_id 去重(不同关键词可能搜到相同内容)
-- 保留第一次出现的版本
-
-**排序策略**:
-- 按匹配度和热度综合排序
-- 优先推荐匹配度高且热度高的内容
-
-**输出结果**:
-- 按分层输出:强烈推荐、推荐、可选
-- 说明每条内容的推荐理由和数据来源
-- **数据来源要求**:
-  - 所有基础信息(aweme_id、作者名、sec_uid、热度数据)必须来自 metadata.search_results
-  - 不能使用 output 文本中的数据
-  - 不能编造任何字段
-- **必须包含链接**:
-  - 内容链接:https://www.douyin.com/video/{aweme_id}
-  - 作者链接:https://www.douyin.com/user/{author.sec_uid}(完整复制,不截断)
-  - 画像链接(如果有):
-    - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
-    - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
-
----
-
-## 错误处理
-
-**服务级错误(HTTP 502/503/504)**:
-- 这是服务暂时不可用,不是参数问题
-- 工具会返回详细的错误信息,包含 HTTP 状态码
-- 不要重复尝试相同的调用(最多重试1次)
-- 直接告知用户"服务暂时不可用,请稍后再试"
-- 不要切换到其他平台或工具
-
-**参数错误(HTTP 400/404)**:
-- 检查参数格式是否正确
-- 调整参数后重试
-
-**超时错误(Timeout)**:
-- 工具会返回明确的超时错误信息
-- 可以重试1次,如果仍然超时则告知用户
-
-**网络错误(Connection/Network)**:
-- 可以重试1-2次
-- 如果持续失败,告知用户网络问题
-
----
-
-## 关键参数
-
-- **搜索倍数**:N = M × 2
-- **不足阈值**:C < M × 0.8
-- **优质账号标准**:目标人群占比 > 60% 且 tgi > 120
-- **账号扩展数量**:5-10 条
-- **TGI 说明**:tgi > 100 表示高于平均,= 100 表示平均,< 100 表示低于平均
-
----
-
-## 关键原则
-
-**结构化执行**:严格按照5步流程执行,确保可控性。
-
-**资源优化**:串行搜索,够了就停,避免浪费。
-
-**透明过程**:说明为什么选择这些关键词,用了什么筛选逻辑。
-
-**承认局限**:如果真的找不到符合要求的内容,如实说明。

+ 1 - 0
requirements.txt

@@ -13,6 +13,7 @@ fastapi>=0.115.0
 uvicorn[standard]>=0.32.0
 websockets>=13.0
 pydantic
+apscheduler>=3.10.0
 
 # 飞书
 lark-oapi==1.5.3