"""
AIGC API client.

Calls the AIGC API to create crawler plans and to bind them to produce plans.
"""
import json
import logging
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any

import requests

from agent import ToolResult, tool

logger = logging.getLogger(__name__)

AIGC_BASE_URL = "https://aigc-api.aiddit.com"
CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"
GET_PRODUCE_PLAN_DETAIL_BY_ID = f"{AIGC_BASE_URL}/aigc/produce/plan/detail"
PRODUCE_PLAN_SAVE = f"{AIGC_BASE_URL}/aigc/produce/plan/save"
# NOTE(review): this credential is committed in source; consider loading it
# from configuration or a secret store instead.
DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
DEFAULT_TIMEOUT = 60.0

# Fields that must agree between the new input source and the first input
# source of a group for the group to be chosen as the insertion target.
_INPUT_SOURCE_MATCH_KEYS = ("inputSourceModal", "inputSourceChannel", "contentType")


def _plan_timestamp() -> str:
    """Return the current local time as a compact ``YYYYMMDDHHMMSS`` string."""
    # Uppercase %H / %S are the hour and second directives; the lowercase
    # %h / %s used previously are not.
    return datetime.now().strftime("%Y%m%d%H%M%S")


def _to_log_text(value: Any) -> str:
    """Serialize ``value`` for a log line without ever raising."""
    try:
        return json.dumps(value, ensure_ascii=False, default=str)
    except (TypeError, ValueError):
        return repr(value)


def _extract_crawler_plan_id(response_json: Dict[str, Any]) -> Any:
    """Return ``data.id`` from a create-plan response, or ``""`` when absent."""
    data = response_json.get("data")
    if not isinstance(data, dict):
        return ""
    return data.get("id", "")


def _bind_and_describe(
        crawler_plan_id: Any,
        crawler_plan_name: str,
        produce_plan_ids: Any,
        summary_lines: List[str],
) -> List[Dict[str, Any]]:
    """
    Bind a freshly created crawler plan to the requested produce plans.

    Appends one human-readable section per produce plan to ``summary_lines``
    and returns the structured entries placed in
    ``metadata.result.produce_plan_infos``.

    Args:
        crawler_plan_id: ID returned by the create-plan call.
        crawler_plan_name: Name of the crawler plan (used in the source label).
        produce_plan_ids: Produce plan IDs to bind; falsy means nothing to bind.
        summary_lines: Summary text lines, extended in place.

    Returns:
        One dict per produce plan with the keys ``produce_plan_id``,
        ``produce_plan_name``, ``is_success`` (bool) and ``msg``.
    """
    if not produce_plan_ids:
        return []

    input_source_info = {
        "contentType": 1,
        "inputSourceType": 2,
        "inputSourceValue": crawler_plan_id,
        "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
        "inputSourceModal": 4,
        "inputSourceChannel": 2,
    }
    raw_infos, bind_error = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
    if raw_infos is None:
        # Binding was rejected as a whole. The crawler plan itself already
        # exists, so report a failure per requested plan instead of raising.
        requested = produce_plan_ids if isinstance(produce_plan_ids, list) else [produce_plan_ids]
        raw_infos = [
            {"produce_plan_id": plan_id, "msg": bind_error or "爬取计划绑定生成计划异常"}
            for plan_id in requested
        ]

    entries: List[Dict[str, Any]] = []
    summary_lines.append(" 绑定的生成计划列表: ")
    for raw_info in raw_infos:
        failure = raw_info.get("msg")
        entry = {
            "produce_plan_id": raw_info.get("produce_plan_id", ""),
            "produce_plan_name": raw_info.get("produce_plan_name", ""),
            "is_success": not failure,
            "msg": failure or "成功",
        }
        entries.append(entry)
        bind_status = "绑定成功" if entry["is_success"] else "绑定失败"
        summary_lines.append(f" 生成计划名称: {entry['produce_plan_name']}")
        summary_lines.append(f" 生成计划ID: {entry['produce_plan_id']}")
        summary_lines.append(f" 绑定结果: {bind_status}")
        summary_lines.append(f" 信息: {entry['msg']}")
    return entries


