aigc_platform_api.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. """
  2. AIGC接口调用
  3. 调用AIGC接口创建爬取计划,绑定生成计划
  4. """
  5. import json
  6. import logging
  7. from datetime import datetime
  8. from typing import List, Dict, Union, Tuple, Any
  9. import requests
  10. from agent import ToolResult, tool
# Module-level logger for all AIGC platform calls.
logger = logging.getLogger(__name__)
# Base URL of the AIGC platform API.
AIGC_BASE_URL = "https://aigc-api.aiddit.com"
# Endpoint: create/save a crawler plan.
CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"
# Endpoint: fetch a produce plan's full detail by ID.
GET_PRODUCE_PLAN_DETAIL_BY_ID = f"{AIGC_BASE_URL}/aigc/produce/plan/detail"
# Endpoint: save (update) a produce plan.
PRODUCE_PLAN_SAVE = f"{AIGC_BASE_URL}/aigc/produce/plan/save"
# NOTE(review): hard-coded API token checked into source — consider moving it
# to configuration/environment; rotating it currently requires a code change.
DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
# Request timeout (seconds) applied to every platform HTTP call.
DEFAULT_TIMEOUT = 60.0
  18. @tool(description="根据抖音账号ID创建爬取计划")
  19. async def create_crawler_plan_by_douyin_account_id(
  20. account_id: str,
  21. sort_type: str = "最新",
  22. produce_plan_ids: List[str] = []
  23. ) -> ToolResult:
  24. """
  25. 根据抖音账号ID创建爬取计划
  26. Args:
  27. account_id: 抖音账号ID
  28. sort_type: 搜索时的视频排序方式(最新/最热),默认最新
  29. produce_plan_ids: 爬取计划要绑定的生成计划ID,默认为空列表
  30. Returns:
  31. ToolResult: 包含以下内容
  32. - output: 文本格式的爬取计划创建结果摘要
  33. - metadata.result: 结构化的爬取计划创建结果
  34. - crawler_info: 爬取计划信息
  35. - crawler_plan_id: 创建的爬取计划ID
  36. - crawler_plan_name: 创建的爬取计划名称
  37. - sort_type: 排序方式
  38. - produce_plan_infos: 绑定的生成计划信息
  39. - produce_plan_id: 生成计划ID
  40. - produce_plan_name: 生成计划名称
  41. - is_success: 是否成功, true表示绑定成功,false表示绑定失败
  42. - msg: 绑定失败时为错误信息,绑定成功则为“成功”
  43. Note:
  44. - 建议从 metadata.result 获取结构化数据,而非解析 output 文本
  45. """
  46. # 验证 account_id 格式
  47. if not account_id or not isinstance(account_id, str):
  48. logger.error(f"create_crawler_plan_by_douyin_account_id invalid account_id: {account_id}")
  49. return ToolResult(
  50. title="根据抖音账号ID创建爬取计划失败",
  51. output="",
  52. error="account_id 参数无效:必须是非空字符串",
  53. )
  54. if not account_id.startswith("MS4wLjABAAAA"):
  55. logger.error(f"create_crawler_plan_by_douyin_account_id invalid sec_uid format account_id:{account_id}")
  56. return ToolResult(
  57. title="根据抖音账号ID创建爬取计划失败",
  58. output="",
  59. error=f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,当前值: {account_id[:min(20, len(account_id))]}...",
  60. )
  61. if produce_plan_ids is None:
  62. produce_plan_ids = []
  63. dt = datetime.now().strftime("%Y%m%d%h%M%s")
  64. crawler_plan_name = f"【内容寻找Agent自动创建】{dt}_抖音账号ID爬取计划_{account_id[:min(30, len(account_id))]}"
  65. params = {
  66. "accountFilters": [],
  67. "channel": 2,
  68. "contentFilters": [],
  69. "contentModal": 4,
  70. "crawlerComment": 0,
  71. "crawlerMode": 4,
  72. "filterAccountMatchMode": 2,
  73. "filterContentMatchMode": 2,
  74. "frequencyType": 1,
  75. "inputModeValues": [
  76. account_id
  77. ],
  78. "modelValueConfig": {
  79. "sortType": sort_type
  80. },
  81. "name": crawler_plan_name,
  82. "planType": 2,
  83. "searchModeValues": [],
  84. "selectModeValues": [],
  85. "srtExtractFlag": 1,
  86. "videoKeyFrameType": 1,
  87. "voiceExtractFlag": 1
  88. }
  89. try:
  90. summary_lines = [f"抖音账号【{account_id}】创建爬取计划"]
  91. response_json = post(CRAWLER_PLAN_CREATE_URL, params)
  92. if response_json.get("code") != 0:
  93. return ToolResult(
  94. title="根据抖音账号ID创建爬取计划失败",
  95. output=response_json.get("msg", "接口异常"),
  96. error=f"create crawler plan interface error",
  97. )
  98. crawler_plan_id = response_json.get("data", {}).get("id", "")
  99. summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
  100. summary_lines.append(f" 抖音账号ID: {account_id}")
  101. summary_lines.append(f" 爬取计划ID: {crawler_plan_id}")
  102. summary_lines.append(f" 爬取计划排序方式: {sort_type}")
  103. produce_plan_infos: List[Dict[str, str]] = []
  104. if produce_plan_ids:
  105. input_source_info = {
  106. "contentType": 1,
  107. "inputSourceType": 2,
  108. "inputSourceValue": crawler_plan_id,
  109. "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
  110. "inputSourceModal": 4,
  111. "inputSourceChannel": 2
  112. }
  113. produce_plan_infos, msg = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
  114. if produce_plan_infos:
  115. for produce_plan_info in produce_plan_infos:
  116. summary_lines.append(" 绑定的生成计划列表: ")
  117. summary_lines.append(f" 生成计划名称: {produce_plan_info.get('produce_plan_name', '')}")
  118. summary_lines.append(f" 生成计划ID: {produce_plan_info.get('produce_plan_id', '')}")
  119. summary_lines.append(f" 绑定结果: {'绑定成功' if not produce_plan_info.get('msg') else '绑定失败'}")
  120. summary_lines.append(f" 信息: {produce_plan_info.get('msg', '成功')}")
  121. return ToolResult(
  122. title="根据抖音账号ID创建爬取计划",
  123. output="\n".join(summary_lines),
  124. metadata={
  125. "result": {
  126. "crawler_info": {
  127. "crawler_plan_id": crawler_plan_id,
  128. "crawler_plan_name": crawler_plan_name,
  129. "sort_type": sort_type,
  130. },
  131. "produce_plan_infos": [
  132. {
  133. "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
  134. "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
  135. "is_success": "绑定成功" if not produce_plan_info.get("msg") else "绑定失败",
  136. "msg": produce_plan_info.get("msg", "成功"),
  137. }
  138. for produce_plan_info in produce_plan_infos
  139. ]
  140. }
  141. },
  142. long_term_memory="Create crawler plan by DouYin Account ID",
  143. )
  144. except Exception as e:
  145. logger.error(f"create douyin account crawler plan error: {str(e)}, account_id: {account_id} ")
  146. return ToolResult(
  147. title="根据抖音账号ID创建爬取计划失败",
  148. output="",
  149. error=f"创建爬取计划错误:{str(e)}",
  150. )
  151. @tool(description="根据抖音视频ID创建爬取计划")
  152. async def create_crawler_plan_by_douyin_content_id(
  153. content_ids: List[str],
  154. produce_plan_ids: List[str] = []
  155. ) -> ToolResult:
  156. """
  157. 根据抖音视频ID创建爬取计划
  158. Args:
  159. content_ids: 抖音内容ID列表
  160. produce_plan_ids: 爬取计划要绑定的生成计划ID,默认为空列表
  161. Returns:
  162. Returns:
  163. ToolResult: 包含以下内容
  164. - output: 文本格式的爬取计划创建结果摘要
  165. - metadata.result: 结构化的爬取计划创建结果
  166. - crawler_info: 爬取计划信息
  167. - crawler_plan_id: 创建的爬取计划ID
  168. - crawler_plan_name: 创建的爬取计划名称
  169. - content_ids: 抖音视频ID列表
  170. - produce_plan_infos: 绑定的生成计划信息
  171. - produce_plan_id: 生成计划ID
  172. - produce_plan_name: 生成计划名称
  173. - is_success: 是否成功, true表示绑定成功,false表示绑定失败
  174. - msg: 绑定失败时为错误信息,绑定成功则为“成功”
  175. Note:
  176. - 建议从 metadata.result 获取结构化数据,而非解析 output 文本
  177. """
  178. if not content_ids or not isinstance(content_ids, list):
  179. logger.error(f"create_crawler_plan_by_douyin_content_id invalid content_ids. content_ids: {content_ids}")
  180. return ToolResult(
  181. title="根据抖音内容ID创建爬取计划失败",
  182. output="",
  183. error="content_ids 参数无效: content_ids必须是列表"
  184. )
  185. if len(content_ids) > 100:
  186. logger.error(f"create_crawler_plan_by_douyin_content_id invalid content_ids length. content_ids.length: {len(content_ids)}")
  187. return ToolResult(
  188. title="根据抖音内容ID创建爬取计划失败",
  189. output="",
  190. error=f"content_ids 长度异常: 期望1~100, 实际{len(content_ids)}"
  191. )
  192. dt = datetime.now().strftime("%Y%m%d%h%M%s")
  193. crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
  194. params = {
  195. "channel": 2,
  196. "contentModal": 4,
  197. "crawlerComment": 0,
  198. "crawlerMode": 5,
  199. "filterAccountMatchMode": 2,
  200. "filterContentMatchMode": 2,
  201. "frequencyType": 2,
  202. "inputModeValues": content_ids,
  203. "name": crawler_plan_name,
  204. "planType": 2,
  205. "searchModeValues": [],
  206. "srtExtractFlag": 1,
  207. "videoKeyFrameType": 1,
  208. "voiceExtractFlag": 1
  209. }
  210. try:
  211. summary_lines = [f"抖音视频爬取计划"]
  212. response_json = post(CRAWLER_PLAN_CREATE_URL, params)
  213. if response_json.get("code") != 0:
  214. return ToolResult(
  215. title="根据抖音内容ID创建爬取计划失败",
  216. output=response_json.get("msg", "接口异常"),
  217. error=f"create crawler plan interface error",
  218. )
  219. crawler_plan_id = response_json.get("data", {}).get("id", "")
  220. summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
  221. summary_lines.append(f" 抖音视频IDs: {','.join(content_ids)}")
  222. summary_lines.append(f" 爬取计划ID: {crawler_plan_id}")
  223. produce_plan_infos: List[Dict[str, str]] = []
  224. if produce_plan_ids:
  225. input_source_info = {
  226. "contentType": 1,
  227. "inputSourceType": 2,
  228. "inputSourceValue": crawler_plan_id,
  229. "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
  230. "inputSourceModal": 4,
  231. "inputSourceChannel": 2
  232. }
  233. produce_plan_infos, msg = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
  234. if produce_plan_infos:
  235. for produce_plan_info in produce_plan_infos:
  236. summary_lines.append(" 绑定的生成计划列表: ")
  237. summary_lines.append(f" 生成计划名称: {produce_plan_info.get('produce_plan_name', '')}")
  238. summary_lines.append(f" 生成计划ID: {produce_plan_info.get('produce_plan_id', '')}")
  239. summary_lines.append(f" 绑定结果: {'绑定成功' if not produce_plan_info.get('msg') else '绑定失败'}")
  240. summary_lines.append(f" 信息: {produce_plan_info.get('msg', '成功')}")
  241. return ToolResult(
  242. title="根据抖音内容ID创建爬取计划",
  243. output="\n".join(summary_lines),
  244. metadata={
  245. "result": {
  246. "crawler_info": {
  247. "crawler_plan_id": crawler_plan_id,
  248. "crawler_plan_name": crawler_plan_name,
  249. },
  250. "produce_plan_infos": [
  251. {
  252. "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
  253. "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
  254. "is_success": "绑定成功" if not produce_plan_info.get("msg") else "绑定失败",
  255. "msg": produce_plan_info.get("msg", "成功"),
  256. }
  257. for produce_plan_info in produce_plan_infos
  258. ]
  259. }
  260. },
  261. long_term_memory="Create crawler plan by DouYin Content IDs",
  262. )
  263. except Exception as e:
  264. logger.error(f"create douyin content crawler plan error. content_ids: {content_ids}, error: {str(e)}")
  265. return ToolResult(
  266. title="根据抖音内容ID创建爬取计划失败",
  267. output="",
  268. error=f"创建爬取计划错误:{str(e)}",
  269. )
  270. def crawler_plan_bind_produce_plan(
  271. input_source_info: Dict[str, Any],
  272. produce_plan_ids: List[str],
  273. ) -> Tuple[Union[List[Dict[str, str]], None], str]:
  274. if not input_source_info or not produce_plan_ids:
  275. return None, f"input_source_info or produce_plan_ids is invalid"
  276. input_source_check_key = ["inputSourceModal", "inputSourceChannel", "contentType"]
  277. try:
  278. if not isinstance(produce_plan_ids, list):
  279. return None, f"produce_plan_ids is not list"
  280. result: List[Dict[str, str]] = []
  281. for produce_plan_id in produce_plan_ids:
  282. produce_plan_info = {
  283. "produce_plan_id": produce_plan_id,
  284. }
  285. result.append(produce_plan_info)
  286. # 获取生成计划详情,msg不为空表示获取失败
  287. produce_plan_detail_info, msg = find_produce_plan_info_by_id(produce_plan_id)
  288. if msg:
  289. produce_plan_info["msg"] = msg
  290. continue
  291. produce_plan_info["produce_plan_name"] = produce_plan_detail_info.get("name", "")
  292. input_source_groups = produce_plan_detail_info.get("inputSourceGroups", [])
  293. if not input_source_groups:
  294. produce_plan_info["msg"] = "生成计划没有输入源组"
  295. continue
  296. # 查询当前爬取计划要添加到的输入源组下标
  297. input_source_index = 0
  298. for i in range(len(input_source_groups)):
  299. input_source_group = input_source_groups[i]
  300. if not input_source_group.get("inputSources", []):
  301. continue
  302. first_input_source = input_source_group.get("inputSources")[0]
  303. if all(input_source_info.get(k, 0) == first_input_source.get(k, -1) for k in input_source_check_key):
  304. input_source_index = i
  305. break
  306. # 对应的输入源组添加输入源
  307. input_source_group = input_source_groups[input_source_index]
  308. input_source_group.get("inputSources", []).append(input_source_info)
  309. response_json = post(PRODUCE_PLAN_SAVE, produce_plan_detail_info)
  310. if response_json.get("code") != 0 or not response_json.get("data", {}):
  311. produce_plan_info["msg"] = response_json.get("msg", "爬取计划绑定生成计划异常")
  312. return result, ""
  313. except Exception as e:
  314. logger.error(f"crawler_plan_bind_produce_plan error. input_source_info: {json.dumps(input_source_info)}, produce_plan_ids: {produce_plan_ids}, error: {str(e)},")
  315. return None, str(e)
  316. def find_produce_plan_info_by_id(
  317. produce_plan_id: str,
  318. ) -> Tuple[Union[Dict[str, str], None], str]:
  319. try:
  320. if not produce_plan_id or not isinstance(produce_plan_id, str):
  321. return None, f"非法的produce_plan_id: {produce_plan_id}"
  322. params = {
  323. "id": produce_plan_id,
  324. }
  325. response_json = post(GET_PRODUCE_PLAN_DETAIL_BY_ID, params)
  326. if response_json.get("code") != 0 or not response_json.get("data", {}):
  327. return None, response_json.get("msg", "获取生成计划详情异常")
  328. return response_json.get("data", {}), ""
  329. except Exception as e:
  330. logger.error(f"find_produce_plan_info_by_id error. produce_plan_id: {produce_plan_id}, error: {str(e)},")
  331. return None, str(e)
  332. def post(url: str, params: Any) -> Dict[str, Any]:
  333. request = {
  334. "baseInfo": {
  335. "token": DEFAULT_TOKEN,
  336. },
  337. "params": params
  338. }
  339. try:
  340. logger.info(f"invoke aigc platform. url: {url}, request: {json.dumps(request)}")
  341. response = requests.post(
  342. url=url,
  343. json=request,
  344. headers={"Content-Type": "application/json"},
  345. timeout=DEFAULT_TIMEOUT
  346. )
  347. response.raise_for_status()
  348. response_json = response.json()
  349. logger.info(f"invoke aigc platform. url: {url}, request: {json.dumps(request)}, response: {json.dumps(response_json)}")
  350. return response_json
  351. except Exception as e:
  352. logger.error(f"invoke aigc platform error. url: {url}, request: {json.dumps(request)}, error: {str(e)}")
  353. return {}