| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- """
- AIGC接口调用
- 调用AIGC接口创建爬取计划,绑定生成计划
- """
- import json
- import logging
- from datetime import datetime
- from typing import List, Dict, Union, Tuple, Any
- import requests
- from agent import ToolResult, tool
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Root of the AIGC platform API; every endpoint below is this root plus a path.
AIGC_BASE_URL = "https://aigc-api.aiddit.com"
# Endpoint used to create (save) a crawler plan.
CRAWLER_PLAN_CREATE_URL = AIGC_BASE_URL + "/aigc/crawler/plan/save"
# Endpoint used to fetch the detail of a produce plan by its ID.
GET_PRODUCE_PLAN_DETAIL_BY_ID = AIGC_BASE_URL + "/aigc/produce/plan/detail"
# Endpoint used to save a (modified) produce plan.
PRODUCE_PLAN_SAVE = AIGC_BASE_URL + "/aigc/produce/plan/save"

# Token placed into ``baseInfo.token`` of every request body.
# NOTE(review): this credential is hard-coded in source; consider loading it
# from configuration or the environment instead.
DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
# Timeout, in seconds, passed to every HTTP request.
DEFAULT_TIMEOUT = 60.0
@tool(description="根据抖音账号ID创建爬取计划")
async def create_crawler_plan_by_douyin_account_id(
    account_id: str,
    sort_type: str = "最新",
    produce_plan_ids: List[str] = [],
) -> ToolResult:
    """
    Create a crawler plan for a Douyin account and optionally bind it to produce plans.

    Args:
        account_id: Douyin account ID; must be a non-empty string starting
            with ``MS4wLjABAAAA``.
        sort_type: Video ordering sent as ``modelValueConfig.sortType``
            (最新/最热). Defaults to 最新.
        produce_plan_ids: IDs of the produce plans the new crawler plan should
            be bound to. Empty (the default) or ``None`` skips binding. The
            default list is never mutated by this function.

    Returns:
        ToolResult: on success it contains
            - output: text summary of the created crawler plan and bindings
            - metadata.result: structured result
                - crawler_info:
                    - crawler_plan_id: ID of the created crawler plan
                    - crawler_plan_name: name of the created crawler plan
                    - sort_type: the ordering that was requested
                - produce_plan_infos: one entry per produce plan
                    - produce_plan_id: produce plan ID
                    - produce_plan_name: produce plan name
                    - is_success: the string "绑定成功" or "绑定失败"
                    - msg: error message on failure, "成功" otherwise
          On failure ``error`` is set and ``metadata`` is not provided.

    Note:
        - Prefer the structured data in metadata.result over parsing output.
        - NOTE(review): ``post`` and ``crawler_plan_bind_produce_plan`` issue
          blocking HTTP requests, so this coroutine blocks while they run.
    """
    failure_title = "根据抖音账号ID创建爬取计划失败"

    # Validate the account_id format before touching the remote API.
    if not account_id or not isinstance(account_id, str):
        logger.error(f"create_crawler_plan_by_douyin_account_id invalid account_id: {account_id}")
        return ToolResult(
            title=failure_title,
            output="",
            error="account_id 参数无效:必须是非空字符串",
        )
    if not account_id.startswith("MS4wLjABAAAA"):
        logger.error(f"create_crawler_plan_by_douyin_account_id invalid sec_uid format account_id:{account_id}")
        return ToolResult(
            title=failure_title,
            output="",
            error=f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,当前值: {account_id[:20]}...",
        )
    if produce_plan_ids is None:
        produce_plan_ids = []

    # %H/%M/%S give hour, minute and second; the previous %h/%s are not
    # hour/second directives and produced a malformed timestamp.
    dt = datetime.now().strftime("%Y%m%d%H%M%S")
    crawler_plan_name = f"【内容寻找Agent自动创建】{dt}_抖音账号ID爬取计划_{account_id[:30]}"
    params = {
        "accountFilters": [],
        "channel": 2,
        "contentFilters": [],
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 4,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 1,
        "inputModeValues": [
            account_id,
        ],
        "modelValueConfig": {
            "sortType": sort_type,
        },
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "selectModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1,
    }
    try:
        summary_lines = [f"抖音账号【{account_id}】创建爬取计划"]
        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title=failure_title,
                output=response_json.get("msg", "接口异常"),
                error="create crawler plan interface error",
            )
        crawler_plan_id = response_json.get("data", {}).get("id", "")
        summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
        summary_lines.append(f" 抖音账号ID: {account_id}")
        summary_lines.append(f" 爬取计划ID: {crawler_plan_id}")
        summary_lines.append(f" 爬取计划排序方式: {sort_type}")

        produce_plan_infos: List[Dict[str, str]] = []
        if produce_plan_ids:
            input_source_info = {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
                "inputSourceModal": 4,
                "inputSourceChannel": 2,
            }
            bound_infos, bind_error = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
            if bound_infos is None:
                # The bind step failed as a whole. The crawler plan already
                # exists, so still report it and surface the bind error rather
                # than failing on a None result below.
                summary_lines.append(f" 绑定生成计划失败: {bind_error}")
            else:
                produce_plan_infos = bound_infos

        if produce_plan_infos:
            # The list header is emitted once, followed by one group per plan.
            summary_lines.append(" 绑定的生成计划列表: ")
            for produce_plan_info in produce_plan_infos:
                # A non-empty "msg" on an entry marks a failed binding.
                bind_status = "绑定成功" if not produce_plan_info.get("msg") else "绑定失败"
                summary_lines.append(f" 生成计划名称: {produce_plan_info.get('produce_plan_name', '')}")
                summary_lines.append(f" 生成计划ID: {produce_plan_info.get('produce_plan_id', '')}")
                summary_lines.append(f" 绑定结果: {bind_status}")
                summary_lines.append(f" 信息: {produce_plan_info.get('msg', '成功')}")

        return ToolResult(
            title="根据抖音账号ID创建爬取计划",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                        "sort_type": sort_type,
                    },
                    "produce_plan_infos": [
                        {
                            "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
                            "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
                            "is_success": "绑定成功" if not produce_plan_info.get("msg") else "绑定失败",
                            "msg": produce_plan_info.get("msg", "成功"),
                        }
                        for produce_plan_info in produce_plan_infos
                    ],
                }
            },
            long_term_memory="Create crawler plan by DouYin Account ID",
        )
    except Exception as e:
        logger.error(f"create douyin account crawler plan error: {str(e)}, account_id: {account_id} ")
        return ToolResult(
            title=failure_title,
            output="",
            error=f"创建爬取计划错误:{str(e)}",
        )
