Просмотр исходного кода

feat:搜索数据存储 & 定时任务逻辑

jihuaqiang 8 часов назад
Родитель
Сommit
89a8673499

+ 20 - 65
examples/content_finder/content_finder.prompt

@@ -6,16 +6,13 @@ temperature: 0.3
 $system$
 你是一个专业的内容寻找助手,帮助运营人员在抖音平台上寻找符合要求的视频内容。
 
-**重要约束**:
+## 重要约束
 - 只在抖音平台搜索,不要切换到其他平台(小红书、B站等)
 - 只使用 douyin_search、douyin_user_videos、get_content_fans_portrait、get_account_fans_portrait、store_results_mysql 这4个工具
 - 不要使用浏览器工具、或其他平台的搜索工具
-- 最终结果需要按要求存储为本地json文件,也需要借助tools存储至远程库
+- **严格禁止调用任何名称以browser_开头的浏览器工具**(调用任何一个都是错误行为):
 
-**严格禁止调用以下浏览器工具**(调用任何一个都是错误行为):
-browser_get_live_url、browser_navigate_to_url、browser_search_web、browser_go_back、browser_wait、browser_click_element、browser_input_text、browser_send_keys、browser_upload_file、browser_scroll_page、browser_find_text、browser_screenshot、browser_switch_tab、browser_close_tab、browser_get_dropdown_options、browser_select_dropdown_option、browser_extract_content、browser_read_long_content、browser_download_direct_url、browser_get_page_html、browser_get_visual_selector_map、browser_evaluate、browser_ensure_login_with_cookies、browser_done、browser_export_cookies、browser_load_cookies
-
-平台背景(仅供参考):
+## 平台背景(仅供参考):
 - 平台载体:微信小程序
 - 核心用户群:95% 是 50 岁以上中老年人
 - 增长方式:微信分享裂变
@@ -23,7 +20,7 @@ browser_get_live_url、browser_navigate_to_url、browser_search_web、browser_go
 
 ## 核心数据使用策略
 
-### 结构化数据优先原则
+### 工具调用结果数据优先提取原则
 - **搜索结果**:从 `metadata.search_results` 获取数据,不要解析 output 文本
 - **账号作品**:从 `metadata.user_videos` 获取数据(格式与 search_results 一致)
 - **画像判断**:使用 `metadata.has_portrait` 字段(True=有画像,False=无画像)
@@ -62,70 +59,22 @@ browser_get_live_url、browser_navigate_to_url、browser_search_web、browser_go
 - **工具调用限制**:每次最多并行调用 3 个画像工具
 - **画像获取完成标准**:获取画像后立即进入筛选阶段,不要继续搜索新内容
 
-## 数据真实性要求(严格遵守)
+### 最终结果存储至远程数据库(必须执行)
+- 使用 store_results_mysql tool工具进行存储
 
+## 数据真实性要求(严格遵守)
 **禁止编造数据**:这是最严重的错误,会导致 404 错误和用户体验问题。
 
-### 唯一数据源
-- 推荐结果的所有数据必须来自 `metadata.search_results` 或 `metadata.user_videos`
-- **禁止**从 output 文本中解析数据(可能有格式问题)
-- **禁止**编造任何不在 metadata 中的数据
-- **禁止**拼接、修改、截断任何字段值
-
-### 字段完整性
-- `author.sec_uid` 必须**逐字符完整复制**(约80字符),不能截断、不能修改
-- 格式检查:必须以 `MS4wLjABAAAA` 开头,后面约 68 个字符
-- `aweme_id`、作者名、热度数据必须来自**同一条记录**,不能混用
-
-### 数据获取步骤(必须遵守)
-1. 从工具返回的 `metadata.search_results` 或 `metadata.user_videos` 中选择一条记录
-2. 从该记录中提取 `author.sec_uid`,**完整复制**,不做任何修改
-3. 使用该 sec_uid 调用画像工具
-4. 如果 metadata 中没有该字段或为空,**不要编造**,标注为"无数据"
-
-### 错误示例(禁止)
-❌ 编造 sec_uid:`MS4wLjABAAAA2Ue8Ks9rkqNmLCy_3bRYCcjmLPXCxQzQOWrGGLZqLmNjFCFUhXJWVLPOxLPO`
-❌ 截断 sec_uid:`MS4wLjABAAAAknWSpc8MaIgiXwRsohQtmeF6dJD0CxofXq4v8QtSVDw5eyehGrb_P4a`
-❌ 从 output 解析:从文本 "sec_uid: MS4w..." 中提取
-❌ 混用字段:用 A 记录的 aweme_id + B 记录的 sec_uid
-
-### 正确示例
-✅ 从 metadata.search_results[0] 中获取:
-```python
-item = metadata.search_results[0]
-aweme_id = item["aweme_id"]  # "7598168772859838016"
-sec_uid = item["author"]["sec_uid"]  # 完整复制,约80字符
-```
-
-### 示例
-如果 metadata.search_results[0] = {
-  "aweme_id": "7598168772859838016",
-  "desc": "养老服务消费补贴全国落地",
-  "author": {
-    "nickname": "宁波养老小翁",
-    "sec_uid": "MS4wLjABAAAAknWSpc8MaIgiXwRsohQtmeF6dJD0CxofXq4v8QtSVDw5eyehGrb_P4aMQisRyUuY"
-  },
-  "statistics": {"digg_count": 343, "comment_count": 33, "share_count": 369}
-}
-
-则输出:
-- 内容链接:https://www.douyin.com/video/7598168772859838016
-- 作者:宁波养老小翁
-- 作者链接:https://www.douyin.com/user/MS4wLjABAAAAknWSpc8MaIgiXwRsohQtmeF6dJD0CxofXq4v8QtSVDw5eyehGrb_P4aMQisRyUuY
-- 热度:👍 343 | 💬 33 | 🔄 369
-
-**违反后果**:编造数据会导致404错误,严重影响用户体验。
-
 ## 输出格式要求
