Jelajahi Sumber

feat: 生成计划扩充

jihuaqiang 1 hari lalu
induk
melakukan
5f721d932f

+ 0 - 1
examples/content_finder/.env.example

@@ -29,7 +29,6 @@ SCHEDULE_QUERY_API_TIMEOUT=10.0
 MAX_CONCURRENT_TASKS=3
 
 # AIGC 平台配置
-AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID=your-produce-plan-id
 
 # MySQL(store_results_mysql、定时任务、AIGC 写库等)
 DB_HOST=your-mysql-host

+ 0 - 1
examples/content_finder/README.md

@@ -91,7 +91,6 @@ python examples/content_finder/server.py
 | `SCHEDULE_QUERY_API` | 空 | 定时任务外部 API 地址(留空则不启动定时任务) |
 | `SCHEDULE_QUERY_API_KEY` | 空 | 定时任务外部 API 认证 Key |
 | `SCHEDULE_QUERY_API_TIMEOUT` | `10.0` | 定时任务外部 API 超时(秒) |
-| `AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID` | `` | 需要将内容绑定的AIGC平台生成计划id |
 | `DB_HOST` | 必填(写库/定时任务时) | MySQL 主机 |
 | `DB_PORT` | `3306` | MySQL 端口 |
 | `DB_USER` | 必填 | MySQL 用户名 |

+ 3 - 3
examples/content_finder/core.py