@tool(description="根据抖音视频ID创建爬取计划")
async def create_crawler_plan_by_douyin_content_id(
    content_ids: List[str],
    produce_plan_ids: List[str] = [],
) -> ToolResult:
    """
    Create a crawler plan for a list of Douyin video IDs and optionally bind it to produce plans.

    Args:
        content_ids: Douyin content (video) IDs; a non-empty list of at most
            100 entries.
        produce_plan_ids: IDs of the produce plans the new crawler plan should
            be bound to. Empty (the default) or ``None`` skips binding. The
            default list is never mutated by this function.

    Returns:
        ToolResult: on success it contains
            - output: text summary of the created crawler plan and bindings
            - metadata.result: structured result
                - crawler_info:
                    - crawler_plan_id: ID of the created crawler plan
                    - crawler_plan_name: name of the created crawler plan
                    - content_ids: the Douyin video IDs that were submitted
                - produce_plan_infos: one entry per produce plan
                    - produce_plan_id: produce plan ID
                    - produce_plan_name: produce plan name
                    - is_success: the string "绑定成功" or "绑定失败"
                    - msg: error message on failure, "成功" otherwise
          On failure ``error`` is set and ``metadata`` is not provided.

    Note:
        - Prefer the structured data in metadata.result over parsing output.
        - NOTE(review): ``post`` and ``crawler_plan_bind_produce_plan`` issue
          blocking HTTP requests, so this coroutine blocks while they run.
    """
    failure_title = "根据抖音内容ID创建爬取计划失败"

    if not content_ids or not isinstance(content_ids, list):
        logger.error(f"create_crawler_plan_by_douyin_content_id invalid content_ids. content_ids: {content_ids}")
        return ToolResult(
            title=failure_title,
            output="",
            error="content_ids 参数无效: content_ids必须是列表",
        )
    if len(content_ids) > 100:
        logger.error(f"create_crawler_plan_by_douyin_content_id invalid content_ids length. content_ids.length: {len(content_ids)}")
        return ToolResult(
            title=failure_title,
            output="",
            error=f"content_ids 长度异常: 期望1~100, 实际{len(content_ids)}",
        )

    # %H/%M/%S give hour, minute and second; the previous %h/%s are not
    # hour/second directives and produced a malformed timestamp.
    dt = datetime.now().strftime("%Y%m%d%H%M%S")
    crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
    params = {
        "channel": 2,
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 5,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 2,
        "inputModeValues": content_ids,
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1,
    }
    try:
        summary_lines = ["抖音视频爬取计划"]
        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title=failure_title,
                output=response_json.get("msg", "接口异常"),
                error="create crawler plan interface error",
            )
        crawler_plan_id = response_json.get("data", {}).get("id", "")
        summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
        # str() each ID so a non-string entry cannot raise after the plan has
        # already been created remotely.
        summary_lines.append(f" 抖音视频IDs: {','.join(map(str, content_ids))}")
        summary_lines.append(f" 爬取计划ID: {crawler_plan_id}")

        produce_plan_infos: List[Dict[str, str]] = []
        if produce_plan_ids:
            input_source_info = {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
                "inputSourceModal": 4,
                "inputSourceChannel": 2,
            }
            bound_infos, bind_error = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
            if bound_infos is None:
                # The bind step failed as a whole. The crawler plan already
                # exists, so still report it and surface the bind error rather
                # than failing on a None result below.
                summary_lines.append(f" 绑定生成计划失败: {bind_error}")
            else:
                produce_plan_infos = bound_infos

        if produce_plan_infos:
            # The list header is emitted once, followed by one group per plan.
            summary_lines.append(" 绑定的生成计划列表: ")
            for produce_plan_info in produce_plan_infos:
                # A non-empty "msg" on an entry marks a failed binding.
                bind_status = "绑定成功" if not produce_plan_info.get("msg") else "绑定失败"
                summary_lines.append(f" 生成计划名称: {produce_plan_info.get('produce_plan_name', '')}")
                summary_lines.append(f" 生成计划ID: {produce_plan_info.get('produce_plan_id', '')}")
                summary_lines.append(f" 绑定结果: {bind_status}")
                summary_lines.append(f" 信息: {produce_plan_info.get('msg', '成功')}")

        return ToolResult(
            title="根据抖音内容ID创建爬取计划",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                        "content_ids": content_ids,
                    },
                    "produce_plan_infos": [
                        {
                            "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
                            "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
                            "is_success": "绑定成功" if not produce_plan_info.get("msg") else "绑定失败",
                            "msg": produce_plan_info.get("msg", "成功"),
                        }
                        for produce_plan_info in produce_plan_infos
                    ],
                }
            },
            long_term_memory="Create crawler plan by DouYin Content IDs",
        )
    except Exception as e:
        logger.error(f"create douyin content crawler plan error. content_ids: {content_ids}, error: {str(e)}")
        return ToolResult(
            title=failure_title,
            output="",
            error=f"创建爬取计划错误:{str(e)}",
        )
