Просмотр исходного кода

feat:添加根据抖音账号ID爬取计划的逻辑

zhaohaipeng 1 день назад
Родитель
Commit
42350e06c3
1 изменённый файл: 265 добавлений и 47 удалений
  1. 265 47
      examples/content_finder/tools/aigc_platform_api.py

+ 265 - 47
examples/content_finder/tools/aigc_platform_api.py

@@ -2,33 +2,53 @@
 AIGC接口调用
 调用AIGC接口创建爬取计划,绑定生成计划
 """
+import json
 import logging
+from datetime import datetime
+from typing import List, Dict, Union, Tuple, Any
 
 import requests
 
-from agent import ToolResult
+from agent import ToolResult, tool
 
 logger = logging.getLogger(__name__)
 
 AIGC_BASE_URL = "https://aigc-api.aiddit.com"
 CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"
+GET_PRODUCE_PLAN_DETAIL_BY_ID = f"{AIGC_BASE_URL}/aigc/produce/plan/detail"
+PRODUCE_PLAN_SAVE = f"{AIGC_BASE_URL}/aigc/produce/plan/save"
 DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
 DEFAULT_TIMEOUT = 60.0
 
 
+@tool(description="根据抖音账号ID创建爬取计划")
 async def create_crawler_plan_by_douyin_account_id(
         account_id: str,
-        sort_type: str = "最新"
+        sort_type: str = "最新",
+        produce_plan_ids: List[str] = []
 ) -> ToolResult:
     """
      根据抖音账号ID创建爬取计划
      Args:
          account_id: 抖音账号ID
          sort_type: 搜索时的视频排序方式(最新/最热),默认最新
+         produce_plan_ids: 爬取计划要绑定的生成计划ID,默认为空列表
 
      Returns:
          ToolResult: 包含以下内容