@@ -96,9 +96,9 @@ from tools import (
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "分享"
-DEFAULT_SUGGESTION = "用户希望分享贪污腐败相关的信息、案例、观点给他人"
-DEFAULT_DEMAND_ID = 1
+DEFAULT_QUERY = "财神,祝福语"
+DEFAULT_SUGGESTION = ""
+DEFAULT_DEMAND_ID = 19443
 
 
 def extract_assistant_text(message: Message) -> str:

+ 2 - 0
examples/content_finder/db/__init__.py

@@ -20,6 +20,7 @@ from .schedule import (
 )
 from .store_results import (
     fetch_demand_content_dt,
+    fetch_demand_content_merge_leve2,
     upsert_good_authors,
     insert_contents,
     update_content_plan_ids,
@@ -39,6 +40,7 @@ __all__ = [
     "update_task_status",
     "update_task_on_complete",
     "fetch_demand_content_dt",
+    "fetch_demand_content_merge_leve2",
     "upsert_good_authors",
     "insert_contents",
     "update_content_plan_ids",

+ 20 - 0
examples/content_finder/db/store_results.py

@@ -130,6 +130,26 @@ def fetch_demand_content_dt(conn, demand_content_id: int) -> Optional[Any]:
     return row.get("dt")
 
 
+def fetch_demand_content_merge_leve2(conn, demand_content_id: int) -> Optional[str]:
+    """
+    按 demand_content.id 查询 merge_leve2(品类/二级类目)。
+
+    Returns:
+        merge_leve2 字符串;未查到或为空则返回 None。
+    """
+    sql = "SELECT merge_leve2 FROM demand_content WHERE id = %s LIMIT 1"
+    with conn.cursor() as cur:
+        cur.execute(sql, (demand_content_id,))
+        row = cur.fetchone()
+    if not row:
+        return None
+    val = row.get("merge_leve2")
+    if val is None:
+        return None
+    s = str(val).strip()
+    return s or None
+
+
 def insert_contents(
     conn,
     trace_id: str,

+ 85 - 17
examples/content_finder/tools/aigc_platform_api.py

@@ -7,22 +7,42 @@ import logging
 import os
 from datetime import datetime
 from pathlib import Path
-from typing import List, Dict, Union, Tuple, Any
+from typing import List, Dict, Union, Tuple, Any, Optional
 
 import requests
 from zoneinfo import ZoneInfo
 
 from agent import ToolResult, tool
-from db import update_content_plan_ids
+from db import get_connection, fetch_demand_content_merge_leve2, update_content_plan_ids
 from utils.tool_logging import format_tool_result_for_log, log_tool_call
 
 logger = logging.getLogger(__name__)
 
+
+AIGC_PLAN_ID_MAP = {
+"健康知识": {"生成ID": "20260408092313211598604", "发布ID": "20260408115944193153417"},
+"历史名人": {"生成ID": "20260408083251311809309", "发布ID": "20260408115139124511126"},
+"知识科普": {"生成ID": "20260408083824905654920", "发布ID": "20260408115231567509261"},
+"搞笑段子": {"生成ID": "20260408091536533918237", "发布ID": "20260408115842127748387"},
+"社会风气": {"生成ID": "20260408084318884115213", "发布ID": "20260408115315950129776"},
+"人生忠告": {"生成ID": "20260408085205791658566", "发布ID": "20260408115405410408001"},
+"国际时政": {"生成ID": "20260408090208237400605", "发布ID": "20260408115616925523989"},
+"生活技巧科普": {"生成ID": "20260408083824905654920", "发布ID": "20260408115231567509261"},
+"贪污腐败": {"生成ID": "20260408090309503416878", "发布ID": "20260408115653908856043"},
+"民生政策": {"生成ID": "20260408090721867506475", "发布ID": "20260408115727030928177"},
+"对口型表演": {"生成ID": "20260408092122328523262", "发布ID": "20260408115914659162376"},
+"中国战争史": {"生成ID": "20260408090950446586451", "发布ID": "20260408115804931772327"},
+"人财诈骗": {"生成ID": "20260408093140652233649", "发布ID": "20260408120019784463902"},
+"当代正能量人物": {"生成ID": "20260408083148399635274", "发布ID": "20260408115046382803287"},
+"国家科技力量": {"生成ID": "20260408085807674913378", "发布ID": "20260408115542550181196"},
+"国家力量": {"生成ID": "20260408085807674913378", "发布ID": "20260408115542550181196"},
+"通用": {"生成ID": "20260408085649635441036", "发布ID": "20260408115439581604474"},
+}
+
+
 _LABEL_ACCOUNT = "工具调用:create_crawler_plan_by_douyin_account_id -> 按抖音账号创建爬取计划"
 _LABEL_CONTENT = "工具调用:create_crawler_plan_by_douyin_content_id -> 按抖音视频创建爬取计划"
 
-AIGC_DEMAND_DOUYIN_CONTENT_PUBLISH_PLAN_ID=20260320065232171836746
-
 SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
 
 
@@ -79,13 +99,31 @@ def _extract_content_ids(data: Dict[str, Any]) -> List[str]:
     return content_ids
 
 
-def _get_produce_plan_ids_from_env() -> List[str]:
-    """Read AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID from env."""
-    raw = os.getenv("AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID", "").strip()
-    if not raw:
-        return []
-    # 接口需要 List[str],因此把 env 字段(字符串)包装成 list。
-    return [raw]
+def _extract_content_demand_id(data: Dict[str, Any]) -> Optional[int]:
+    """
+    Extract content_demand_id (demand_content.id) from output json.
+
+    Compatible keys:
+    - content_demand_id
+    - demand_content_id
+    - demand_id (legacy)
+    """
+    if not isinstance(data, dict):
+        return None
+    raw = (
+        data.get("content_demand_id")
+        if data.get("content_demand_id") is not None
+        else data.get("demand_content_id")
+        if data.get("demand_content_id") is not None
+        else data.get("demand_id")
+    )
+    if raw is None:
+        return None
+    try:
+        v = int(raw)
+    except Exception:
+        return None
+    return v if v > 0 else None
 
 
 @tool(description="根据抖音账号ID创建爬取计划")
@@ -292,7 +330,7 @@ async def create_crawler_plan_by_douyin_content_id(
             _LABEL_CONTENT,
             call_params,
             ToolResult(
-                title="根据抖音内容创建爬取计划",
+                title="根据抖音内容创建爬取计划-本地环境跳过此步骤",
                 output="",
                 metadata={
                     "result": {
@@ -329,6 +367,7 @@ async def create_crawler_plan_by_douyin_content_id(
     try:
         data = _load_output_json(trace_id=trace_id, output_dir=output_dir)
         content_ids = _extract_content_ids(data)
+        content_demand_id = _extract_content_demand_id(data)
     except Exception as e:
         msg = f"加载/解析 output.json 失败: {e}"
         logger.error(msg, exc_info=True)
@@ -343,6 +382,8 @@ async def create_crawler_plan_by_douyin_content_id(
         )
 
     call_params["content_ids_count"] = len(content_ids)
+    if content_demand_id is not None:
+        call_params["content_demand_id"] = content_demand_id
     if not content_ids:
         return _log_aigc_return(
             _LABEL_CONTENT,
@@ -368,7 +409,28 @@ async def create_crawler_plan_by_douyin_content_id(
             ),
         )
 
-    produce_plan_ids = _get_produce_plan_ids_from_env()
+    merge_leve2 = ""
+    if content_demand_id is not None:
+        try:
+            conn = get_connection()
+            try:
+                merge_leve2 = fetch_demand_content_merge_leve2(conn, content_demand_id) or ""
+            finally:
+                conn.close()
+        except Exception as e:
+            logger.error(
+                "fetch demand_content.merge_leve2 failed. demand_content_id=%s err=%s",
+                content_demand_id,
+                str(e),
+                exc_info=True,
+            )
+            merge_leve2 = ""
+
+    plan_key = merge_leve2.strip() if merge_leve2.strip() in AIGC_PLAN_ID_MAP else "通用"
+    plan_ids = AIGC_PLAN_ID_MAP.get(plan_key) or AIGC_PLAN_ID_MAP.get("通用") or {}
+    produce_plan_id_selected = str(plan_ids.get("生成ID") or "").strip()
+    publish_plan_id_selected = str(plan_ids.get("发布ID") or "").strip()
+    produce_plan_ids = [produce_plan_id_selected] if produce_plan_id_selected else []
     dt = datetime.now(SHANGHAI_TZ).strftime("%Y%m%d%H%M%S")
     crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
     params = {
@@ -390,6 +452,13 @@ async def create_crawler_plan_by_douyin_content_id(
 
     try:
         summary_lines = [f"抖音视频爬取计划"]
+        if merge_leve2.strip():
+            summary_lines.append(f"需求品类(merge_leve2): {merge_leve2.strip()}")
+        summary_lines.append(f"计划匹配key: {plan_key}")
+        if produce_plan_id_selected:
+            summary_lines.append(f"生成计划ID(按品类匹配): {produce_plan_id_selected}")
+        if publish_plan_id_selected:
+            summary_lines.append(f"发布计划ID(按品类匹配): {publish_plan_id_selected}")
 
         response_json = post(CRAWLER_PLAN_CREATE_URL, params)
         if response_json.get("code") != 0:
@@ -409,7 +478,7 @@ async def create_crawler_plan_by_douyin_content_id(
         summary_lines.append(f"    爬取计划ID: {crawler_plan_id}")
         produce_plan_infos: List[Dict[str, str]] = []
         db_updated_rows = 0
-        # 环境里的生成计划 ID(字符串);与是否执行绑定接口无关,用于写库
+        # 选中的生成计划 ID(字符串);与是否执行绑定接口无关,用于写库
         env_produce_plan_id = (produce_plan_ids[0] if produce_plan_ids else "").strip()
 
         if produce_plan_ids:
@@ -430,16 +499,15 @@ async def create_crawler_plan_by_douyin_content_id(
                     summary_lines.append(f"            绑定结果: {'绑定成功' if not produce_plan_info.get('msg') else '绑定失败'}")
                     summary_lines.append(f"            信息: {produce_plan_info.get('msg', '成功')}")
 
-        publish_plan_id_str = str(AIGC_DEMAND_DOUYIN_CONTENT_PUBLISH_PLAN_ID).strip()
         # 爬取 / 生成 / 发布计划 id 任一存在则写库(不依赖是否已配置 produce_plan_ids 去走绑定)
-        if (crawler_plan_id or "").strip() or env_produce_plan_id or publish_plan_id_str:
+        if (crawler_plan_id or "").strip() or env_produce_plan_id or publish_plan_id_selected:
             try:
                 db_updated_rows = update_content_plan_ids(
                     trace_id=trace_id,
                     aweme_ids=content_ids,
                     crawler_plan_id=crawler_plan_id or "",
                     produce_plan_id=env_produce_plan_id,
-                    publish_plan_id=publish_plan_id_str,
+                    publish_plan_id=publish_plan_id_selected,
                 )
             except Exception as e:
                 logger.error(f"update content plan ids failed: {e}", exc_info=True)

+ 233 - 0
examples/content_finder/utils/aigc_plan.py

@@ -0,0 +1,233 @@
+"""
+AIGC plan helpers.
+
+This module contains small, side-effect-free helpers for building requests and
+validating inputs related to AIGC plans.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+from typing import Any, Dict, Optional, Tuple
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+
+_AIGC_BASE_URL = "https://aigc-api.aiddit.com"
+_GET_PRODUCE_PLAN_DETAIL_BY_ID_URL = f"{_AIGC_BASE_URL}/aigc/produce/plan/detail"
+_PRODUCE_PLAN_SAVE_URL = f"{_AIGC_BASE_URL}/aigc/produce/plan/save"
+_DEFAULT_TIMEOUT_SECONDS = 60.0
+# Keep consistent with `tools/aigc_platform_api.py` to reduce config friction.
+_DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
+
+
+def _get_aigc_token() -> str:
+    token = (os.getenv("AIGC_TOKEN") or "").strip()
+    return token or _DEFAULT_TOKEN
+
+
+def _post_aigc(url: str, params: Any, *, timeout_seconds: float) -> Dict[str, Any]:
+    payload = {"baseInfo": {"token": _get_aigc_token()}, "params": params}
+    try:
+        resp = requests.post(
+            url=url,
+            json=payload,
+            headers={"Content-Type": "application/json"},
+            timeout=timeout_seconds,
+        )
+        resp.raise_for_status()
+        data = resp.json()
+        if isinstance(data, dict):
+            return data
+        logger.error("AIGC response json is not dict. url=%s", url)
+        return {}
+    except Exception as e:
+        logger.error(
+            "Invoke AIGC platform failed. url=%s payload=%s err=%s",
+            url,
+            json.dumps(payload, ensure_ascii=False),
+            str(e),
+        )
+        return {}
+
+
+def build_produce_plan_detail_query(produce_plan_id: str) -> Dict[str, str]:
+    """
+    Build query params for "produce plan detail by id".
+
+    This follows the AIGC platform API contract used in
+    `tools/aigc_platform_api.py` (see `find_produce_plan_info_by_id`), where the
+    request params payload is `{"id": <produce_plan_id>}`.
+
+    Args:
+        produce_plan_id: AIGC produce plan id (non-empty string).
+
+    Returns:
+        A dict payload to be used as "params" in the API request.
+
+    Raises:
+        ValueError: If `produce_plan_id` is empty or not a string.
+    """
+    if not isinstance(produce_plan_id, str):
+        raise ValueError(f"produce_plan_id must be a string, got: {type(produce_plan_id)!r}")
+
+    plan_id = produce_plan_id.strip()
+    if not plan_id:
+        raise ValueError("produce_plan_id must be a non-empty string")
+
+    return {"id": plan_id}
+
+
+def query_produce_plan_detail_by_id(produce_plan_id: str) -> Tuple[Optional[Dict[str, Any]], str]:
+    """
+    Query produce plan detail by id from AIGC platform.
+
+    This performs the same API call as `tools/aigc_platform_api.py::find_produce_plan_info_by_id`,
+    but is implemented as a standalone utility for reuse.
+
+    Args:
+        produce_plan_id: AIGC produce plan id (non-empty string).
+
+    Returns:
+        (data, msg)
+        - data: response["data"] dict when success; otherwise None
+        - msg: empty string on success; otherwise an error message
+    """
+    try:
+        params = build_produce_plan_detail_query(produce_plan_id)
+    except ValueError as e:
+        return None, str(e)
+
+    response_json = _post_aigc(
+        _GET_PRODUCE_PLAN_DETAIL_BY_ID_URL,
+        params,
+        timeout_seconds=_DEFAULT_TIMEOUT_SECONDS,
+    )
+    if not response_json:
+        return None, "AIGC接口调用失败:空响应"
+
+    if response_json.get("code") != 0:
+        return None, str(response_json.get("msg") or "获取生成计划详情异常")
+
+    data = response_json.get("data") or {}
+    if not isinstance(data, dict) or not data:
+        return None, str(response_json.get("msg") or "获取生成计划详情异常")
+
+    return data, ""
+
+
+def shrink_video_group_input_sources(plan_detail: Dict[str, Any]) -> int:
+    """
+    Mutate plan detail in-place:
+    For each item in inputSourceGroups where groupName == "视频",
+    keep only the first element of inputSources.
+
+    Returns:
+        Number of groups modified.
+    """
+    groups = plan_detail.get("inputSourceGroups")
+    if not isinstance(groups, list) or not groups:
+        return 0
+
+    modified = 0
+    for group in groups:
+        if not isinstance(group, dict):
+            continue
+        if group.get("groupName") != "视频":
+            continue
+        sources = group.get("inputSources")
+        if not isinstance(sources, list):
+            group["inputSources"] = []
+            modified += 1
+            continue
+        if len(sources) <= 1:
+            continue
+        group["inputSources"] = [sources[0]]
+        modified += 1
+
+    return modified
+
+
+def save_produce_plan(plan_detail: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], str]:
+    """
+    Save (update) a produce plan detail back to AIGC platform.
+
+    This mirrors `tools/aigc_platform_api.py` behavior which calls
+    `/aigc/produce/plan/save` with the full plan detail object.
+
+    Args:
+        plan_detail: The (possibly modified) plan detail dict returned by detail API.
+
+    Returns:
+        (data, msg)
+        - data: response["data"] dict when success; otherwise None
+        - msg: empty string on success; otherwise an error message
+    """
+    if not isinstance(plan_detail, dict) or not plan_detail:
+        return None, "plan_detail 参数无效:必须是非空 dict"
+
+    response_json = _post_aigc(
+        _PRODUCE_PLAN_SAVE_URL,
+        plan_detail,
+        timeout_seconds=_DEFAULT_TIMEOUT_SECONDS,
+    )
+    if not response_json:
+        return None, "AIGC接口调用失败:空响应"
+
+    if response_json.get("code") != 0:
+        return None, str(response_json.get("msg") or "保存生成计划异常")
+
+    data = response_json.get("data") or {}
+    if not isinstance(data, dict) or not data:
+        return None, str(response_json.get("msg") or "保存生成计划异常")
+
+    return data, ""
+
+
+def main(argv: list[str]) -> int:
+    """
+    CLI for quick manual testing.
+
+    Usage:
+        python3 examples/content_finder/utils/aigc_plan.py <produce_plan_id>
+
+    Env:
+        - AIGC_TOKEN: optional; overrides default token
+        - PRODUCE_PLAN_ID: optional fallback when arg is not provided
+    """
+    produce_plan_id = (argv[1] if len(argv) > 1 else "").strip() or (os.getenv("PRODUCE_PLAN_ID") or "").strip()
+    if not produce_plan_id:
+        print(
+            "Missing produce_plan_id.\n"
+            "Usage: python3 examples/content_finder/utils/aigc_plan.py <produce_plan_id>\n"
+            "Or set env PRODUCE_PLAN_ID.",
+            file=sys.stderr,
+        )
+        return 2
+
+    data, msg = query_produce_plan_detail_by_id(produce_plan_id)
+    if msg:
+        print(f"Error: {msg}", file=sys.stderr)
+        return 1
+
+    modified_groups = shrink_video_group_input_sources(data)
+    if modified_groups:
+        saved, save_msg = save_produce_plan(data)
+        if save_msg:
+            print(f"Error: 保存生成计划失败: {save_msg}", file=sys.stderr)
+            return 1
+        print(f"Modified groups: {modified_groups}", file=sys.stderr)
+        print(json.dumps(saved, ensure_ascii=False, indent=2))
+        return 0
+
+    print(json.dumps(data, ensure_ascii=False, indent=2))
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main(sys.argv))