@tool(description="根据抖音账号ID创建爬取计划")
async def create_crawler_plan_by_douyin_account_id(
        account_id: str,
        sort_type: str = "最新",
        produce_plan_ids: List[str] = []
) -> ToolResult:
    """
    Create a crawler plan from a Douyin account ID.

    Args:
        account_id: Douyin account ID; must start with ``MS4wLjABAAAA``.
        sort_type: Video sort order used by the crawl (最新/最热), default 最新.
        produce_plan_ids: IDs of the produce plans to bind the crawler plan to,
            default empty list.

    Returns:
        ToolResult containing:
            - output: text summary of the created crawler plan
            - metadata.result: structured result
                - crawler_info: crawler plan information
                    - crawler_plan_id: ID of the created crawler plan
                    - crawler_plan_name: name of the created crawler plan
                    - sort_type: sort order
                - produce_plan_infos: one entry per bound produce plan
                    - produce_plan_id: produce plan ID
                    - produce_plan_name: produce plan name
                    - is_success: True when binding succeeded, False otherwise
                    - msg: error message on failure, "成功" on success

    Note:
        - Prefer the structured data in metadata.result over parsing output.
        - The HTTP calls are synchronous and run inside this coroutine.
    """
    # The shared default list is only read, never mutated, so it is kept as-is
    # to leave the tool signature unchanged.
    failure_title = "根据抖音账号ID创建爬取计划失败"

    # Validate the account_id format.
    if not account_id or not isinstance(account_id, str):
        logger.error("create_crawler_plan_by_douyin_account_id invalid account_id: %s", account_id)
        return ToolResult(
            title=failure_title,
            output="",
            error="account_id 参数无效:必须是非空字符串",
        )
    if not account_id.startswith("MS4wLjABAAAA"):
        logger.error("create_crawler_plan_by_douyin_account_id invalid sec_uid format account_id:%s", account_id)
        return ToolResult(
            title=failure_title,
            output="",
            error=f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,当前值: {account_id[:20]}...",
        )

    dt = _plan_timestamp()
    crawler_plan_name = f"【内容寻找Agent自动创建】{dt}_抖音账号ID爬取计划_{account_id[:30]}"
    params = {
        "accountFilters": [],
        "channel": 2,
        "contentFilters": [],
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 4,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 1,
        "inputModeValues": [
            account_id
        ],
        "modelValueConfig": {
            "sortType": sort_type
        },
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "selectModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1
    }
    try:
        summary_lines = [f"抖音账号【{account_id}】创建爬取计划"]
        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title=failure_title,
                output=response_json.get("msg") or "接口异常",
                error="create crawler plan interface error",
            )

        crawler_plan_id = _extract_crawler_plan_id(response_json)
        if crawler_plan_id in (None, ""):
            # Without an ID there is nothing valid to bind or to report.
            return ToolResult(
                title=failure_title,
                output="接口未返回爬取计划ID",
                error="create crawler plan interface error",
            )

        summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
        summary_lines.append(f" 抖音账号ID: {account_id}")
        summary_lines.append(f" 爬取计划ID: {crawler_plan_id}")
        summary_lines.append(f" 爬取计划排序方式: {sort_type}")

        produce_plan_infos = _bind_and_describe(
            crawler_plan_id, crawler_plan_name, produce_plan_ids, summary_lines
        )
        return ToolResult(
            title="根据抖音账号ID创建爬取计划",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                        "sort_type": sort_type,
                    },
                    "produce_plan_infos": produce_plan_infos,
                }
            },
            long_term_memory="Create crawler plan by DouYin Account ID",
        )
    except Exception as e:
        logger.exception("create douyin account crawler plan error: %s, account_id: %s ", e, account_id)
        return ToolResult(
            title=failure_title,
            output="",
            error=f"创建爬取计划错误:{str(e)}",
        )


