Explorar el Código

feat: 输出结果存储

jihuaqiang hace 7 horas
padre
commit
201e27599e

+ 11 - 2
agent/tools/builtin/context.py

@@ -51,8 +51,17 @@ async def get_current_context(
         else:
             context_content = "暂无计划信息"
 
-    if not context_content:
-        context_content = "当前无需要刷新的上下文信息"
+    # 注入 trace_id 和 trace_dir,供需要写入 trace 目录的工具(如输出 JSON)使用
+    trace_dir = ""
+    if runner.trace_store and hasattr(runner.trace_store, "base_path"):
+        trace_dir = str(runner.trace_store.base_path)
+    extra = [
+        f"## 当前执行信息",
+        f"- **trace_id**: `{trace_id or '(未知)'}`",
+        f"- **trace_dir**: `{trace_dir or '(未知)'}`",
+        f"- **输出路径示例**: `{trace_dir}/{trace_id}/output.json`(若需写入当次 trace 目录)",
+    ]
+    context_content = (context_content or "") + "\n\n" + "\n".join(extra)
 
     return ToolResult(
         title="📋 当前执行上下文",

+ 9 - 23
examples/content_finder/content_finder.prompt

@@ -8,8 +8,9 @@ $system$
 
 **重要约束**:
 - 只在抖音平台搜索,不要切换到其他平台(小红书、B站等)
-- 只使用 douyin_search、douyin_user_videos、get_content_fans_portrait、get_account_fans_portrait 这4个工具
-- 不要使用浏览器工具、文件操作工具、或其他平台的搜索工具
+- 只使用 douyin_search、douyin_user_videos、get_content_fans_portrait、get_account_fans_portrait、store_results_mysql 这4个工具
+- 不要使用浏览器工具、或其他平台的搜索工具
+- 最终结果需要按要求存储为本地json文件,也需要借助tools存储至远程库
 
 **严格禁止调用以下浏览器工具**(调用任何一个都是错误行为):
 browser_get_live_url、browser_navigate_to_url、browser_search_web、browser_go_back、browser_wait、browser_click_element、browser_input_text、browser_send_keys、browser_upload_file、browser_scroll_page、browser_find_text、browser_screenshot、browser_switch_tab、browser_close_tab、browser_get_dropdown_options、browser_select_dropdown_option、browser_extract_content、browser_read_long_content、browser_download_direct_url、browser_get_page_html、browser_get_visual_selector_map、browser_evaluate、browser_ensure_login_with_cookies、browser_done、browser_export_cookies、browser_load_cookies
@@ -116,32 +117,15 @@ sec_uid = item["author"]["sec_uid"]  # 完整复制,约80字符
 **违反后果**:编造数据会导致404错误,严重影响用户体验。
 
 ## 输出格式要求
-
-每条推荐内容必须包含:
-- 内容标题(来自 metadata)
-- 内容链接:https://www.douyin.com/video/{aweme_id}
-- 作者名称和链接:https://www.douyin.com/user/{author.sec_uid}(完整复制)
-- 热度数据:点赞、评论、分享(来自 metadata.statistics)
-- 画像数据(如果 has_portrait=True):
-  - 50岁以上占比和 tgi 值
-  - 画像链接:
-    - 内容点赞画像:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}
-    - 账号粉丝画像:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author.sec_uid}
-  - 数据来源标注:”内容点赞画像”或”账号粉丝画像”
-- 如果没有画像数据,明确标注:”无画像数据”
-
-推荐结果开头必须说明:
-1. 搜索情况:搜索到多少条候选内容
-2. 画像获取情况:成功获取内容点赞画像的数量、账号粉丝画像的数量、无画像数据的数量
-3. 筛选情况:多少条符合老年人画像要求,最终推荐多少条
-4. 优质账号扩展情况:是否发现优质账号及扩展结果
+最终输出必须是 **单个 JSON 对象**(不要夹杂额外文本),并严格遵循 `output_schema`(skills)中定义的结构与字段名(顶层 `trace_id/query/summary/good_account_expansion/contents`)。
 
 ## 任务完成要求
-
 - 搜索 M × 2 条内容后,立即停止搜索
 - 对所有搜索到的内容获取画像后,立即进入筛选阶段
 - 筛选完成后,立即输出完整的推荐结果
