Просмотр исходного кода

aigc接口联调 & 输出目录修改

jihuaqiang 6 часов назад
Родитель
Сommit
0421781f59

+ 5 - 5
agent/tools/builtin/context.py

@@ -9,6 +9,7 @@
 框架也会在特定轮次自动调用此工具进行周期性上下文刷新。
 框架也会在特定轮次自动调用此工具进行周期性上下文刷新。
 """
 """
 
 
+import os
 from agent.tools import tool, ToolResult, ToolContext
 from agent.tools import tool, ToolResult, ToolContext
 
 
 
 
@@ -52,14 +53,13 @@ async def get_current_context(
             context_content = "暂无计划信息"
             context_content = "暂无计划信息"
 
 
     # 注入 trace_id 和 trace_dir,供需要写入 trace 目录的工具(如输出 JSON)使用
     # 注入 trace_id 和 trace_dir,供需要写入 trace 目录的工具(如输出 JSON)使用
-    trace_dir = ""
-    if runner.trace_store and hasattr(runner.trace_store, "base_path"):
-        trace_dir = str(runner.trace_store.base_path)
+    output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
+
     extra = [
     extra = [
         f"## 当前执行信息",
         f"## 当前执行信息",
         f"- **trace_id**: `{trace_id or '(未知)'}`",
         f"- **trace_id**: `{trace_id or '(未知)'}`",
-        f"- **trace_dir**: `{trace_dir or '(未知)'}`",
-        f"- **输出路径示例**: `{trace_dir}/{trace_id}/output.json`(若需写入当次 trace 目录)",
+        f"- **output_dir**: `{output_dir or '(未知)'}`",
+        f"- **输出路径示例**: `{output_dir}/{trace_id}/output.json`(若需写入 output_dir 目录)",
     ]
     ]
     context_content = (context_content or "") + "\n\n" + "\n".join(extra)
     context_content = (context_content or "") + "\n\n" + "\n".join(extra)
 
 

+ 2 - 2
api_server.py

@@ -56,8 +56,8 @@ app.add_middleware(
 
 
 # ===== 初始化存储 =====
 # ===== 初始化存储 =====
 
 
-# 使用文件系统存储(支持跨进程和持久化)
-trace_store = FileSystemTraceStore(base_path=".trace")
+# 使用文件系统存储(支持跨进程和持久化);与示例中 TRACE_DIR 对齐时设环境变量 TRACE_DIR
+trace_store = FileSystemTraceStore(base_path=os.getenv("TRACE_DIR", ".trace"))
 
 
 # 注入到 step_tree 模块
 # 注入到 step_tree 模块
 set_api_trace_store(trace_store)
 set_api_trace_store(trace_store)

+ 10 - 0
examples/content_finder/.env.example

@@ -26,3 +26,13 @@ SCHEDULE_QUERY_API_TIMEOUT=10.0
 
 
 # 并发控制
 # 并发控制
 MAX_CONCURRENT_TASKS=3
 MAX_CONCURRENT_TASKS=3
+
+# AIGC 平台配置
+AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID=your-produce-plan-id
+
+# MySQL(store_results_mysql、定时任务、AIGC 写库等)
+DB_HOST=your-mysql-host
+DB_PORT=3306
+DB_USER=your_user
+DB_PASSWORD=your-password
+DB_NAME=your_database

+ 6 - 0
examples/content_finder/README.md

@@ -89,6 +89,12 @@ python examples/content_finder/server.py
 | `SCHEDULE_QUERY_API` | 空 | 定时任务外部 API 地址(留空则不启动定时任务) |
 | `SCHEDULE_QUERY_API` | 空 | 定时任务外部 API 地址(留空则不启动定时任务) |
 | `SCHEDULE_QUERY_API_KEY` | 空 | 定时任务外部 API 认证 Key |
 | `SCHEDULE_QUERY_API_KEY` | 空 | 定时任务外部 API 认证 Key |
 | `SCHEDULE_QUERY_API_TIMEOUT` | `10.0` | 定时任务外部 API 超时(秒) |
 | `SCHEDULE_QUERY_API_TIMEOUT` | `10.0` | 定时任务外部 API 超时(秒) |
+| `AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID` | `` | 需要将内容绑定的AIGC平台生成计划id |
+| `DB_HOST` | 必填(写库/定时任务时) | MySQL 主机 |
+| `DB_PORT` | `3306` | MySQL 端口 |
+| `DB_USER` | 必填 | MySQL 用户名 |
+| `DB_PASSWORD` | 必填 | MySQL 密码 |
+| `DB_NAME` | 必填 | 数据库名 |
 
 
 ## 服务模式 API
 ## 服务模式 API
 
 

+ 13 - 9
examples/content_finder/content_finder.prompt

@@ -21,8 +21,8 @@ $system$
 ## 核心数据使用策略
 ## 核心数据使用策略
 
 
 ### 工具调用结果数据优先提取原则
 ### 工具调用结果数据优先提取原则
-- **搜索结果**:从 `metadata.search_results` 获取数据,不要解析 output 文本
-- **账号作品**:从 `metadata.user_videos` 获取数据(格式与 search_results 一致)
+- **搜索结果**:调用 douyin_search 后,从 metadata.search_results 获取,不要解析 工具的output
+- **账号作品**:调用 douyin_user_videos 后,从 metadata.user_videos 获取数据
 - **画像判断**:使用 `metadata.has_portrait` 字段(True=有画像,False=无画像)
 - **画像判断**:使用 `metadata.has_portrait` 字段(True=有画像,False=无画像)
 - **画像数据**:从 `metadata.portrait_data` 获取结构化数据
 - **画像数据**:从 `metadata.portrait_data` 获取结构化数据
 
 
@@ -59,22 +59,26 @@ $system$
 - **工具调用限制**:每次最多并行调用 3 个画像工具
 - **工具调用限制**:每次最多并行调用 3 个画像工具
 - **画像获取完成标准**:获取画像后立即进入筛选阶段,不要继续搜索新内容
 - **画像获取完成标准**:获取画像后立即进入筛选阶段,不要继续搜索新内容
 
 
+## 数据真实性要求(严格遵守)
+**禁止编造数据**:这是最严重的错误,会导致 404 错误和用户体验问题。
+
 ### 最终结果存储至远程数据库(必须执行)
 ### 最终结果存储至远程数据库(必须执行)
 - 使用 store_results_mysql tool工具进行存储
 - 使用 store_results_mysql tool工具进行存储
 
 
-## 数据真实性要求(严格遵守)
-**禁止编造数据**:这是最严重的错误,会导致 404 错误和用户体验问题
+## 最终输出要求
+最终输出必须严格遵循 Skills 中「输出结果指南」要求的目录和结构
 
 
-## 输出格式要求
-最终输出必须严格遵循 Skills 中「输出 JSON Schema」定义的结构与字段名
+## 接入AIGC平台
+Skills 中的「AIGC 爬取计划生成」用于将寻找的结果接入AIGC平台
 
 
 ## 任务完成要求
 ## 任务完成要求
 - 搜索 M × 2 条内容后,立即停止搜索
 - 搜索 M × 2 条内容后,立即停止搜索
 - 对所有搜索到的内容获取画像后,立即进入筛选阶段
 - 对所有搜索到的内容获取画像后,立即进入筛选阶段
 - 筛选完成后,立即输出完整的推荐结果
 - 筛选完成后,立即输出完整的推荐结果
 - 最终输出必须严格遵循 Skills 中「输出 JSON Schema」,所有的key都必须严格按照schema的约定
 - 最终输出必须严格遵循 Skills 中「输出 JSON Schema」,所有的key都必须严格按照schema的约定
-- 输出已写入到 %trace_dir% 目录下当次执行的trace_id目录内的output.json文件。
+- 输出已写入到 %output_dir% 目录下当次执行的trace_id目录内的output.json文件。
 - 输出已经存储到远程数据库中。
 - 输出已经存储到远程数据库中。
+- 输出结果已经接入AIGC平台。
 - 输出完整的推荐结果后,任务会自动进行反思和知识保存
 - 输出完整的推荐结果后,任务会自动进行反思和知识保存
 - 反思完成后,输出简短的完成确认:✅ 任务完成!已为您找到 [数量] 条视频,并保存了执行经验
 - 反思完成后,输出简短的完成确认:✅ 任务完成!已为您找到 [数量] 条视频,并保存了执行经验
 
 
@@ -84,9 +88,9 @@ $system$
 - 不要陷入”一直获取画像”的循环
 - 不要陷入”一直获取画像”的循环
 - 获取足够画像后,立即进入筛选和输出阶段
 - 获取足够画像后,立即进入筛选和输出阶段
 - 必须输出最终推荐结果,不能在中途停止
 - 必须输出最终推荐结果,不能在中途停止
-- 所有数据必须来自 metadata,禁止编造
+- 所有数据必须来自 TOOLS 返回的 metadata,禁止编造
 - 最终输出必须严格遵循 Skills 中「输出 JSON Schema」,禁止自创/变体字段名或使用中文 key
 - 最终输出必须严格遵循 Skills 中「输出 JSON Schema」,禁止自创/变体字段名或使用中文 key
-- 输出文件的保存地址严格按照要求,在 %trace_dir% 目录下当次执行的trace_id目录内的output.json文件,不能随意放置。
+- 输出文件的保存地址严格按照要求,在 %output_dir% 目录下当次执行的trace_id目录内的output.json文件,不能随意放置。
 
 
 $user$
 $user$
 任务:找10个与「%query%」相关的、老年人感兴趣的视频。
 任务:找10个与「%query%」相关的、老年人感兴趣的视频。

+ 11 - 6
examples/content_finder/core.py

@@ -33,13 +33,16 @@ from tools import (
     douyin_user_videos,
     douyin_user_videos,
     get_content_fans_portrait,
     get_content_fans_portrait,
     get_account_fans_portrait,
     get_account_fans_portrait,
+    create_crawler_plan_by_douyin_content_id,
+    create_crawler_plan_by_douyin_account_id,
+    store_results_mysql,
 )
 )
 
 
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
 
 
 # 默认搜索词
 # 默认搜索词
-DEFAULT_QUERY = "养生知识"
-DEFAULT_DEMAND_ID = 1
+DEFAULT_QUERY = "毛泽东1965年深秋预言"
+DEFAULT_DEMAND_ID = 2629
 
 
 
 
 async def run_agent(
 async def run_agent(
@@ -70,11 +73,11 @@ async def run_agent(
     prompt = SimplePrompt(prompt_path)
     prompt = SimplePrompt(prompt_path)
 
 
     # output 目录
     # output 目录
-    trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
+    output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
 
 
-    # 构建消息(替换 %query%、%trace_dir%、%demand_id%)
+    # 构建消息(替换 %query%、%output_dir%、%demand_id%)
     demand_id_str = str(demand_id) if demand_id is not None else ""
     demand_id_str = str(demand_id) if demand_id is not None else ""
-    messages = prompt.build_messages(query=query, trace_dir=trace_dir, demand_id=demand_id_str)
+    messages = prompt.build_messages(query=query, output_dir=output_dir, demand_id=demand_id_str)
 
 
     # 初始化配置
     # 初始化配置
     api_key = os.getenv("OPEN_ROUTER_API_KEY")
     api_key = os.getenv("OPEN_ROUTER_API_KEY")
@@ -86,7 +89,7 @@ async def run_agent(
     temperature = float(prompt.config.get("temperature", 0.3))
     temperature = float(prompt.config.get("temperature", 0.3))
     max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
     max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
     trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
     trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
-    output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
+    
     skills_dir = str(Path(__file__).parent / "skills")
     skills_dir = str(Path(__file__).parent / "skills")
 
 
     Path(trace_dir).mkdir(parents=True, exist_ok=True)
     Path(trace_dir).mkdir(parents=True, exist_ok=True)
@@ -99,6 +102,8 @@ async def run_agent(
         "get_content_fans_portrait",
         "get_content_fans_portrait",
         "get_account_fans_portrait",
         "get_account_fans_portrait",
         "store_results_mysql",
         "store_results_mysql",
+        "create_crawler_plan_by_douyin_content_id",
+        "create_crawler_plan_by_douyin_account_id",
     ]
     ]
 
 
     runner = AgentRunner(
     runner = AgentRunner(

+ 2 - 1
examples/content_finder/db/__init__.py

@@ -13,7 +13,7 @@ from .schedule import (
     update_task_status,
     update_task_status,
     update_task_on_complete,
     update_task_on_complete,
 )
 )
-from .store_results import upsert_good_authors, insert_contents
+from .store_results import upsert_good_authors, insert_contents, update_content_plan_ids
 
 
 __all__ = [
 __all__ = [
     "get_connection",
     "get_connection",
@@ -23,4 +23,5 @@ __all__ = [
     "update_task_on_complete",
     "update_task_on_complete",
     "upsert_good_authors",
     "upsert_good_authors",
     "insert_contents",
     "insert_contents",
+    "update_content_plan_ids",
 ]
 ]

+ 12 - 5
examples/content_finder/db/connection.py

@@ -6,12 +6,19 @@ import pymysql
 
 
 
 
 def get_connection():
 def get_connection():
-    """获取数据库连接(与 store_results_mysql、schedule 共用配置)"""
-    host = os.getenv("DB_HOST", "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com")
+    """获取数据库连接(与 store_results_mysql、schedule 共用配置)
+
+    请在 examples/content_finder/.env 中配置 DB_HOST / DB_PORT / DB_USER / DB_PASSWORD / DB_NAME。
+    """
+    host = os.getenv("DB_HOST", "").strip()
     port = int(os.getenv("DB_PORT", "3306"))
     port = int(os.getenv("DB_PORT", "3306"))
-    user = os.getenv("DB_USER", "content_rw")
-    password = os.getenv("DB_PASSWORD", "bC1aH4bA1lB0")
-    database = os.getenv("DB_NAME", "content-deconstruction-supply")
+    user = os.getenv("DB_USER", "").strip()
+    password = os.getenv("DB_PASSWORD", "")
+    database = os.getenv("DB_NAME", "").strip()
+    if not all([host, user, database]):
+        raise ValueError(
+            "数据库未配置:请在 examples/content_finder/.env 中设置 DB_HOST、DB_USER、DB_PASSWORD、DB_NAME"
+        )
 
 
     return pymysql.connect(
     return pymysql.connect(
         host=host,
         host=host,

+ 53 - 2
examples/content_finder/db/store_results.py

@@ -3,6 +3,8 @@
 """
 """
 from typing import Any, Dict, List, Optional
 from typing import Any, Dict, List, Optional
 
 
+from .connection import get_connection
+
 
 
 def upsert_good_authors(
 def upsert_good_authors(
     conn,
     conn,
@@ -75,12 +77,12 @@ def insert_contents(
 
 
     sql = """
     sql = """
     INSERT INTO demand_find_content_result (
     INSERT INTO demand_find_content_result (
-      trace_id, query, rank_no, video_url, title, author_name, author_link,
+      trace_id, query, rank_no, aweme_id, video_url, title, author_name, author_link,
       digg_count, comment_count, share_count,
       digg_count, comment_count, share_count,
       portrait_source, elderly_ratio, elderly_tgi, recommendation_reason,
       portrait_source, elderly_ratio, elderly_tgi, recommendation_reason,
       demand_content_id
       demand_content_id
     ) VALUES (
     ) VALUES (
-      %s, %s, %s, %s, %s, %s, %s,
+      %s, %s, %s, %s, %s, %s, %s, %s,
       %s, %s, %s,
       %s, %s, %s,
       %s, %s, %s, %s,
       %s, %s, %s, %s,
       %s
       %s
@@ -96,6 +98,7 @@ def insert_contents(
                     trace_id,
                     trace_id,
                     query,
                     query,
                     int(item.get("rank") or item.get("rank_no") or 0),
                     int(item.get("rank") or item.get("rank_no") or 0),
+                    item.get("aweme_id") or "",
                     video_url,
                     video_url,
                     item.get("title") or "",
                     item.get("title") or "",
                     item.get("author_nickname") or "",
                     item.get("author_nickname") or "",
@@ -112,3 +115,51 @@ def insert_contents(
             )
             )
             rows += cur.rowcount
             rows += cur.rowcount
         return rows
         return rows
+
+
+def update_content_plan_ids(
+    trace_id: str,
+    aweme_ids: List[str],
+    crawler_plan_id: str = "",
+    produce_plan_id: str = "",
+) -> int:
+    """
+    更新 demand_find_content_result 中指定内容的计划字段。
+
+    约定:
+    - 通过 (trace_id, aweme_id) 定位内容行
+    - crawler_plan_id / produce_plan_id 可只传其一:仅更新非空字段
+    - 至少一个计划 id 非空时才执行 UPDATE
+    - 内部自行获取并关闭数据库连接
+    """
+    if not aweme_ids or not isinstance(aweme_ids, list):
+        return 0
+    c = (crawler_plan_id or "").strip()
+    p = (produce_plan_id or "").strip()
+    if not c and not p:
+        return 0
+
+    set_parts: List[str] = []
+    params: List[Any] = []
+    if c:
+        set_parts.append("crawler_plan_id = %s")
+        params.append(c)
+    if p:
+        set_parts.append("produce_plan_id = %s")
+        params.append(p)
+
+    sql = f"""
+    UPDATE demand_find_content_result
+    SET {", ".join(set_parts)}
+    WHERE trace_id = %s AND aweme_id = %s
+    """
+    conn = get_connection()
+    try:
+        rows = 0
+        with conn.cursor() as cur:
+            for aweme_id in aweme_ids:
+                cur.execute(sql, (*params, trace_id, aweme_id))
+                rows += cur.rowcount
+        return rows
+    finally:
+        conn.close()

+ 16 - 0
examples/content_finder/skills/aigc_platform_plan.md

@@ -0,0 +1,16 @@
+---
+name: aigc_platform_plan
+description: AIGC 爬取计划生成
+---
+
+## AIGC 爬取计划生成
+
+对内容寻找任务产出的视频结果和作者结果进行进一步处理:提取所有视频,调用工具生成爬取计划。
+
+## 适用场景
+- 内容寻找任务执行完成,`output.json` 已生成
+- 需要把筛选出的抖音视频批量接入到自有抓取 / 发布平台
+
+## 行为约定
+1. 从内容寻找输出中查看**视频列表**。
+2. 如果有视频结果,先调用`get_current_context` 获取 `trace_id`,再调用 `create_crawler_plan_by_douyin_content_id` 创建爬取计划。

+ 16 - 10
examples/content_finder/skills/output_schema.md

@@ -1,8 +1,15 @@
-# 输出目录
-输出 JSON 写入到当次执行的 trace_id 目录内的 `output.json` 文件。
-**获取路径方式**:先调用 `get_current_context` 获取 `trace_id` 和 `trace_dir`,再使用 `write_file` 写入 `{trace_dir}/{trace_id}/output.json`。
+---
+name: output_schema
+description: 输出结果指南
+---
 
 
-# **输出 JSON Schema**
+## 输出结果指南
+
+### 输出目录
+输出 JSON 写入到output_dir目录下当次执行的 trace_id 目录内的 `output.json` 文件。
+**获取路径方式**:先调用 `get_current_context` 获取 `trace_id` 和 `output_dir`,再使用 `write_file` 写入 `{output_dir}/{trace_id}/output.json`。
+
+### **输出 JSON Schema**
 ```json
 ```json
 {
 {
   "trace_id": "<由系统生成的真实 trace_id;如果你不知道就填空字符串,程序会覆盖修正>",
   "trace_id": "<由系统生成的真实 trace_id;如果你不知道就填空字符串,程序会覆盖修正>",
@@ -44,18 +51,17 @@
         "source": "content_like | account_fans | none",
         "source": "content_like | account_fans | none",
         "age_50_plus_ratio": null,
         "age_50_plus_ratio": null,
         "age_50_plus_tgi": null,
         "age_50_plus_tgi": null,
-        "url": null
+        "url": "画像链接"
       },
       },
       "reason": "<入选理由>"
       "reason": "<入选理由>"
     }
     }
   ]
   ]
 }
 }
 ```
 ```
-
-画像链接规则:
-- `portrait.source="content_like"` → `portrait.url = https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}`
-- `portrait.source="account_fans"` → `portrait.url = https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author_sec_uid}`
-- `portrait.source="none"` → `portrait.url=null`,并且画像字段都为 null
+portrait_data内部字段规则说明:
+- `portrait_data.source="content_like"` → `portrait.url = https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}`
+- `portrait_data.source="account_fans"` → `portrait.url = https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author_sec_uid}`
+- `portrait_data.source="none"` → `portrait_data.url=null`,并且画像字段都为 null
 
 
 ## JSON 编写规范
 ## JSON 编写规范
 - 字符串值中若有双引号 `"`,必须写成 `\"`(反斜杠 + 双引号)
 - 字符串值中若有双引号 `"`,必须写成 `\"`(反斜杠 + 双引号)

+ 3 - 0
examples/content_finder/tools/__init__.py

@@ -6,6 +6,7 @@ from .douyin_search import douyin_search
 from .douyin_user_videos import douyin_user_videos
 from .douyin_user_videos import douyin_user_videos
 from .hotspot_profile import get_content_fans_portrait, get_account_fans_portrait
 from .hotspot_profile import get_content_fans_portrait, get_account_fans_portrait
 from .store_results_mysql import store_results_mysql
 from .store_results_mysql import store_results_mysql
+from .aigc_platform_api import create_crawler_plan_by_douyin_content_id, create_crawler_plan_by_douyin_account_id
 
 
 __all__ = [
 __all__ = [
     "douyin_search",
     "douyin_search",
@@ -13,4 +14,6 @@ __all__ = [
     "get_content_fans_portrait",
     "get_content_fans_portrait",
     "get_account_fans_portrait",
     "get_account_fans_portrait",
     "store_results_mysql",
     "store_results_mysql",
+    "create_crawler_plan_by_douyin_content_id",
+    "create_crawler_plan_by_douyin_account_id",
 ]
 ]

+ 91 - 12
examples/content_finder/tools/aigc_platform_api.py

@@ -4,12 +4,15 @@ AIGC接口调用
 """
 """
 import json
 import json
 import logging
 import logging
+import os
 from datetime import datetime
 from datetime import datetime
+from pathlib import Path
 from typing import List, Dict, Union, Tuple, Any
 from typing import List, Dict, Union, Tuple, Any
 
 
 import requests
 import requests
 
 
 from agent import ToolResult, tool
 from agent import ToolResult, tool
+from db import update_content_plan_ids
 
 
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
 
 
@@ -21,6 +24,42 @@ DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
 DEFAULT_TIMEOUT = 60.0
 DEFAULT_TIMEOUT = 60.0
 
 
 
 
+def _load_output_json(trace_id: str, output_dir: str) -> Dict[str, Any]:
+    """Load {output_dir}/{trace_id}/output.json."""
+    path = Path(output_dir) / trace_id / "output.json"
+    if not path.exists():
+        raise FileNotFoundError(f"output.json not found: {path}")
+    with path.open("r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def _extract_content_ids(data: Dict[str, Any]) -> List[str]:
+    """Extract aweme_id list from output json."""
+    contents = data.get("contents") or []
+    if not isinstance(contents, list):
+        return []
+    content_ids: List[str] = []
+    for item in contents:
+        if not isinstance(item, dict):
+            continue
+        aweme_id = item.get("aweme_id")
+        if aweme_id is None:
+            continue
+        aweme_id_str = str(aweme_id).strip()
+        if aweme_id_str:
+            content_ids.append(aweme_id_str)
+    return content_ids
+
+
+def _get_produce_plan_ids_from_env() -> List[str]:
+    """Read AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID from env."""
+    raw = os.getenv("AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID", "").strip()
+    if not raw:
+        return []
+    # 接口需要 List[str],因此把 env 字段(字符串)包装成 list。
+    return [raw]
+
+
 @tool(description="根据抖音账号ID创建爬取计划")
 @tool(description="根据抖音账号ID创建爬取计划")
 async def create_crawler_plan_by_douyin_account_id(
 async def create_crawler_plan_by_douyin_account_id(
         account_id: str,
         account_id: str,
@@ -168,14 +207,12 @@ async def create_crawler_plan_by_douyin_account_id(
 
 
 @tool(description="根据抖音视频ID创建爬取计划")
 @tool(description="根据抖音视频ID创建爬取计划")
 async def create_crawler_plan_by_douyin_content_id(
 async def create_crawler_plan_by_douyin_content_id(
-        content_ids: List[str],
-        produce_plan_ids: List[str] = []
+        trace_id: str,
 ) -> ToolResult:
 ) -> ToolResult:
     """
     """
     根据抖音视频ID创建爬取计划
     根据抖音视频ID创建爬取计划
     Args:
     Args:
-        content_ids: 抖音内容ID列表
-        produce_plan_ids: 爬取计划要绑定的生成计划ID,默认为空列表
+        trace_id: 内容寻找任务 trace_id(用于读取 {output_dir}/{trace_id}/output.json)
     Returns:
     Returns:
              Returns:
              Returns:
          ToolResult: 包含以下内容
          ToolResult: 包含以下内容
@@ -193,20 +230,45 @@ async def create_crawler_plan_by_douyin_content_id(
     Note:
     Note:
         - 建议从 metadata.result 获取结构化数据,而非解析 output 文本
         - 建议从 metadata.result 获取结构化数据,而非解析 output 文本
     """
     """
-    if not content_ids or not isinstance(content_ids, list):
-        logger.error(f"create_crawler_plan_by_douyin_content_id invalid content_ids. content_ids: {content_ids}")
+    if not trace_id or not isinstance(trace_id, str):
+        logger.error(f"create_crawler_plan_by_douyin_content_id invalid trace_id: {trace_id}")
         return ToolResult(
         return ToolResult(
-            title="根据抖音内容ID创建爬取计划失败",
+            title="根据抖音内容创建爬取计划失败",
+            output="",
+            error="trace_id 参数无效: trace_id 必须是非空字符串",
+        )
+
+    output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
+    try:
+        data = _load_output_json(trace_id=trace_id, output_dir=output_dir)
+        content_ids = _extract_content_ids(data)
+    except Exception as e:
+        msg = f"加载/解析 output.json 失败: {e}"
+        logger.error(msg, exc_info=True)
+        return ToolResult(
+            title="根据抖音内容创建爬取计划失败",
             output="",
             output="",
-            error="content_ids 参数无效: content_ids必须是列表"
+            error=msg,
+        )
+
+    if not content_ids:
+        return ToolResult(
+            title="根据抖音内容创建爬取计划失败",
+            output="",
+            error="未在 output.json.contents 中找到有效 aweme_id",
         )
         )
     if len(content_ids) > 100:
     if len(content_ids) > 100:
-        logger.error(f"create_crawler_plan_by_douyin_content_id invalid content_ids length. content_ids.length: {len(content_ids)}")
+        logger.error(
+            "create_crawler_plan_by_douyin_content_id invalid content_ids length. "
+            f"content_ids.length: {len(content_ids)}"
+        )
         return ToolResult(
         return ToolResult(
-            title="根据抖音内容ID创建爬取计划失败",
+            title="根据抖音内容创建爬取计划失败",
             output="",
             output="",
-            error=f"content_ids 长度异常: 期望1~100, 实际{len(content_ids)}"
+            error=f"content_ids 长度异常: 期望1~100, 实际{len(content_ids)}",
         )
         )
+
+    produce_plan_ids = _get_produce_plan_ids_from_env()
     dt = datetime.now().strftime("%Y%m%d%h%M%s")
     dt = datetime.now().strftime("%Y%m%d%h%M%s")
     crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
     crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
     params = {
     params = {
@@ -242,6 +304,10 @@ async def create_crawler_plan_by_douyin_content_id(
         summary_lines.append(f"    抖音视频IDs: {','.join(content_ids)}")
         summary_lines.append(f"    抖音视频IDs: {','.join(content_ids)}")
         summary_lines.append(f"    爬取计划ID: {crawler_plan_id}")
         summary_lines.append(f"    爬取计划ID: {crawler_plan_id}")
         produce_plan_infos: List[Dict[str, str]] = []
         produce_plan_infos: List[Dict[str, str]] = []
+        db_updated_rows = 0
+        # 环境里的生成计划 ID(字符串);与是否执行绑定接口无关,用于写库
+        env_produce_plan_id = (produce_plan_ids[0] if produce_plan_ids else "").strip()
+
         if produce_plan_ids:
         if produce_plan_ids:
             input_source_info = {
             input_source_info = {
                 "contentType": 1,
                 "contentType": 1,
@@ -260,6 +326,18 @@ async def create_crawler_plan_by_douyin_content_id(
                     summary_lines.append(f"            绑定结果: {'绑定成功' if not produce_plan_info.get('msg') else '绑定失败'}")
                     summary_lines.append(f"            绑定结果: {'绑定成功' if not produce_plan_info.get('msg') else '绑定失败'}")
                     summary_lines.append(f"            信息: {produce_plan_info.get('msg', '成功')}")
                     summary_lines.append(f"            信息: {produce_plan_info.get('msg', '成功')}")
 
 
+        # 爬取计划 id 与生成计划 id 任一存在则写库(不依赖是否已配置 produce_plan_ids 去走绑定)
+        if (crawler_plan_id or "").strip() or env_produce_plan_id:
+            try:
+                db_updated_rows = update_content_plan_ids(
+                    trace_id=trace_id,
+                    aweme_ids=content_ids,
+                    crawler_plan_id=crawler_plan_id or "",
+                    produce_plan_id=env_produce_plan_id,
+                )
+            except Exception as e:
+                logger.error(f"update content plan ids failed: {e}", exc_info=True)
+
         return ToolResult(
         return ToolResult(
             title="根据抖音内容ID创建爬取计划",
             title="根据抖音内容ID创建爬取计划",
             output="\n".join(summary_lines),
             output="\n".join(summary_lines),
@@ -278,7 +356,8 @@ async def create_crawler_plan_by_douyin_content_id(
                         }
                         }
                         for produce_plan_info in produce_plan_infos
                         for produce_plan_info in produce_plan_infos
                     ]
                     ]
-                }
+                },
+                "db": {"updated_rows": db_updated_rows},
             },
             },
             long_term_memory="Create crawler plan by DouYin Content IDs",
             long_term_memory="Create crawler plan by DouYin Content IDs",
         )
         )

+ 4 - 4
examples/content_finder/tools/store_results_mysql.py

@@ -21,12 +21,12 @@ logger = logging.getLogger(__name__)
 
 
 
 
 def _load_output(trace_id: str) -> Dict[str, Any]:
 def _load_output(trace_id: str) -> Dict[str, Any]:
-    """从 {TRACE_DIR}/{trace_id}/output.json 读取输出数据。"""
-    trace_root = Path(os.getenv("TRACE_DIR", ".cache/traces"))
-    path = trace_root / trace_id / "output.json"
+    """从 {output_dir}/{trace_id}/output.json 读取输出数据。"""
+    output_dir = Path(os.getenv("OUTPUT_DIR", ".cache/output"))
+    path = output_dir / trace_id / "output.json"
 
 
     if not path.exists():
     if not path.exists():
-        raise FileNotFoundError(f"output.json not found for trace_id={trace_id}: {path}")
+        raise FileNotFoundError(f"output.json not found for output_dir={output_dir}: {path}")
 
 
     with path.open("r", encoding="utf-8") as f:
     with path.open("r", encoding="utf-8") as f:
         return json.load(f)
         return json.load(f)