def crawler_plan_bind_produce_plan(
    input_source_info: Dict[str, Any],
    produce_plan_ids: List[str],
) -> Tuple[Union[List[Dict[str, str]], None], str]:
    """
    Add a crawler plan as an input source to each of the given produce plans.

    For every produce plan ID the plan detail is fetched, ``input_source_info``
    is appended to one of its input-source groups, and the plan is saved back.

    Args:
        input_source_info: Input-source entry describing the crawler plan. Its
            ``inputSourceModal``, ``inputSourceChannel`` and ``contentType``
            values are compared with the first input source of each group to
            pick the target group.
        produce_plan_ids: IDs of the produce plans to bind; must be a
            non-empty list.

    Returns:
        A ``(result, msg)`` tuple.
        - On overall failure (invalid arguments or an unexpected exception)
          ``result`` is ``None`` and ``msg`` describes the problem.
        - Otherwise ``msg`` is ``""`` and ``result`` has one dict per produce
          plan with ``produce_plan_id``, ``produce_plan_name`` (once the
          detail was fetched) and, only when that plan failed, a non-empty
          ``msg``.
    """
    if not input_source_info or not produce_plan_ids:
        return None, "input_source_info or produce_plan_ids is invalid"
    if not isinstance(produce_plan_ids, list):
        return None, "produce_plan_ids is not list"

    input_source_check_key = ("inputSourceModal", "inputSourceChannel", "contentType")
    try:
        result: List[Dict[str, str]] = []
        for produce_plan_id in produce_plan_ids:
            produce_plan_info = {
                "produce_plan_id": produce_plan_id,
            }
            result.append(produce_plan_info)

            # Fetch the plan detail. A non-empty msg means the lookup failed;
            # a missing detail is treated the same way, so an empty error
            # message can never be mistaken for success.
            produce_plan_detail_info, msg = find_produce_plan_info_by_id(produce_plan_id)
            if msg or not produce_plan_detail_info:
                produce_plan_info["msg"] = msg or "获取生成计划详情异常"
                continue

            produce_plan_info["produce_plan_name"] = produce_plan_detail_info.get("name", "")
            input_source_groups = produce_plan_detail_info.get("inputSourceGroups", [])
            if not input_source_groups:
                produce_plan_info["msg"] = "生成计划没有输入源组"
                continue

            # Pick the first group whose leading input source matches on the
            # check keys; fall back to the first group when none matches.
            input_source_index = 0
            for i, candidate_group in enumerate(input_source_groups):
                candidate_sources = candidate_group.get("inputSources") or []
                if not candidate_sources:
                    continue
                first_input_source = candidate_sources[0]
                if all(
                    input_source_info.get(k, 0) == first_input_source.get(k, -1)
                    for k in input_source_check_key
                ):
                    input_source_index = i
                    break

            # Append to the group's own list. If the key is missing or null,
            # attach a fresh list so the new source is really part of the
            # payload that gets saved.
            input_source_group = input_source_groups[input_source_index]
            input_sources = input_source_group.get("inputSources")
            if not isinstance(input_sources, list):
                input_sources = []
                input_source_group["inputSources"] = input_sources
            input_sources.append(input_source_info)

            response_json = post(PRODUCE_PLAN_SAVE, produce_plan_detail_info)
            if response_json.get("code") != 0 or not response_json.get("data", {}):
                # Callers treat an empty msg as success, so always record a
                # non-empty message for a failed save.
                produce_plan_info["msg"] = response_json.get("msg") or "爬取计划绑定生成计划异常"
        return result, ""
    except Exception as e:
        # default=str keeps the log call itself from raising on values that
        # are not JSON serializable.
        logger.error(f"crawler_plan_bind_produce_plan error. input_source_info: {json.dumps(input_source_info, default=str)}, produce_plan_ids: {produce_plan_ids}, error: {str(e)},")
        return None, str(e)