-- 推荐内容以json格式输出到 %output% 目录下
+- 最终输出必须严格遵循 `output_schema`(skills)
+- 输出 JSON 已写入到 %trace_dir% 目录下当次执行的trace_id目录内。
+- 输出已经存储到mysql库中。
 - 输出完整的推荐结果后,任务会自动进行反思和知识保存
 - 反思完成后,输出简短的完成确认:✅ 任务完成!已为您找到 [数量] 条视频,并保存了执行经验
 
@@ -152,6 +136,8 @@ sec_uid = item["author"]["sec_uid"]  # 完整复制,约80字符
 - 获取足够画像后,立即进入筛选和输出阶段
 - 必须输出最终推荐结果,不能在中途停止
 - 所有数据必须来自 metadata,禁止编造
+- 最终输出必须严格遵循 `output_schema`(skills),禁止自创/变体字段名或使用中文 key
+- 输出文件的保存地址严格按照要求,在 %trace_dir% 目录下当次执行的trace_id目录内,不能随意放置。
 
 $user$
 %query%

+ 5 - 4
examples/content_finder/core.py

@@ -38,7 +38,7 @@ from tools import (
 logger = logging.getLogger(__name__)
 
 # 默认 query
-DEFAULT_QUERY = """找10个和"养老服务与政策扶持"相关的,老年人感兴趣的视频。
+DEFAULT_QUERY = """找10个和"毛主席"相关的,老年人感兴趣的视频。
 
 要求:
 - 适合老年人分享观看
@@ -67,10 +67,10 @@ async def run_agent(query: Optional[str] = None, stream_output: bool = True) ->
     prompt = SimplePrompt(prompt_path)
 
     # output 目录
-    output_dir = os.getenv("OUTPUT_DIR", ".output")
+    trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
 
-    # 构建消息(替换 %query% 和 $output$
-    messages = prompt.build_messages(query=query, output=output_dir)
+    # 构建消息(替换 %query% 和 %trace_dir%
+    messages = prompt.build_messages(query=query, trace_dir=trace_dir)
 
 
     # 初始化配置
@@ -132,6 +132,7 @@ async def run_agent(query: Optional[str] = None, stream_output: bool = True) ->
 
                 if item.status == "completed":
                     logger.info(f"Agent 执行完成: trace_id={trace_id}")
+                    logger.info(f"结果------: {item}")
                     return {
                         "trace_id": trace_id,
                         "status": "completed"

+ 60 - 0
examples/content_finder/skills/output_schema.md

@@ -0,0 +1,60 @@
+# 输出目录
+输出 JSON 写入到当次执行的 trace_id 目录内的 `output.json` 文件。
+**获取路径方式**:先调用 `get_current_context` 获取 `trace_id` 和 `trace_dir`,再使用 `write_file` 写入 `{trace_dir}/{trace_id}/output.json`。
+
+# 输出 JSON Schema(content_finder)
+最终必须输出 **单个 JSON 对象**(不要夹杂额外文本),字段名必须严格一致,禁止中文 key / 禁止自创变体 key。
+
+```json
+{
+  "trace_id": "<由系统生成的真实 trace_id;如果你不知道就填空字符串,程序会覆盖修正>",
+  "query": "<本次任务的 query>",
+  "summary": {
+    "candidate_count": 0,
+    "portrait_content_like_count": 0,
+    "portrait_account_fans_count": 0,
+    "portrait_none_count": 0,
+    "filtered_in_count": 0
+  },
+  "good_account_expansion": {
+    "enabled": false,
+    "accounts": [
+      {
+        "author_nickname": "<作者名>",
+        "author_sec_uid": "<完整 sec_uid>",
+        "age_50_plus_ratio": null,
+        "age_50_plus_tgi": null
+      }
+    ]
+  },
+  "contents": [
+    {
+      "title": "<来自 metadata 的标题/desc>",
+      "aweme_id": "<来自 metadata>",
+      "video_url": "https://www.douyin.com/video/<aweme_id>",
+      "author_nickname": "<来自 metadata>",
+      "author_sec_uid": "<来自 metadata,必须完整复制>",
+      "author_url": "https://www.douyin.com/user/<author_sec_uid>",
+      "hotness": {
+        "digg_count": 0,
+        "comment_count": 0,
+        "share_count": 0
+      },
+      "portrait": {
+        "source": "content_like | account_fans | none",
+        "age_50_plus_ratio": null,
+        "age_50_plus_tgi": null,
+        "url": null
+      },
+      "reasons": ["<入选理由1>", "<入选理由2>"],
+      "tags": ["<可选标签>"]
+    }
+  ]
+}
+```
+
+画像链接规则:
+- `portrait.source="content_like"` → `portrait.url = https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}`
+- `portrait.source="account_fans"` → `portrait.url = https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author_sec_uid}`
+- `portrait.source="none"` → `portrait.url=null`,并且画像字段都为 null
+

+ 2 - 0
examples/content_finder/tools/__init__.py

@@ -5,10 +5,12 @@
 from .douyin_search import douyin_search
 from .douyin_user_videos import douyin_user_videos
 from .hotspot_profile import get_content_fans_portrait, get_account_fans_portrait
+from .store_results_mysql import store_results_mysql
 
 __all__ = [
     "douyin_search",
     "douyin_user_videos",
     "get_content_fans_portrait",
     "get_account_fans_portrait",
+    "store_results_mysql",
 ]

+ 237 - 0
examples/content_finder/tools/store_results_mysql.py

@@ -0,0 +1,237 @@
+"""
+将推荐结果写入 MySQL(优质作者表 + 内容表)。
+
+约定:
+- 输入参数:trace_id(字符串)
+- 数据来源:.cache/traces/{trace_id}/recommendations.json
+- 表结构:good_authors, contents(字段见下面 SQL 注释)
+"""
+
+import json
+import logging
+import os
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import pymysql
+
+from agent.tools import tool, ToolResult
+
+logger = logging.getLogger(__name__)
+
+
+def _get_connection():
+    host = os.getenv("DB_HOST", "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com")
+    port = int(os.getenv("DB_PORT", "3306"))
+    user = os.getenv("DB_USER", "content_rw")
+    password = os.getenv("DB_PASSWORD", "bC1aH4bA1lB0")
+    database = os.getenv("DB_NAME", "content-deconstruction-supply")
+
+    return pymysql.connect(
+        host=host,
+        port=port,
+        user=user,
+        password=password,
+        database=database,
+        charset="utf8mb4",
+        cursorclass=pymysql.cursors.DictCursor,
+        autocommit=True,
+    )
+
+
+def _load_recommendations(trace_id: str) -> Dict[str, Any]:
+    """
+    按约定路径读取推荐结果:
+    - 优先:{TRACE_DIR}/{trace_id}/output.json  (与 output_schema.md 保持一致)
+    - 兼容:{TRACE_DIR}/{trace_id}/recommendations.json
+    """
+    trace_root = Path(os.getenv("TRACE_DIR", ".cache/traces"))
+    base = trace_root / trace_id
+
+    candidates = [
+        base / "output.json",
+        base / "recommendations.json",
+    ]
+
+    for path in candidates:
+        if path.exists():
+            with path.open("r", encoding="utf-8") as f:
+                return json.load(f)
+
+    raise FileNotFoundError(
+        f"no recommendations JSON found for trace_id={trace_id}, tried: "
+        + ", ".join(str(p) for p in candidates)
+    )
+
+
+def _upsert_good_authors(
+    conn,
+    trace_id: str,
+    good_account_block: Optional[Dict[str, Any]],
+) -> int:
+    """
+    将 good_account_expansion 中的 accounts 写入 good_authors 表。
+
+    约定表结构示例:
+    CREATE TABLE demand_find_author (
+      id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+      trace_id VARCHAR(64) NOT NULL,
+      author_name VARCHAR(255) NOT NULL,
+      author_link VARCHAR(512) NOT NULL,
+      reason TEXT,
+      expanded_count INT DEFAULT 0,
+      PRIMARY KEY (id),
+      KEY idx_demand_find_author_trace (trace_id),
+      UNIQUE KEY uk_demand_find_author_trace_author (trace_id, author_link)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+    """
+    if not good_account_block:
+        return 0
+
+    if not good_account_block.get("found"):
+        return 0
+
+    accounts: List[Dict[str, Any]] = good_account_block.get("accounts") or []
+    if not accounts:
+        return 0
+
+    sql = """
+    INSERT INTO demand_find_author (trace_id, author_name, author_link, reason, expanded_count)
+    VALUES (%s, %s, %s, %s, %s)
+    ON DUPLICATE KEY UPDATE
+      reason = VALUES(reason),
+      expanded_count = VALUES(expanded_count)
+    """
+    with conn.cursor() as cur:
+        rows = 0
+        for acc in accounts:
+            author_name = acc.get("account_name") or acc.get("author_name") or ""
+            author_link = acc.get("author_link") or ""
+            if not author_name or not author_link:
+                # 如果只给了 sec_uid,可以由上层补 author_link
+                sec_uid = acc.get("sec_uid")
+                if sec_uid and not author_link:
+                    author_link = f"https://www.douyin.com/user/{sec_uid}"
+            if not author_name or not author_link:
+                continue
+
+            reason = acc.get("reason") or ""
+            expanded_count = int(acc.get("expanded_count") or 0)
+            cur.execute(sql, (trace_id, author_name, author_link, reason, expanded_count))
+            rows += cur.rowcount
+        return rows
+
+
+def _insert_contents(
+    conn,
+    trace_id: str,
+    contents: List[Dict[str, Any]],
+) -> int:
+    """
+    将 contents 列表写入 contents 表。
+
+    约定表结构示例:
+    CREATE TABLE demand_find_content_result (
+      id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+      trace_id VARCHAR(64) NOT NULL,
+      rank INT NOT NULL,
+      content_link VARCHAR(512) NOT NULL,
+      title TEXT NOT NULL,
+      author_name VARCHAR(255) NOT NULL,
+      author_link VARCHAR(512) NOT NULL,
+      digg_count BIGINT DEFAULT 0,
+      comment_count BIGINT DEFAULT 0,
+      share_count BIGINT DEFAULT 0,
+      portrait_source VARCHAR(255),
+      elderly_ratio VARCHAR(255),
+      elderly_tgi VARCHAR(255),
+      recommendation_reason TEXT,
+      PRIMARY KEY (id),
+      KEY idx_demand_find_content_trace (trace_id),
+      KEY idx_demand_find_content_author (author_link)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+    """
+    if not contents:
+        return 0
+
+    sql = """
+    INSERT INTO demand_find_content_result (
+      trace_id, rank, content_link, title, author_name, author_link,
+      digg_count, comment_count, share_count,
+      portrait_source, elderly_ratio, elderly_tgi, recommendation_reason
+    ) VALUES (
+      %s, %s, %s, %s, %s, %s,
+      %s, %s, %s,
+      %s, %s, %s, %s
+    )
+    """
+    with conn.cursor() as cur:
+        rows = 0
+        for item in contents:
+            cur.execute(
+                sql,
+                (
+                    trace_id,
+                    int(item.get("rank") or 0),
+                    item.get("content_link") or "",
+                    item.get("title") or "",
+                    item.get("author_name") or "",
+                    item.get("author_link") or "",
+                    int(item.get("heat_metrics", {}).get("digg_count") or 0),
+                    int(item.get("heat_metrics", {}).get("comment_count") or 0),
+                    int(item.get("heat_metrics", {}).get("share_count") or 0),
+                    item.get("portrait_source") or "",
+                    str(item.get("elderly_ratio") or ""),
+                    str(item.get("elderly_tgi") or ""),
+                    item.get("recommendation_reason") or "",
+                ),
+            )
+            rows += cur.rowcount
+        return rows
+
+
+@tool(description="将推荐结果写入 MySQL(good_authors + contents)")
+async def store_results_mysql(trace_id: str) -> ToolResult:
+    """
+    根据 trace_id 读取对应的 recommendations.json,并写入 MySQL 的两个表:
+    - demand_find_author:优质账号信息
+    - demand_find_content_result:推荐内容列表
+    """
+    try:
+        data = _load_recommendations(trace_id)
+    except Exception as e:
+        msg = f"加载 recommendations.json 失败: {e}"
+        logger.error(msg)
+        return ToolResult(output=msg, metadata={"ok": False, "error": str(e)})
+
+    conn = None
+    try:
+        conn = _get_connection()
+        good_block = data.get("good_account_expansion") or data.get("good_accounts")
+        contents = data.get("contents") or []
+
+        authors_rows = _upsert_good_authors(conn, trace_id, good_block)
+        contents_rows = _insert_contents(conn, trace_id, contents)
+
+        output = (
+            f"MySQL 写入完成:demand_find_author 影响行数={authors_rows}, "
+            f"demand_find_content_result 插入条数={contents_rows}"
+        )
+        logger.info(output)
+        return ToolResult(
+            output=output,
+            metadata={
+                "ok": True,
+                "trace_id": trace_id,
+                "good_authors_affected": authors_rows,
+                "contents_inserted": contents_rows,
+            },
+        )
+    except Exception as e:
+        msg = f"写入 MySQL 失败: {e}"
+        logger.error(msg, exc_info=True)
+        return ToolResult(output=msg, metadata={"ok": False, "error": str(e)})
+    finally:
+        if conn is not None:
+            conn.close()
+