-最终输出必须是 **单个 JSON 对象**(不要夹杂额外文本),并严格遵循 `output_schema`(skills)中定义的结构与字段名(顶层 `trace_id/query/summary/good_account_expansion/contents`)
+最终输出必须严格遵循 Skills 中「输出 JSON Schema」定义的结构与字段名。
 
 ## 任务完成要求
 - 搜索 M × 2 条内容后,立即停止搜索
 - 对所有搜索到的内容获取画像后,立即进入筛选阶段
 - 筛选完成后,立即输出完整的推荐结果
-- 最终输出必须严格遵循 `output_schema`(skills)
-- 输出 JSON 已写入到 %trace_dir% 目录下当次执行的trace_id目录内。
-- 输出已经存储到mysql库中。
+- 最终输出必须严格遵循 Skills 中「输出 JSON Schema」,所有的key都必须严格按照schema的约定
+- 输出已写入到 %trace_dir% 目录下当次执行的trace_id目录内的output.json文件。
+- 输出已经存储到远程数据库中。
 - 输出完整的推荐结果后,任务会自动进行反思和知识保存
 - 反思完成后,输出简短的完成确认:✅ 任务完成!已为您找到 [数量] 条视频,并保存了执行经验
 
@@ -136,10 +85,16 @@ sec_uid = item["author"]["sec_uid"]  # 完整复制,约80字符
 - 获取足够画像后,立即进入筛选和输出阶段
 - 必须输出最终推荐结果,不能在中途停止
 - 所有数据必须来自 metadata,禁止编造
-- 最终输出必须严格遵循 `output_schema`(skills),禁止自创/变体字段名或使用中文 key
-- 输出文件的保存地址严格按照要求,在 %trace_dir% 目录下当次执行的trace_id目录内,不能随意放置。
+- 最终输出必须严格遵循 Skills 中「输出 JSON Schema」,禁止自创/变体字段名或使用中文 key
+- 输出文件的保存地址严格按照要求,在 %trace_dir% 目录下当次执行的trace_id目录内的output.json文件,不能随意放置。
 
 $user$
-%query%
+任务:找10个与「%query%」相关的、老年人感兴趣的视频。
+要求:
+- 适合老年人分享观看
+- 热度要高,质量要好
+
+搜索词: %query%
+搜索词id: %demand_id%(如有)
 
 请开始执行内容寻找任务。记住要多步推理,每次只执行一小步,然后思考下一步该做什么。

+ 15 - 12
examples/content_finder/core.py