def find_produce_plan_info_by_id(
    produce_plan_id: str,
) -> Tuple[Union[Dict[str, Any], None], str]:
    """
    Fetch the detail of a produce plan by its ID.

    Args:
        produce_plan_id: ID of the produce plan; must be a non-empty string.

    Returns:
        A ``(detail, msg)`` tuple. On success ``detail`` is the ``data``
        object of the API response and ``msg`` is ``""``. On any failure
        ``detail`` is ``None`` and ``msg`` is a non-empty error description,
        so callers can rely on ``msg`` alone to detect failure.
    """
    if not produce_plan_id or not isinstance(produce_plan_id, str):
        return None, f"非法的produce_plan_id: {produce_plan_id}"
    try:
        params = {
            "id": produce_plan_id,
        }
        response_json = post(GET_PRODUCE_PLAN_DETAIL_BY_ID, params)
        detail = response_json.get("data", {})
        if response_json.get("code") != 0 or not detail:
            # Fall back to the default text when the API msg is absent, null
            # or empty; an empty msg would read as success to the caller.
            return None, response_json.get("msg") or "获取生成计划详情异常"
        return detail, ""
    except Exception as e:
        logger.error(f"find_produce_plan_info_by_id error. produce_plan_id: {produce_plan_id}, error: {str(e)},")
        # Some exceptions have an empty str(); repr keeps the message non-empty.
        return None, str(e) or repr(e)
def post(url: str, params: Any) -> Dict[str, Any]:
    """
    POST a request to the AIGC platform and return the decoded JSON object.

    The body sent is ``{"baseInfo": {"token": DEFAULT_TOKEN}, "params": params}``.

    Args:
        url: Full endpoint URL.
        params: JSON-serializable payload placed under ``params``.

    Returns:
        The response body as a dict. An empty dict is returned when the
        request fails, the HTTP status is an error, the body is not valid
        JSON, or the decoded JSON is not an object; the error is logged and
        no exception is raised.
    """
    request = {
        "baseInfo": {
            "token": DEFAULT_TOKEN,
        },
        "params": params,
    }

    # Build the log text once, with the token masked so the credential is not
    # written to logs. It is built outside the request try-block, and falls
    # back to repr, so a non-serializable payload cannot make logging raise.
    masked_request = {"baseInfo": {"token": "***"}, "params": params}
    try:
        request_log = json.dumps(masked_request, default=str)
    except (TypeError, ValueError):
        request_log = repr(masked_request)

    try:
        logger.info(f"invoke aigc platform. url: {url}, request: {request_log}")
        response = requests.post(
            url=url,
            json=request,
            headers={"Content-Type": "application/json"},
            timeout=DEFAULT_TIMEOUT,
        )
        response.raise_for_status()
        response_json = response.json()
    except Exception as e:
        logger.error(f"invoke aigc platform error. url: {url}, request: {request_log}, error: {str(e)}")
        return {}

    # Callers use dict methods on the result, so a JSON array, string or null
    # body is reported as a failure instead of being passed through.
    if not isinstance(response_json, dict):
        logger.error(f"invoke aigc platform error. url: {url}, request: {request_log}, error: unexpected response type {type(response_json).__name__}")
        return {}

    logger.info(f"invoke aigc platform. url: {url}, request: {request_log}, response: {json.dumps(response_json)}")
    return response_json
|