-             - output: 创建的爬取计划ID
+             - output: 文本格式的爬取计划创建结果摘要
+             - metadata.result: 结构化的爬取计划创建结果
+                - crawler_info: 爬取计划信息
+                    - crawler_plan_id: 创建的爬取计划ID
+                    - crawler_plan_name: 创建的爬取计划名称
+                    - sort_type: 排序方式
+                - produce_plan_infos: 绑定的生成计划信息
+                    - produce_plan_id: 生成计划ID
+                    - produce_plan_name: 生成计划名称
+                    - is_success: 是否成功, true表示绑定成功,false表示绑定失败
+                    - msg: 绑定失败时为错误信息,绑定成功则为“成功”
+     Note:
+         - 建议从 metadata.result 获取结构化数据,而非解析 output 文本
     """
 
     # 验证 account_id 格式
@@ -48,53 +68,41 @@ async def create_crawler_plan_by_douyin_account_id(
             error=f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,当前值: {account_id[:min(20, len(account_id))]}...",
         )
 
-    if len(account_id) < 70 or len(account_id) > 90:
-        logger.error("create_crawler_plan_by_douyin_account_id invalid account_id length", extra={"account_id": account_id, "length": len(account_id)})
-        return ToolResult(
-            title="根据抖音账号ID创建爬取计划失败",
-            output="",
-            error=f"account_id 长度异常:期望 70-90 字符,实际 {len(account_id)} 字符。这可能是编造或截断的数据。",
-        )
+    if produce_plan_ids is None:
+        produce_plan_ids = []
 
+    dt = datetime.now().strftime("%Y%m%d%h%M%s")
+    crawler_plan_name = f"【内容寻找Agent自动创建】{dt}_抖音账号ID爬取计划_{account_id[:min(30, len(account_id))]}"
     params = {
-        "baseInfo": {
-            "token": DEFAULT_TOKEN,
-            "userName": ""
+        "accountFilters": [],
+        "channel": 2,
+        "contentFilters": [],
+        "contentModal": 4,
+        "crawlerComment": 0,
+        "crawlerMode": 4,
+        "filterAccountMatchMode": 2,
+        "filterContentMatchMode": 2,
+        "frequencyType": 1,
+        "inputModeValues": [
+            account_id
+        ],
+        "modelValueConfig": {
+            "sortType": sort_type
         },
-        "params": {
-            "accountFilters": [],
-            "channel": 2,
-            "contentFilters": [],
-            "contentModal": 4,
-            "crawlerComment": 0,
-            "crawlerMode": 4,
-            "filterAccountMatchMode": 2,
-            "filterContentMatchMode": 2,
-            "frequencyType": 1,
-            "inputModeValues": [
-                account_id
-            ],
-            "modelValueConfig": {
-                "sortType": sort_type
-            },
-            "name": f"【Agent自动创建】抖音账号ID爬取计划_{account_id[:min(30, len(account_id))]}",
-            "planType": 2,
-            "searchModeValues": [],
-            "selectModeValues": [],
-            "srtExtractFlag": 1,
-            "videoKeyFrameType": 1,
-            "voiceExtractFlag": 1
-        }
+        "name": crawler_plan_name,
+        "planType": 2,
+        "searchModeValues": [],
+        "selectModeValues": [],
+        "srtExtractFlag": 1,
+        "videoKeyFrameType": 1,
+        "voiceExtractFlag": 1
     }
+
     try:
-        response = requests.post(
-            CRAWLER_PLAN_CREATE_URL,
-            json=params,
-            headers={"Content-Type": "application/json"},
-            timeout=DEFAULT_TIMEOUT
-        )
-        response.raise_for_status()
-        response_json = response.json()
+
+        summary_lines = [f"抖音账号【{account_id}】创建爬取计划"]
+
+        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
         if response_json.get("code") != 0:
             return ToolResult(
                 title="根据抖音账号ID创建爬取计划失败",
@@ -103,15 +111,225 @@ async def create_crawler_plan_by_douyin_account_id(
             )
 
         crawler_plan_id = response_json.get("data", {}).get("id", "")
+        summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
+        summary_lines.append(f"    抖音账号ID: {account_id}")
+        summary_lines.append(f"    爬取计划ID: {crawler_plan_id}")
+        summary_lines.append(f"    爬取计划排序方式: {sort_type}")
+        produce_plan_infos: List[Dict[str, str]] = []
+        if produce_plan_ids:
+            input_source_info = {
+                "contentType": 1,
+                "inputSourceType": 2,
+                "inputSourceValue": crawler_plan_id,
+                "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
+                "inputSourceModal": 4,
+                "inputSourceChannel": 2
+            }
+            produce_plan_infos, msg = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
+            if produce_plan_infos:
+                for produce_plan_info in produce_plan_infos:
+                    summary_lines.append("    绑定的生成计划列表: ")
+                    summary_lines.append(f"        生成计划名称: {produce_plan_info.get('produce_plan_name', '')}")
+                    summary_lines.append(f"            生成计划ID: {produce_plan_info.get('produce_plan_id', '')}")
+                    summary_lines.append(f"            绑定结果: {"绑定成功" if not produce_plan_info.get("msg") else "绑定失败"}")
+                    summary_lines.append(f"            信息: {produce_plan_info.get('msg', '成功')}")
+
         return ToolResult(
             title="根据抖音账号ID创建爬取计划",
-            output=crawler_plan_id,
+            output="\n".join(summary_lines),
+            metadata={
+                "result": {
+                    "crawler_info": {
+                        "crawler_plan_id": crawler_plan_id,
+                        "crawler_plan_name": crawler_plan_name,
+                        "sort_type": sort_type,
+                    },
+                    "produce_plan_infos": [
+                        {
+                            "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
+                            "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
+                            "is_success": "绑定成功" if not produce_plan_info.get("msg") else "绑定失败",
+                            "msg": produce_plan_info.get("msg", "成功"),
+                        }
+                        for produce_plan_info in produce_plan_infos
+                    ]
+                }
+            },
             long_term_memory="Create crawler plan by DouYin Account ID",
         )
     except Exception as e:
-        logger.error(e, extra={"account_id": account_id})
+        logger.error(f"create douyin account crawler plan error: {str(e)}, account_id: {account_id} ")
         return ToolResult(
             title="根据抖音账号ID创建爬取计划失败",
             output="",
             error=f"创建爬取计划错误:{str(e)}",
         )
+
+
@tool(description="根据抖音视频ID创建爬取计划")
async def create_crawler_plan_by_douyin_content_id(
        content_ids: List[str],
        produce_plan_ids: List[str] = []
) -> ToolResult:
    """
    Create a crawler plan from DouYin content (video) IDs and optionally bind
    it to existing produce plans.

    Args:
        content_ids: DouYin content ID list, 1-100 items.
        produce_plan_ids: produce plan IDs to bind the new crawler plan to;
            empty/None means no binding.

    Returns:
        ToolResult:
            - output: the created crawler plan ID
            - metadata.result:
                - crawler_plan_id / crawler_plan_name: the created plan
                - produce_plan_infos: one entry per bound produce plan with
                  produce_plan_id / produce_plan_name / is_success / msg
    """
    if not content_ids or not isinstance(content_ids, list):
        logger.error("create_crawler_plan_by_douyin_content_id invalid content_ids", extra={"content_ids": content_ids})
        return ToolResult(
            title="根据抖音内容ID创建爬取计划失败",
            output="",
            error="content_ids 参数无效: content_ids必须是列表"
        )
    if len(content_ids) > 100:
        logger.error("create_crawler_plan_by_douyin_content_id invalid content_ids length", extra={"content_ids": content_ids})
        return ToolResult(
            title="根据抖音内容ID创建爬取计划失败",
            output="",
            error=f"content_ids 长度异常: 期望1~100, 实际{len(content_ids)}"
        )
    # The declared default is a shared list; normalize None and never mutate it.
    if produce_plan_ids is None:
        produce_plan_ids = []

    # BUG FIX: "%Y%m%d%h%M%s" mixed %h (abbreviated month name) and %s (epoch
    # seconds); the intended timestamp format is YYYYMMDDHHMMSS.
    dt = datetime.now().strftime("%Y%m%d%H%M%S")
    crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
    params = {
        "channel": 2,
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 5,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 2,
        "inputModeValues": content_ids,
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1
    }

    try:
        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title="根据抖音内容ID创建爬取计划失败",
                output=response_json.get("msg", "接口异常"),
                error="create crawler plan interface error",
            )

        crawler_plan_id = response_json.get("data", {}).get("id", "")

        # BUG FIX: produce_plan_ids was accepted and documented but never used;
        # bind the new crawler plan the same way the account-ID tool does.
        produce_plan_infos: List[Dict[str, str]] = []
        if produce_plan_ids:
            input_source_info = {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
                "inputSourceModal": 4,
                "inputSourceChannel": 2
            }
            bind_result, _ = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
            produce_plan_infos = bind_result or []

        return ToolResult(
            title="根据抖音内容ID创建爬取计划",
            output=crawler_plan_id,
            metadata={
                "result": {
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                    "produce_plan_infos": [
                        {
                            "produce_plan_id": info.get("produce_plan_id", ""),
                            "produce_plan_name": info.get("produce_plan_name", ""),
                            # A per-plan "msg" is only set on failure, so its
                            # absence means the bind succeeded.
                            "is_success": not info.get("msg"),
                            "msg": info.get("msg", "成功"),
                        }
                        for info in produce_plan_infos
                    ],
                }
            },
            long_term_memory="Create crawler plan by DouYin Content IDs",
        )
    except Exception as e:
        logger.error(f"create douyin content crawler plan error: {str(e)}, content_ids: {content_ids}")
        return ToolResult(
            title="根据抖音内容ID创建爬取计划失败",
            output="",
            error=f"创建爬取计划错误:{str(e)}",
        )
+
+
def crawler_plan_bind_produce_plan(
        input_source_info: Dict[str, Any],
        produce_plan_ids: List[str],
) -> Tuple[Union[List[Dict[str, str]], None], str]:
    """
    Bind a crawler plan (described by ``input_source_info``) to each produce plan.

    Args:
        input_source_info: input-source descriptor appended to the matching
            input-source group of each produce plan.
        produce_plan_ids: produce plan IDs to bind.

    Returns:
        (per-plan result list, "") on overall success; (None, error message)
        when the arguments are invalid or an unexpected exception occurs.
        Each per-plan dict carries "produce_plan_id", "produce_plan_name" when
        the detail lookup succeeded, and "msg" only when that plan's bind failed.
    """
    if not input_source_info or not produce_plan_ids:
        return None, "input_source_info or produce_plan_ids is invalid"
    # Keys that must all match for an existing input-source group to be reused.
    input_source_check_key = ["inputSourceModal", "inputSourceChannel", "contentType"]
    try:
        if not isinstance(produce_plan_ids, list):
            return None, "produce_plan_ids is not list"
        result: List[Dict[str, str]] = []
        for produce_plan_id in produce_plan_ids:
            produce_plan_info = {
                "produce_plan_id": produce_plan_id,
            }
            result.append(produce_plan_info)
            # Fetch the produce-plan detail; a non-empty msg means the lookup failed.
            produce_plan_detail_info, msg = find_produce_plan_info_by_id(produce_plan_id)
            if msg:
                produce_plan_info["msg"] = msg
                continue

            produce_plan_info["produce_plan_name"] = produce_plan_detail_info.get("name", "")

            input_source_groups = produce_plan_detail_info.get("inputSourceGroups", [])
            if not input_source_groups:
                produce_plan_info["msg"] = "生成计划没有输入源组"
                continue

            # Pick the group whose first input source matches on all check keys;
            # fall back to group 0 when nothing matches.
            input_source_index = 0
            for i, input_source_group in enumerate(input_source_groups):
                input_sources = input_source_group.get("inputSources", [])
                if not input_sources:
                    continue
                first_input_source = input_sources[0]
                # Mismatched fallbacks (0 vs -1) guarantee a missing key never matches.
                if all(input_source_info.get(k, 0) == first_input_source.get(k, -1) for k in input_source_check_key):
                    input_source_index = i
                    break

            # BUG FIX: .get("inputSources", []).append(...) appended to a
            # throwaway list when the key was missing, silently dropping the
            # bind; setdefault writes the list back into the group.
            input_source_groups[input_source_index].setdefault("inputSources", []).append(input_source_info)

            response_json = post(PRODUCE_PLAN_SAVE, produce_plan_detail_info)
            if response_json.get("code") != 0 or not response_json.get("data", {}):
                produce_plan_info["msg"] = response_json.get("msg", "爬取计划绑定生成计划异常")

        return result, ""
    except Exception as e:
        logger.error(f"crawler plan bind produce plan error: {str(e)}, input_source_info: {input_source_info}")
        return None, str(e)
+
+
def find_produce_plan_info_by_id(
        produce_plan_id: str,
) -> Tuple[Union[Dict[str, Any], None], str]:
    """
    Fetch produce-plan detail from the AIGC platform by plan ID.

    Args:
        produce_plan_id: produce plan ID; must be a non-empty string.

    Returns:
        (detail dict, "") on success; (None, error message) on invalid input,
        an error response, or an unexpected exception.
    """
    try:
        if not produce_plan_id or not isinstance(produce_plan_id, str):
            return None, f"非法的produce_plan_id: {produce_plan_id}"

        params = {
            "id": produce_plan_id,
        }
        response_json = post(GET_PRODUCE_PLAN_DETAIL_BY_ID, params)

        # Treat both a non-zero code and an empty/missing data payload as failure.
        if response_json.get("code") != 0 or not response_json.get("data", {}):
            return None, response_json.get("msg", "获取生成计划详情异常")

        return response_json.get("data", {}), ""
    except Exception as e:
        logger.error(f"find produce plan info error: {str(e)}, produce_plan_id: {produce_plan_id}")
        return None, str(e)
+
+
def post(url: str, params: Any) -> Dict[str, Any]:
    """
    POST ``params`` to an AIGC platform endpoint, wrapped in the standard
    request envelope (auth token + business params).

    Args:
        url: full endpoint URL.
        params: business payload placed under the "params" key.

    Returns:
        Parsed JSON response dict, or {} on any failure (network error,
        non-2xx status, invalid JSON). Callers detect failure through
        response.get("code") != 0, so {} is a safe best-effort sentinel.
    """
    request = {
        "baseInfo": {
            "token": DEFAULT_TOKEN,
        },
        "params": params
    }
    try:
        # Serialize once and reuse; the original re-ran json.dumps(request)
        # for every log line (up to three times per call).
        request_body = json.dumps(request, ensure_ascii=False)
        # Lazy %-style args skip formatting entirely when INFO is disabled.
        logger.info("invoke aigc platform. url: %s, request: %s", url, request_body)
        response = requests.post(
            url=url,
            json=request,
            headers={"Content-Type": "application/json"},
            timeout=DEFAULT_TIMEOUT
        )
        response.raise_for_status()
        response_json = response.json()
        logger.info("invoke aigc platform. url: %s, request: %s, response: %s",
                    url, request_body, json.dumps(response_json, ensure_ascii=False))
        return response_json
    except Exception:
        # logger.exception keeps the traceback that logger.error(f"...{e}") dropped.
        logger.exception("invoke aigc platform error. url: %s", url)
    return {}