@@ -37,20 +37,22 @@ from tools import (
 
 logger = logging.getLogger(__name__)
 
-# 默认 query
-DEFAULT_QUERY = """找10个和"毛主席"相关的,老年人感兴趣的视频。
+# 默认搜索词
+DEFAULT_QUERY = "养生知识"
+DEFAULT_DEMAND_ID = 1
 
-要求:
-- 适合老年人分享观看
-- 热度要高,质量要好"""
 
-
-async def run_agent(query: Optional[str] = None, stream_output: bool = True) -> Dict[str, Any]:
+async def run_agent(
+    query: Optional[str] = None,
+    demand_id: Optional[int] = None,
+    stream_output: bool = True,
+) -> Dict[str, Any]:
     """
     执行 agent 任务
 
     Args:
-        query: 查询内容,None 则使用默认值
+        query: 查询内容(搜索词),None 则使用默认值
+        demand_id: 本次搜索任务 id(int,关联 demand_content 表)
         stream_output: 是否流式输出到 stdout(run.py 需要,server.py 不需要)
 
     Returns:
@@ -61,6 +63,7 @@ async def run_agent(query: Optional[str] = None, stream_output: bool = True) ->
         }
     """
     query = query or DEFAULT_QUERY
+    demand_id = demand_id or DEFAULT_DEMAND_ID
 
     # 加载 prompt
     prompt_path = Path(__file__).parent / "content_finder.prompt"
@@ -69,9 +72,9 @@ async def run_agent(query: Optional[str] = None, stream_output: bool = True) ->
     # output 目录
     trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
 
-    # 构建消息(替换 %query% 和 %trace_dir%)
-    messages = prompt.build_messages(query=query, trace_dir=trace_dir)
-
+    # 构建消息(替换 %query%、%trace_dir%、%demand_id%)
+    demand_id_str = str(demand_id) if demand_id is not None else ""
+    messages = prompt.build_messages(query=query, trace_dir=trace_dir, demand_id=demand_id_str)
 
     # 初始化配置
     api_key = os.getenv("OPEN_ROUTER_API_KEY")
@@ -95,6 +98,7 @@ async def run_agent(query: Optional[str] = None, stream_output: bool = True) ->
         "douyin_user_videos",
         "get_content_fans_portrait",
         "get_account_fans_portrait",
+        "store_results_mysql",
     ]
 
     runner = AgentRunner(
@@ -132,7 +136,6 @@ async def run_agent(query: Optional[str] = None, stream_output: bool = True) ->
 
                 if item.status == "completed":
                     logger.info(f"Agent 执行完成: trace_id={trace_id}")
-                    logger.info(f"结果------: {item}")
                     return {
                         "trace_id": trace_id,
                         "status": "completed"

+ 26 - 0
examples/content_finder/db/__init__.py

@@ -0,0 +1,26 @@
+"""
+数据库封装模块
+
+- connection: 共享数据库连接
+- schedule: 定时任务相关(demand_content、demand_find_task)
+- store_results: 推荐结果写入(demand_find_author、demand_find_content_result)
+"""
+
+from .connection import get_connection
+from .schedule import (
+    get_next_unprocessed_demand,
+    create_task_record,
+    update_task_status,
+    update_task_on_complete,
+)
+from .store_results import upsert_good_authors, insert_contents
+
+__all__ = [
+    "get_connection",
+    "get_next_unprocessed_demand",
+    "create_task_record",
+    "update_task_status",
+    "update_task_on_complete",
+    "upsert_good_authors",
+    "insert_contents",
+]

+ 25 - 0
examples/content_finder/db/connection.py

@@ -0,0 +1,25 @@
+"""共享数据库连接"""
+
+import os
+
+import pymysql
+
+
+def get_connection():
+    """获取数据库连接(与 store_results_mysql、schedule 共用配置)"""
+    host = os.getenv("DB_HOST", "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com")
+    port = int(os.getenv("DB_PORT", "3306"))
+    user = os.getenv("DB_USER", "content_rw")
+    password = os.getenv("DB_PASSWORD", "bC1aH4bA1lB0")
+    database = os.getenv("DB_NAME", "content-deconstruction-supply")
+
+    return pymysql.connect(
+        host=host,
+        port=port,
+        user=user,
+        password=password,
+        database=database,
+        charset="utf8mb4",
+        cursorclass=pymysql.cursors.DictCursor,
+        autocommit=True,
+    )

+ 128 - 0
examples/content_finder/db/schedule.py

@@ -0,0 +1,128 @@
+"""
+定时任务相关数据库操作
+
+demand_content: 原始检索内容库
+demand_find_task: 执行记录表,通过 demand_content_id 关联
+"""
+
+import logging
+from typing import Any, Dict, Optional
+
+from .connection import get_connection
+
+logger = logging.getLogger(__name__)
+
+# 状态常量(与 demand_find_task 表一致)
+STATUS_PENDING = 0
+STATUS_RUNNING = 1
+STATUS_SUCCESS = 2
+STATUS_FAILED = 3
+
+
+def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
+    """
+    联表查询 demand_content 和 demand_find_task,找到创建最早且未处理的 demand_content。
+
+    未处理定义:该 demand_content_id 下无 status 为 0/1/2 的任务
+    (即无待执行、执行中、成功的记录)
+
+    Returns:
+        {"demand_content_id": int, "query": str} 或 None
+    """
+    sql = """
+    SELECT dc.id AS demand_content_id,
+           dc.name AS query
+    FROM demand_content dc
+    WHERE NOT EXISTS (
+        SELECT 1 FROM demand_find_task t
+        WHERE t.demand_content_id = dc.id AND t.status IN (%s, %s, %s)
+    )
+    ORDER BY dc.id ASC
+    LIMIT 1
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (STATUS_PENDING, STATUS_RUNNING, STATUS_SUCCESS))
+            row = cur.fetchone()
+            return dict(row) if row else None
+    except Exception as e:
+        logger.error(f"get_next_unprocessed_demand 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
+def create_task_record(demand_content_id: int, trace_id: str = "", status: int = STATUS_PENDING) -> None:
+    """
+    在 demand_find_task 中新增一条记录。
+
+    初始创建时 trace_id 可置为空字符串,任务完成后通过 update_task_on_complete 更新。
+    """
+    sql = """
+    INSERT INTO demand_find_task (trace_id, demand_content_id, status)
+    VALUES (%s, %s, %s)
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (trace_id, demand_content_id, status))
+        logger.info(f"创建任务记录: demand_content_id={demand_content_id}")
+    except Exception as e:
+        logger.error(f"create_task_record 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
+def update_task_on_complete(demand_content_id: int, trace_id: str, status: int) -> None:
+    """
+    任务完成后更新 trace_id 和 status。
+    匹配 trace_id 为空字符串的记录(初始创建时的占位)。
+    """
+    sql = """
+    UPDATE demand_find_task
+    SET trace_id = %s, status = %s
+    WHERE demand_content_id = %s AND trace_id = ''
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (trace_id, status, demand_content_id))
+        logger.info(f"更新任务完成: demand_content_id={demand_content_id}, trace_id={trace_id}, status={status}")
+    except Exception as e:
+        logger.error(f"update_task_on_complete 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
+def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
+    """
+    更新 demand_find_task 中指定记录的状态。
+
+    trace_id 可为空字符串(任务尚未返回时,通过 demand_content_id 定位记录)。
+    """
+    sql = """
+    UPDATE demand_find_task
+    SET status = %s
+    WHERE trace_id = %s AND demand_content_id = %s
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (status, trace_id, demand_content_id))
+        logger.info(f"更新任务状态: trace_id={trace_id}, demand_content_id={demand_content_id}, status={status}")
+    except Exception as e:
+        logger.error(f"update_task_status 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()

+ 114 - 0
examples/content_finder/db/store_results.py

@@ -0,0 +1,114 @@
+"""
+推荐结果写入(demand_find_author、demand_find_content_result 表)
+"""
+from typing import Any, Dict, List, Optional
+
+
+def upsert_good_authors(
+    conn,
+    trace_id: str,
+    good_account_block: Optional[Dict[str, Any]],
+) -> int:
+    """
+    将 good_account_expansion 中的 accounts 写入 demand_find_author 表。
+    """
+    if not good_account_block:
+        return 0
+
+    if not good_account_block.get("enabled"):
+        return 0
+
+    accounts: List[Dict[str, Any]] = good_account_block.get("accounts") or []
+    if not accounts:
+        return 0
+
+    sql = """
+    INSERT INTO demand_find_author (trace_id, author_name, author_link, elderly_ratio, elderly_tgi, remark)
+    VALUES (%s, %s, %s, %s, %s, %s)
+    ON DUPLICATE KEY UPDATE
+      elderly_ratio = VALUES(elderly_ratio),
+      elderly_tgi = VALUES(elderly_tgi),
+      remark = VALUES(remark)
+    """
+    with conn.cursor() as cur:
+        rows = 0
+        for acc in accounts:
+            author_name = acc.get("author_nickname") or ""
+            author_link = acc.get("author_url") or ""
+            if not author_name or not author_link:
+                sec_uid = acc.get("author_sec_uid")
+                if sec_uid and not author_link:
+                    author_link = f"https://www.douyin.com/user/{sec_uid}"
+            if not author_name or not author_link:
+                continue
+
+            elderly_ratio = acc.get("age_50_plus_ratio") or ""
+            elderly_tgi = acc.get("age_50_plus_tgi") or ""
+            remark = acc.get("reason") or acc.get("remark") or ""
+            cur.execute(
+                sql,
+                (
+                    trace_id,
+                    author_name,
+                    author_link,
+                    str(elderly_ratio) if elderly_ratio is not None else None,
+                    str(elderly_tgi) if elderly_tgi is not None else None,
+                    remark or None,
+                ),
+            )
+            rows += cur.rowcount
+        return rows
+
+
+def insert_contents(
+    conn,
+    trace_id: str,
+    query: str,
+    demand_content_id: int,
+    contents: List[Dict[str, Any]],
+) -> int:
+    """
+    将 contents 列表写入 demand_find_content_result 表。
+    """
+    if not contents:
+        return 0
+
+    sql = """
+    INSERT INTO demand_find_content_result (
+      trace_id, query, rank_no, video_url, title, author_name, author_link,
+      digg_count, comment_count, share_count,
+      portrait_source, elderly_ratio, elderly_tgi, recommendation_reason,
+      demand_content_id
+    ) VALUES (
+      %s, %s, %s, %s, %s, %s, %s,
+      %s, %s, %s,
+      %s, %s, %s, %s,
+      %s
+    )
+    """
+    with conn.cursor() as cur:
+        rows = 0
+        for item in contents:
+            video_url = item.get("video_url") or ""
+            cur.execute(
+                sql,
+                (
+                    trace_id,
+                    query,
+                    int(item.get("rank") or item.get("rank_no") or 0),
+                    video_url,
+                    item.get("title") or "",
+                    item.get("author_nickname") or "",
+                    item.get("author_url") or "",
+                    int(item.get("statistics", {}).get("digg_count") or 0),
+                    int(item.get("statistics", {}).get("comment_count") or 0),
+                    int(item.get("statistics", {}).get("share_count") or 0),
+                    item.get("portrait_data").get("source") or "",
+                    str(item.get("portrait_data").get("age_50_plus_ratio")  or ""),
+                    str(item.get("portrait_data").get("age_50_plus_tgi") or ""),
+                    item.get("reason") or "",
+                    demand_content_id,
+                ),
+            )
+            rows += cur.rowcount
+        return rows

+ 1 - 2
examples/content_finder/run.py

@@ -35,8 +35,7 @@ logger = logging.getLogger(__name__)
 async def main():
     """主函数"""
     try:
-        # 使用 core.py 的共享逻辑,启用流式输出
-        result = await core.run_agent(query=None, stream_output=True)
+        result = await core.run_agent(query=None, demand_id=None, stream_output=True)
 
         if result["status"] == "completed":
             print(f"\n[完成] trace_id={result['trace_id']}")

+ 64 - 59
examples/content_finder/server.py

@@ -3,13 +3,14 @@
 
 提供:
 1. API 接口:POST /api/tasks - 触发内容寻找任务
-2. 定时调度:每 10 分钟调用外部 API 获取 query 并执行任务
+2. 定时调度:每 10 分钟从数据库联表查询未处理需求并执行任务
 3. 并发控制:限制最大并发任务数
 """
 
 import asyncio
 import logging
 import os
+import uuid
 from datetime import datetime
 from pathlib import Path
 from typing import Optional
@@ -17,7 +18,6 @@ import sys
 
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
-import httpx
 from fastapi import FastAPI, HTTPException
 from pydantic import BaseModel
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
@@ -26,6 +26,8 @@ from dotenv import load_dotenv
 load_dotenv()
 
 import core
+from db import get_next_unprocessed_demand, create_task_record, update_task_status, update_task_on_complete
+from db.schedule import STATUS_RUNNING, STATUS_SUCCESS, STATUS_FAILED
 
 # 配置日志
 log_dir = Path(__file__).parent / '.cache'
@@ -68,6 +70,7 @@ stats = {
 
 class TaskRequest(BaseModel):
     query: Optional[str] = None
+    demand_id: Optional[int] = None
 
 
 class TaskResponse(BaseModel):
@@ -79,12 +82,25 @@ class TaskResponse(BaseModel):
 
 # ============ 核心函数 ============
 
-async def execute_task(query: str, task_type: str = "api"):
+def _update_scheduled_task_complete(demand_id: int, trace_id: str, status: int) -> None:
+    """定时任务完成时更新 trace_id 和 status,静默处理异常"""
+    try:
+        update_task_on_complete(demand_id, trace_id, status)
+    except Exception as e:
+        logger.warning(f"更新任务状态失败: {e}")
+
+
+async def execute_task(
+    query: str,
+    demand_id: Optional[int] = None,
+    task_type: str = "api",
+):
     """
     执行任务(带并发控制)
 
     Args:
         query: 查询内容
+        demand_id: 需求 id(demand_content.id,关联 demand_content 表)
         task_type: 任务类型("api" 或 "scheduled")
     """
     async with task_semaphore:
@@ -96,23 +112,35 @@ async def execute_task(query: str, task_type: str = "api"):
         if task_type == "scheduled":
             stats["scheduled_tasks"] += 1
 
-        try:
-            # 执行 agent(不流式输出)
-            result = await core.run_agent(query, stream_output=False)
+        if task_type == "scheduled" and demand_id is not None:
+            try:
+                update_task_status("", demand_id, STATUS_RUNNING)
+            except Exception as e:
+                logger.warning(f"更新任务状态为执行中失败: {e}")
 
+        try:
+            result = await core.run_agent(
+                query, demand_id=demand_id, stream_output=False, trace_id=None
+            )
             duration = (datetime.now() - start_time).total_seconds()
 
             if result["status"] == "completed":
                 stats["completed_tasks"] += 1
                 logger.info(f"任务完成 [{task_type}]: trace_id={result['trace_id']}, 耗时={duration:.1f}s")
+                if task_type == "scheduled" and demand_id is not None:
+                    _update_scheduled_task_complete(demand_id, result["trace_id"], STATUS_SUCCESS)
             else:
                 stats["failed_tasks"] += 1
                 logger.error(f"任务失败 [{task_type}]: trace_id={result.get('trace_id')}, 错误={result.get('error')}, 耗时={duration:.1f}s")
+                if task_type == "scheduled" and demand_id is not None:
+                    _update_scheduled_task_complete(demand_id, result.get("trace_id") or "", STATUS_FAILED)
 
         except Exception as e:
             stats["failed_tasks"] += 1
             duration = (datetime.now() - start_time).total_seconds()
             logger.error(f"任务异常 [{task_type}]: {e}, 耗时={duration:.1f}s", exc_info=True)
+            if task_type == "scheduled" and demand_id is not None:
+                _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
 
 
 async def scheduled_task():
@@ -120,50 +148,29 @@ async def scheduled_task():
     定时任务:每 10 分钟执行一次
 
     流程:
-    1. 调用外部 API 获取 query
-    2. 如果成功,执行任务
-    3. 如果失败,跳过本次执行(不使用兜底)
+    1. 联表查询 demand_content + demand_find_task,获取创建时间最早的未处理的 demand_content
+    2. 在 demand_find_task 新增记录
+    3. 调用 execute_task 执行
     """
     logger.info("定时任务触发")
 
-    try:
-        # 1. 调用外部 API 获取 query
-        query_api = os.getenv("SCHEDULE_QUERY_API")
-        if not query_api:
-            logger.warning("未配置 SCHEDULE_QUERY_API,跳过定时任务")
-            return
-
-        timeout = float(os.getenv("SCHEDULE_QUERY_API_TIMEOUT", "10.0"))
-
-        async with httpx.AsyncClient() as client:
-            headers = {}
-            logger.info(f"调用外部 API: {query_api}")
-            response = await client.get(
-                query_api,
-                headers=headers,
-                timeout=timeout
-            )
-            response.raise_for_status()
-            data = response.json()
-
-        # 2. 提取 query
-        query = data.get("query")
-        if not query:
-            logger.info("定时任务跳过:外部 API 返回的 query 为空")
-            return
-
-        # 3. 执行任务
-        logger.info(f"定时任务启动: query={query[:50]}...")
-        asyncio.create_task(execute_task(query, task_type="scheduled"))
-
-    except httpx.HTTPStatusError as e:
-        logger.error(f"定时任务失败:外部 API 返回错误 {e.response.status_code}: {e.response.text}")
-    except httpx.RequestError as e:
-        logger.error(f"定时任务失败:外部 API 请求失败: {e}")
-    except httpx.TimeoutException:
-        logger.error(f"定时任务失败:外部 API 请求超时")
-    except Exception as e:
-        logger.error(f"定时任务失败:未知错误: {e}", exc_info=True)
+    demand = get_next_unprocessed_demand()
+    if not demand:
+        logger.info("定时任务跳过:无待处理需求")
+        return
+
+    query = demand.get("query") or ""
+    if not query:
+        logger.info("定时任务跳过:该需求的 query 为空")
+        return
+
+    demand_content_id = demand.get("demand_content_id")
+    if demand_content_id is None:
+        logger.warning("定时任务跳过:demand_content_id 为空")
+        return
+
+    create_task_record(demand_content_id)  # trace_id 初始为空,完成后更新
+    asyncio.create_task(execute_task(query=query, demand_id=demand_content_id, task_type="scheduled"))
 
 
 # ============ API 接口 ============
@@ -184,8 +191,9 @@ async def create_task(request: TaskRequest):
             "message": "任务已启动,结果将保存到 .cache/traces/xxx/"
         }
     """
-    # 获取 query
+    # 获取 query 和 demand_id
     query = request.query or core.DEFAULT_QUERY
+    demand_id = request.demand_id
 
     # 用 Event 等待 trace_id
     trace_id_ready = asyncio.Event()
@@ -205,7 +213,9 @@ async def create_task(request: TaskRequest):
 
                 prompt_path = Path(__file__).parent / "content_finder.prompt"
                 prompt = SimplePrompt(prompt_path)
-                messages = prompt.build_messages(query=query)
+                trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
+                demand_id_str = str(demand_id) if demand_id is not None else ""
+                messages = prompt.build_messages(query=query, trace_dir=trace_dir, demand_id=demand_id_str)
 
                 api_key = os.getenv("OPEN_ROUTER_API_KEY")
                 model_name = prompt.config.get("model", "sonnet-4.6")
@@ -223,6 +233,7 @@ async def create_task(request: TaskRequest):
                     "douyin_user_videos",
                     "get_content_fans_portrait",
                     "get_account_fans_portrait",
+                    "store_results_mysql",
                 ]
 
                 runner = AgentRunner(
@@ -328,16 +339,10 @@ async def startup():
     logger.info("内容寻找服务启动中...")
     logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
 
-    # 配置定时任务
-    query_api = os.getenv("SCHEDULE_QUERY_API")
-    if query_api:
-        # 每 10 分钟执行一次
-        scheduler.add_job(scheduled_task, "cron", minute="*/10")
-        scheduler.start()
-        logger.info(f"定时任务已启动:每 10 分钟执行一次")
-        logger.info(f"外部 API: {query_api}")
-    else:
-        logger.info("未配置 SCHEDULE_QUERY_API,定时任务未启动")
+    # 配置定时任务(从 demand_content 联表查询未处理需求,无需外部 API)
+    scheduler.add_job(scheduled_task, "cron", minute="*/10")
+    scheduler.start()
+    logger.info("定时任务已启动:每 10 分钟执行一次(从数据库获取待处理需求)")
 
     logger.info("服务启动完成")
     logger.info("=" * 60)

+ 13 - 11
examples/content_finder/skills/content_filtering_strategy.md

@@ -173,21 +173,23 @@
 
 #### 输出内容
 
-**每条内容包含**:
-- 内容基本信息(ID、描述、作者)
+> **重要**:输出必须严格遵循本 Skills 中「输出 JSON Schema」定义的 JSON 结构,**禁止使用中文 key**,以下中文仅为语义说明,对应到 schema 中的英文 key。
+
+**每条内容(对应 `contents[].*`)包含**:
+- 内容基本信息 → `title`、`aweme_id`、`author_nickname`、`author_sec_uid`、`video_url`、`author_url`
   - **数据来源**:必须从 metadata.search_results 或 metadata.user_videos 中获取
   - **sec_uid 要求**:必须完整复制(约80字符),不能截断
-- 热度数据(点赞、评论、分享
+- 该内容热度数据 → `statistics`(digg_count、comment_count、share_count
   - **数据来源**:必须从 metadata 中的 statistics 字段获取
-- 画像数据(目标人群占比、tgi
-  - **数据来源**:从 metadata.portrait_data 中获取
+- 该内容的画像数据 → `portrait_data`(
+  - **数据来源**:从 metadata.portrait_data 中获取,输出key为source、age_50_plus_ratio、age_50_plus_tgi、url字段。
   - **有效性判断**:通过 metadata.has_portrait 字段判断
-- 数据来源标注
-- 推荐理由
-- **必须包含链接**:
-  - 内容链接:https://www.douyin.com/video/{aweme_id}
-  - 作者链接:https://www.douyin.com/user/{author.sec_uid}
-  - 画像链接(如果 has_portrait 为 True):
+- 数据来源标注 → `portrait.source`(content_like | account_fans | none)
+- 推荐理由 → `reasons`(字符串数组)
+- **链接的拼接规则**:
+  - video_url(内容链接:https://www.douyin.com/video/{aweme_id}
+  - author_url(作者链接:https://www.douyin.com/user/{author.sec_uid}
+  - portrait_data.url(画像链接)(如果 has_portrait 为 True):
     - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
     - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
 

+ 33 - 14
examples/content_finder/skills/content_finding_strategy.md

@@ -98,6 +98,39 @@
    - 偏好度 > 100 表示该人群偏好高于平均水平,= 100 表示平均,< 100 表示低于平均
    - 筛选出符合要求的内容
 
+5. **数据获取步骤(必须遵守)**:
+   ### 唯一数据源
+   - 推荐结果的所有数据必须来自 ToolResult 的 `metadata.search_results` 或 `metadata.user_videos`
+   - **禁止**从 metadata.output 中解析数据(可能有格式问题)
+   - **禁止**编造任何不在 metadata 中的数据
+   - **禁止**拼接、修改、截断任何字段值
+
+   ### 字段完整性
+   - `author.sec_uid` 必须**逐字符完整复制**(约80字符),不能截断、不能修改
+   - 格式检查:必须以 `MS4wLjABAAAA` 开头,后面约 68 个字符
+   - `aweme_id`、作者名、热度数据必须来自**同一条记录**,不能混用
+
+   1. 从工具返回的 `metadata.search_results` 或 `metadata.user_videos` 中选择一条记录
+   2. 从该记录中提取 `author.sec_uid`,**完整复制**,不做任何修改
+   3. 使用该 sec_uid 调用画像工具
+   4. 如果 metadata 中没有该字段或为空,**不要编造**,标注为"无数据"
+
+   ### 错误示例(禁止)
+   ❌ 编造 sec_uid:`MS4wLjABAAAA2Ue8Ks9rkqNmLCy_3bRYCcjmLPXCxQzQOWrGGLZqLmNjFCFUhXJWVLPOxLPO`
+   ❌ 截断 sec_uid:`MS4wLjABAAAAknWSpc8MaIgiXwRsohQtmeF6dJD0CxofXq4v8QtSVDw5eyehGrb_P4a`
+   ❌ 从 output 解析:从文本 "sec_uid: MS4w..." 中提取
+   ❌ 混用字段:用 A 记录的 aweme_id + B 记录的 sec_uid
+
+   ### 正确示例
+   ✅ 从 metadata.search_results[0] 中获取:
+   ```python
+      item = metadata.search_results[0]
+      aweme_id = item["aweme_id"]  # "7598168772859838016"
+      sec_uid = item["author"]["sec_uid"]  # 完整复制,约80字符
+   ```
+
+   **违反后果**:编造数据会导致404错误,严重影响用户体验。
+
 #### 阶段三:优质账号扩展(可选)
 
 **识别优质账号**:
@@ -142,20 +175,6 @@
 - 按匹配度和热度综合排序
 - 优先推荐匹配度高且热度高的内容
 
-**输出结果**:
-- 按分层输出:强烈推荐、推荐、可选
-- 说明每条内容的推荐理由和数据来源
-- **数据来源要求**:
-  - 所有基础信息(aweme_id、作者名、sec_uid、热度数据)必须来自 metadata.search_results
-  - 不能使用 output 文本中的数据
-  - 不能编造任何字段
-- **必须包含链接**:
-  - 内容链接:https://www.douyin.com/video/{aweme_id}
-  - 作者链接:https://www.douyin.com/user/{author.sec_uid}(完整复制,不截断)
-  - 画像链接(如果有):
-    - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
-    - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
-
 ---
 
 ## 错误处理

+ 13 - 10
examples/content_finder/skills/output_schema.md

@@ -2,13 +2,12 @@
 输出 JSON 写入到当次执行的 trace_id 目录内的 `output.json` 文件。
 **获取路径方式**:先调用 `get_current_context` 获取 `trace_id` 和 `trace_dir`,再使用 `write_file` 写入 `{trace_dir}/{trace_id}/output.json`。
 
-# 输出 JSON Schema(content_finder)
-最终必须输出 **单个 JSON 对象**(不要夹杂额外文本),字段名必须严格一致,禁止中文 key / 禁止自创变体 key。
-
+# **输出 JSON Schema**
 ```json
 {
   "trace_id": "<由系统生成的真实 trace_id;如果你不知道就填空字符串,程序会覆盖修正>",
   "query": "<本次任务的 query>",
+  "demand_id": "<来自 user 消息的搜索词 id",
   "summary": {
     "candidate_count": 0,
     "portrait_content_like_count": 0,
@@ -30,24 +29,24 @@
   "contents": [
     {
       "title": "<来自 metadata 的标题/desc>",
-      "aweme_id": "<来自 metadata>",
+      "aweme_id": "内容id",
+      "rank": "排名",
       "video_url": "https://www.douyin.com/video/<aweme_id>",
-      "author_nickname": "<来自 metadata>",
-      "author_sec_uid": "<来自 metadata,必须完整复制>",
+      "author_nickname": "作者名",
+      "author_sec_uid": "作者id",
       "author_url": "https://www.douyin.com/user/<author_sec_uid>",
-      "hotness": {
+      "statistics": {
         "digg_count": 0,
         "comment_count": 0,
         "share_count": 0
       },
-      "portrait": {
+      "portrait_data": {
         "source": "content_like | account_fans | none",
         "age_50_plus_ratio": null,
         "age_50_plus_tgi": null,
         "url": null
       },
-      "reasons": ["<入选理由1>", "<入选理由2>"],
-      "tags": ["<可选标签>"]
+      "reason": "<入选理由>"
     }
   ]
 }
@@ -58,3 +57,7 @@
 - `portrait.source="account_fans"` → `portrait.url = https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author_sec_uid}`
 - `portrait.source="none"` → `portrait.url=null`,并且画像字段都为 null
 
+## JSON 编写规范
+- 字符串值中若有双引号 `"`,必须写成 `\"`(反斜杠 + 双引号)
+- 若有反斜杠 `\`,必须写成 `\\`
+- 若标题含引号,建议使用中文引号「」避免转义,或严格转义为 \"

+ 45 - 182
examples/content_finder/tools/store_results_mysql.py

@@ -3,215 +3,68 @@
 
 约定:
 - 输入参数:trace_id(字符串)
-- 数据来源:.cache/traces/{trace_id}/recommendations.json
+- 数据来源:{TRACE_DIR}/{trace_id}/output.json
 - 表结构:good_authors, contents(字段见下面 SQL 注释)
 """
-
+import asyncio
 import json
 import logging
 import os
 from pathlib import Path
-from typing import Any, Dict, List, Optional
-
-import pymysql
+from typing import Any, Dict
 
 from agent.tools import tool, ToolResult
 
-logger = logging.getLogger(__name__)
+from db import get_connection, insert_contents, upsert_good_authors
 
-
-def _get_connection():
-    host = os.getenv("DB_HOST", "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com")
-    port = int(os.getenv("DB_PORT", "3306"))
-    user = os.getenv("DB_USER", "content_rw")
-    password = os.getenv("DB_PASSWORD", "bC1aH4bA1lB0")
-    database = os.getenv("DB_NAME", "content-deconstruction-supply")
-
-    return pymysql.connect(
-        host=host,
-        port=port,
-        user=user,
-        password=password,
-        database=database,
-        charset="utf8mb4",
-        cursorclass=pymysql.cursors.DictCursor,
-        autocommit=True,
-    )
+logger = logging.getLogger(__name__)
 
 
-def _load_recommendations(trace_id: str) -> Dict[str, Any]:
-    """
-    按约定路径读取推荐结果:
-    - 优先:{TRACE_DIR}/{trace_id}/output.json  (与 output_schema.md 保持一致)
-    - 兼容:{TRACE_DIR}/{trace_id}/recommendations.json
-    """
+def _load_output(trace_id: str) -> Dict[str, Any]:
+    """从 {TRACE_DIR}/{trace_id}/output.json 读取输出数据。"""
     trace_root = Path(os.getenv("TRACE_DIR", ".cache/traces"))
-    base = trace_root / trace_id
+    path = trace_root / trace_id / "output.json"
 
-    candidates = [
-        base / "output.json",
-        base / "recommendations.json",
-    ]
+    if not path.exists():
+        raise FileNotFoundError(f"output.json not found for trace_id={trace_id}: {path}")
 
-    for path in candidates:
-        if path.exists():
-            with path.open("r", encoding="utf-8") as f:
-                return json.load(f)
-
-    raise FileNotFoundError(
-        f"no recommendations JSON found for trace_id={trace_id}, tried: "
-        + ", ".join(str(p) for p in candidates)
-    )
+    with path.open("r", encoding="utf-8") as f:
+        return json.load(f)
 
 
-def _upsert_good_authors(
-    conn,
-    trace_id: str,
-    good_account_block: Optional[Dict[str, Any]],
-) -> int:
-    """
-    将 good_account_expansion 中的 accounts 写入 good_authors 表。
-
-    约定表结构示例:
-    CREATE TABLE demand_find_author (
-      id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
-      trace_id VARCHAR(64) NOT NULL,
-      author_name VARCHAR(255) NOT NULL,
-      author_link VARCHAR(512) NOT NULL,
-      reason TEXT,
-      expanded_count INT DEFAULT 0,
-      PRIMARY KEY (id),
-      KEY idx_demand_find_author_trace (trace_id),
-      UNIQUE KEY uk_demand_find_author_trace_author (trace_id, author_link)
-    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-    """
-    if not good_account_block:
-        return 0
-
-    if not good_account_block.get("found"):
-        return 0
-
-    accounts: List[Dict[str, Any]] = good_account_block.get("accounts") or []
-    if not accounts:
-        return 0
-
-    sql = """
-    INSERT INTO demand_find_author (trace_id, author_name, author_link, reason, expanded_count)
-    VALUES (%s, %s, %s, %s, %s)
-    ON DUPLICATE KEY UPDATE
-      reason = VALUES(reason),
-      expanded_count = VALUES(expanded_count)
-    """
-    with conn.cursor() as cur:
-        rows = 0
-        for acc in accounts:
-            author_name = acc.get("account_name") or acc.get("author_name") or ""
-            author_link = acc.get("author_link") or ""
-            if not author_name or not author_link:
-                # 如果只给了 sec_uid,可以由上层补 author_link
-                sec_uid = acc.get("sec_uid")
-                if sec_uid and not author_link:
-                    author_link = f"https://www.douyin.com/user/{sec_uid}"
-            if not author_name or not author_link:
-                continue
-
-            reason = acc.get("reason") or ""
-            expanded_count = int(acc.get("expanded_count") or 0)
-            cur.execute(sql, (trace_id, author_name, author_link, reason, expanded_count))
-            rows += cur.rowcount
-        return rows
-
-
-def _insert_contents(
-    conn,
-    trace_id: str,
-    contents: List[Dict[str, Any]],
-) -> int:
-    """
-    将 contents 列表写入 contents 表。
-
-    约定表结构示例:
-    CREATE TABLE demand_find_content_result (
-      id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
-      trace_id VARCHAR(64) NOT NULL,
-      rank INT NOT NULL,
-      content_link VARCHAR(512) NOT NULL,
-      title TEXT NOT NULL,
-      author_name VARCHAR(255) NOT NULL,
-      author_link VARCHAR(512) NOT NULL,
-      digg_count BIGINT DEFAULT 0,
-      comment_count BIGINT DEFAULT 0,
-      share_count BIGINT DEFAULT 0,
-      portrait_source VARCHAR(255),
-      elderly_ratio VARCHAR(255),
-      elderly_tgi VARCHAR(255),
-      recommendation_reason TEXT,
-      PRIMARY KEY (id),
-      KEY idx_demand_find_content_trace (trace_id),
-      KEY idx_demand_find_content_author (author_link)
-    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-    """
-    if not contents:
-        return 0
-
-    sql = """
-    INSERT INTO demand_find_content_result (
-      trace_id, rank, content_link, title, author_name, author_link,
-      digg_count, comment_count, share_count,
-      portrait_source, elderly_ratio, elderly_tgi, recommendation_reason
-    ) VALUES (
-      %s, %s, %s, %s, %s, %s,
-      %s, %s, %s,
-      %s, %s, %s, %s
-    )
-    """
-    with conn.cursor() as cur:
-        rows = 0
-        for item in contents:
-            cur.execute(
-                sql,
-                (
-                    trace_id,
-                    int(item.get("rank") or 0),
-                    item.get("content_link") or "",
-                    item.get("title") or "",
-                    item.get("author_name") or "",
-                    item.get("author_link") or "",
-                    int(item.get("heat_metrics", {}).get("digg_count") or 0),
-                    int(item.get("heat_metrics", {}).get("comment_count") or 0),
-                    int(item.get("heat_metrics", {}).get("share_count") or 0),
-                    item.get("portrait_source") or "",
-                    str(item.get("elderly_ratio") or ""),
-                    str(item.get("elderly_tgi") or ""),
-                    item.get("recommendation_reason") or "",
-                ),
-            )
-            rows += cur.rowcount
-        return rows
-
-
-@tool(description="将推荐结果写入 MySQL(good_authors + contents)")
+@tool(description="将推荐结果写入 MySQL")
 async def store_results_mysql(trace_id: str) -> ToolResult:
     """
-    根据 trace_id 读取对应的 recommendations.json,并写入 MySQL 的两个表:
-    - demand_find_author:优质账号信息
-    - demand_find_content_result:推荐内容列表
+    根据 trace_id 读取 output.json,并写入 MySQL。
+    demand_content_id 从 output 的 demand_id 字段获取,需在 output_schema 中输出。
     """
     try:
-        data = _load_recommendations(trace_id)
+        data = _load_output(trace_id)
     except Exception as e:
-        msg = f"加载 recommendations.json 失败: {e}"
+        msg = f"加载 output.json 失败: {e}"
+        logger.error(msg)
+        return ToolResult(title="存储推荐结果", output=msg, metadata={"ok": False, "error": str(e)})
+
+    demand_content_id = data.get("demand_id")
+    if demand_content_id is not None and not isinstance(demand_content_id, int):
+        try:
+            demand_content_id = int(demand_content_id)
+        except (ValueError, TypeError):
+            demand_content_id = None
+    if demand_content_id is None:
+        msg = "demand_id 必填:请在 output 的 demand_id 字段中输出(来自 user 消息的搜索词 id)"
         logger.error(msg)
-        return ToolResult(output=msg, metadata={"ok": False, "error": str(e)})
+        return ToolResult(title="存储推荐结果", output=msg, metadata={"ok": False, "error": msg})
 
     conn = None
     try:
-        conn = _get_connection()
-        good_block = data.get("good_account_expansion") or data.get("good_accounts")
+        conn = get_connection()
+        good_block = data.get("good_account_expansion")
         contents = data.get("contents") or []
+        query = data.get("query") or ""
 
-        authors_rows = _upsert_good_authors(conn, trace_id, good_block)
-        contents_rows = _insert_contents(conn, trace_id, contents)
+        authors_rows = upsert_good_authors(conn, trace_id, good_block)
+        contents_rows = insert_contents(conn, trace_id, query, demand_content_id, contents)
 
         output = (
             f"MySQL 写入完成:demand_find_author 影响行数={authors_rows}, "
@@ -219,6 +72,7 @@ async def store_results_mysql(trace_id: str) -> ToolResult:
         )
         logger.info(output)
         return ToolResult(
+            title="存储推荐结果",
             output=output,
             metadata={
                 "ok": True,
@@ -230,8 +84,17 @@ async def store_results_mysql(trace_id: str) -> ToolResult:
     except Exception as e:
         msg = f"写入 MySQL 失败: {e}"
         logger.error(msg, exc_info=True)
-        return ToolResult(output=msg, metadata={"ok": False, "error": str(e)})
+        return ToolResult(title="存储推荐结果", output=msg, metadata={"ok": False, "error": str(e)})
     finally:
         if conn is not None:
             conn.close()
 
+async def main():
+    result = await store_results_mysql(
+        trace_id="7b211fa6-f0d6-4f98-a6f5-689e6af64748",
+    )
+    # ToolResult 是 dataclass,用 vars 输出
+    print(vars(result))
+
+if __name__ == "__main__":
+    asyncio.run(main())