@tool(description="根据抖音视频ID创建爬取计划")
async def create_crawler_plan_by_douyin_content_id(
        content_ids: List[str],
        produce_plan_ids: List[str] = []
) -> ToolResult:
    """
    Create a crawler plan from a list of Douyin video IDs.

    Args:
        content_ids: Douyin content (video) IDs; a non-empty list of at most
            100 items.
        produce_plan_ids: IDs of the produce plans to bind the crawler plan to,
            default empty list.

    Returns:
        ToolResult containing:
            - output: text summary of the created crawler plan
            - metadata.result: structured result
                - crawler_info: crawler plan information
                    - crawler_plan_id: ID of the created crawler plan
                    - crawler_plan_name: name of the created crawler plan
                    - content_ids: the Douyin video IDs that were submitted
                - produce_plan_infos: one entry per bound produce plan
                    - produce_plan_id: produce plan ID
                    - produce_plan_name: produce plan name
                    - is_success: True when binding succeeded, False otherwise
                    - msg: error message on failure, "成功" on success

    Note:
        - Prefer the structured data in metadata.result over parsing output.
        - The HTTP calls are synchronous and run inside this coroutine.
    """
    # The shared default list is only read, never mutated, so it is kept as-is
    # to leave the tool signature unchanged.
    failure_title = "根据抖音内容ID创建爬取计划失败"

    if not content_ids or not isinstance(content_ids, list):
        logger.error("create_crawler_plan_by_douyin_content_id invalid content_ids. content_ids: %s", content_ids)
        return ToolResult(
            title=failure_title,
            output="",
            error="content_ids 参数无效: content_ids必须是列表"
        )
    if len(content_ids) > 100:
        logger.error(
            "create_crawler_plan_by_douyin_content_id invalid content_ids length. content_ids.length: %s",
            len(content_ids),
        )
        return ToolResult(
            title=failure_title,
            output="",
            error=f"content_ids 长度异常: 期望1~100, 实际{len(content_ids)}"
        )

    dt = _plan_timestamp()
    crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
    params = {
        "channel": 2,
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 5,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 2,
        "inputModeValues": content_ids,
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1
    }
    try:
        summary_lines = ["抖音视频爬取计划"]
        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title=failure_title,
                output=response_json.get("msg") or "接口异常",
                error="create crawler plan interface error",
            )

        crawler_plan_id = _extract_crawler_plan_id(response_json)
        if crawler_plan_id in (None, ""):
            # Without an ID there is nothing valid to bind or to report.
            return ToolResult(
                title=failure_title,
                output="接口未返回爬取计划ID",
                error="create crawler plan interface error",
            )

        summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
        # IDs are stringified so a non-str element cannot break the summary
        # after the plan has already been created.
        summary_lines.append(f" 抖音视频IDs: {','.join(map(str, content_ids))}")
        summary_lines.append(f" 爬取计划ID: {crawler_plan_id}")

        produce_plan_infos = _bind_and_describe(
            crawler_plan_id, crawler_plan_name, produce_plan_ids, summary_lines
        )
        return ToolResult(
            title="根据抖音内容ID创建爬取计划",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                        "content_ids": content_ids,
                    },
                    "produce_plan_infos": produce_plan_infos,
                }
            },
            long_term_memory="Create crawler plan by DouYin Content IDs",
        )
    except Exception as e:
        logger.exception("create douyin content crawler plan error. content_ids: %s, error: %s", content_ids, e)
        return ToolResult(
            title=failure_title,
            output="",
            error=f"创建爬取计划错误:{str(e)}",
        )


def _find_input_source_group_index(
        input_source_groups: List[Dict[str, Any]],
        input_source_info: Dict[str, Any],
) -> int:
    """
    Return the index of the input source group the new source belongs to.

    A group matches when its first input source agrees with
    ``input_source_info`` on every key in ``_INPUT_SOURCE_MATCH_KEYS``.
    Falls back to index 0 when no group matches.
    """
    for index, group in enumerate(input_source_groups):
        sources = group.get("inputSources") or []
        if not sources:
            continue
        first_source = sources[0]
        # Different fallbacks (0 vs -1) keep a key missing on both sides from
        # counting as a match.
        if all(input_source_info.get(key, 0) == first_source.get(key, -1)
               for key in _INPUT_SOURCE_MATCH_KEYS):
            return index
    return 0


def _bind_single_produce_plan(
        input_source_info: Dict[str, Any],
        produce_plan_id: str,
        produce_plan_info: Dict[str, Any],
) -> str:
    """
    Add ``input_source_info`` to one produce plan and save the plan.

    Fills ``produce_plan_info["produce_plan_name"]`` once the plan detail has
    been loaded. Returns an empty string on success, otherwise a non-empty
    failure message.
    """
    detail, detail_error = find_produce_plan_info_by_id(produce_plan_id)
    if detail_error or not detail:
        return detail_error or "获取生成计划详情异常"

    produce_plan_info["produce_plan_name"] = detail.get("name", "")
    input_source_groups = detail.get("inputSourceGroups")
    if not input_source_groups:
        return "生成计划没有输入源组"

    target_group = input_source_groups[
        _find_input_source_group_index(input_source_groups, input_source_info)
    ]
    sources = target_group.get("inputSources")
    if not isinstance(sources, list):
        # Attach the list to the group first; appending to a detached default
        # list would silently drop the new source from the saved payload.
        sources = []
        target_group["inputSources"] = sources
    sources.append(input_source_info)

    response_json = post(PRODUCE_PLAN_SAVE, detail)
    if response_json.get("code") != 0 or not response_json.get("data"):
        # `or` guarantees a non-empty message even when the API sends an empty
        # or null msg, so a failure is never mistaken for success.
        return response_json.get("msg") or "爬取计划绑定生成计划异常"
    return ""


def crawler_plan_bind_produce_plan(
        input_source_info: Dict[str, Any],
        produce_plan_ids: List[str],
) -> Tuple[Union[List[Dict[str, str]], None], str]:
    """
    Bind a crawler plan (described as an input source) to produce plans.

    Args:
        input_source_info: Input source payload appended to each produce plan.
        produce_plan_ids: IDs of the produce plans to update.

    Returns:
        ``(None, error)`` when the arguments are invalid; otherwise
        ``(infos, "")`` where ``infos`` holds one dict per requested plan with
        ``produce_plan_id``, ``produce_plan_name`` once the detail was loaded,
        and ``msg`` only when binding that plan failed.
    """
    if not input_source_info or not produce_plan_ids:
        return None, "input_source_info or produce_plan_ids is invalid"
    if not isinstance(produce_plan_ids, list):
        return None, "produce_plan_ids is not list"

    result: List[Dict[str, Any]] = []
    for produce_plan_id in produce_plan_ids:
        produce_plan_info: Dict[str, Any] = {"produce_plan_id": produce_plan_id}
        result.append(produce_plan_info)
        try:
            failure = _bind_single_produce_plan(input_source_info, produce_plan_id, produce_plan_info)
        except Exception as e:
            # Isolate failures per plan so one malformed plan does not discard
            # the outcome of plans that were already processed.
            logger.exception(
                "crawler_plan_bind_produce_plan error. input_source_info: %s, produce_plan_id: %s, error: %s",
                _to_log_text(input_source_info), produce_plan_id, e,
            )
            failure = str(e) or type(e).__name__
        if failure:
            produce_plan_info["msg"] = failure
    return result, ""


def find_produce_plan_info_by_id(
        produce_plan_id: str,
) -> Tuple[Optional[Dict[str, Any]], str]:
    """
    Fetch the detail of a produce plan.

    Args:
        produce_plan_id: ID of the produce plan; must be a non-empty string.

    Returns:
        ``(detail, "")`` on success, or ``(None, error)`` with a non-empty
        error message when the ID is invalid or the lookup failed.
    """
    if not produce_plan_id or not isinstance(produce_plan_id, str):
        return None, f"非法的produce_plan_id: {produce_plan_id}"

    response_json = post(GET_PRODUCE_PLAN_DETAIL_BY_ID, {"id": produce_plan_id})
    detail = response_json.get("data")
    if response_json.get("code") != 0 or not detail:
        return None, response_json.get("msg") or "获取生成计划详情异常"
    return detail, ""


def post(url: str, params: Any) -> Dict[str, Any]:
    """
    POST ``params`` to an AIGC endpoint, wrapped with the API token.

    Args:
        url: Endpoint URL.
        params: Value sent under the ``params`` key of the request body.

    Returns:
        The decoded JSON object, or ``{}`` when the request fails, the
        response is not valid JSON, or the JSON is not an object. This
        function does not raise.
    """
    request = {
        "baseInfo": {
            "token": DEFAULT_TOKEN,
        },
        "params": params
    }
    # Log only the params: the full request body carries the API token.
    params_text = _to_log_text(params)
    try:
        logger.info("invoke aigc platform. url: %s, params: %s", url, params_text)
        response = requests.post(
            url=url,
            json=request,
            headers={"Content-Type": "application/json"},
            timeout=DEFAULT_TIMEOUT
        )
        response.raise_for_status()
        response_json = response.json()
    except Exception as e:
        # Boundary handler: callers rely on an empty dict instead of an exception.
        logger.exception("invoke aigc platform error. url: %s, params: %s, error: %s", url, params_text, e)
        return {}

    if not isinstance(response_json, dict):
        logger.error(
            "invoke aigc platform error. url: %s, params: %s, unexpected response: %s",
            url, params_text, _to_log_text(response_json),
        )
        return {}

    logger.info(
        "invoke aigc platform. url: %s, params: %s, response: %s",
        url, params_text, _to_log_text(response_json),
    )
    return response_json