# aigc_platform_api.py

  1. """
  2. AIGC接口调用
  3. 调用AIGC接口创建爬取计划,绑定生成计划
  4. """
  5. import json
  6. import logging
  7. import os
  8. from datetime import datetime
  9. from pathlib import Path
  10. from typing import List, Dict, Union, Tuple, Any
  11. import requests
  12. from agent import ToolResult, tool
  13. from db import update_content_plan_ids
  14. logger = logging.getLogger(__name__)
  15. AIGC_BASE_URL = "https://aigc-api.aiddit.com"
  16. CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"
  17. GET_PRODUCE_PLAN_DETAIL_BY_ID = f"{AIGC_BASE_URL}/aigc/produce/plan/detail"
  18. PRODUCE_PLAN_SAVE = f"{AIGC_BASE_URL}/aigc/produce/plan/save"
  19. DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
  20. DEFAULT_TIMEOUT = 60.0
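
# Every endpoint shares the same request envelope, built in post() at the
# bottom of this module:
#   {"baseInfo": {"token": DEFAULT_TOKEN}, "params": {...endpoint-specific body...}}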


def _load_output_json(trace_id: str, output_dir: str) -> Dict[str, Any]:
    """Load {output_dir}/{trace_id}/output.json."""
    path = Path(output_dir) / trace_id / "output.json"
    if not path.exists():
        raise FileNotFoundError(f"output.json not found: {path}")
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def _extract_content_ids(data: Dict[str, Any]) -> List[str]:
    """Extract the aweme_id list from the output JSON."""
    contents = data.get("contents") or []
    if not isinstance(contents, list):
        return []
    content_ids: List[str] = []
    for item in contents:
        if not isinstance(item, dict):
            continue
        aweme_id = item.get("aweme_id")
        if aweme_id is None:
            continue
        aweme_id_str = str(aweme_id).strip()
        if aweme_id_str:
            content_ids.append(aweme_id_str)
    return content_ids
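
# A sketch of the output.json shape these helpers assume (only "contents" and
# "aweme_id" are actually read here; other fields are illustrative):
#   {"contents": [{"aweme_id": 7312345678901234567, "title": "..."}]}
# e.g. _extract_content_ids({"contents": [{"aweme_id": 123}, {"x": 1}]}) -> ["123"]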


def _get_produce_plan_ids_from_env() -> List[str]:
    """Read AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID from the environment."""
    raw = os.getenv("AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID", "").strip()
    if not raw:
        return []
    # The binding API expects List[str], so wrap the single env value in a list.
    return [raw]


@tool(description="Create a crawler plan from a Douyin account ID")
async def create_crawler_plan_by_douyin_account_id(
    account_id: str,
    sort_type: str = "最新",
    produce_plan_ids: Union[List[str], None] = None,
) -> ToolResult:
    """
    Create a crawler plan from a Douyin account ID.

    Args:
        account_id: Douyin account ID (sec_uid).
        sort_type: Sort order for the account's videos ("最新" = newest, "最热" = hottest); defaults to "最新".
        produce_plan_ids: Produce-plan IDs to bind the crawler plan to; defaults to None (treated as an empty list).

    Returns:
        ToolResult with:
        - output: human-readable summary of the creation result
        - metadata.result: structured creation result
            - crawler_info: crawler plan info
                - crawler_plan_id: ID of the created crawler plan
                - crawler_plan_name: name of the created crawler plan
                - sort_type: sort order
            - produce_plan_infos: bound produce plans
                - produce_plan_id: produce plan ID
                - produce_plan_name: produce plan name
                - is_success: True if the binding succeeded, False otherwise
                - msg: error message on failure, "success" otherwise

    Note:
        - Prefer the structured data in metadata.result over parsing the output text.
    """
    # Validate the account_id format.
    if not account_id or not isinstance(account_id, str):
        logger.error(f"create_crawler_plan_by_douyin_account_id invalid account_id: {account_id}")
        return ToolResult(
            title="Create crawler plan by Douyin account ID failed",
            output="",
            error="Invalid account_id: must be a non-empty string",
        )
    if not account_id.startswith("MS4wLjABAAAA"):
        logger.error(f"create_crawler_plan_by_douyin_account_id invalid sec_uid format account_id: {account_id}")
        return ToolResult(
            title="Create crawler plan by Douyin account ID failed",
            output="",
            error=f"Malformed account_id: must start with MS4wLjABAAAA, got: {account_id[:20]}...",
        )
    if produce_plan_ids is None:
        produce_plan_ids = []
    # %H%M%S (not %h%M%s): hour/minute/second, giving a sortable timestamp.
    dt = datetime.now().strftime("%Y%m%d%H%M%S")
    crawler_plan_name = f"[Auto-created by content-finding agent] {dt}_Douyin_account_crawl_{account_id[:30]}"
    params = {
        "accountFilters": [],
        "channel": 2,
        "contentFilters": [],
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 4,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 1,
        "inputModeValues": [account_id],
        "modelValueConfig": {"sortType": sort_type},
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "selectModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1,
    }
    try:
        summary_lines = [f"Crawler plan for Douyin account [{account_id}]"]
        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title="Create crawler plan by Douyin account ID failed",
                output=response_json.get("msg", "API error"),
                error="create crawler plan interface error",
            )
        crawler_plan_id = response_json.get("data", {}).get("id", "")
        summary_lines.append(f"Crawler plan name: {crawler_plan_name}")
        summary_lines.append(f"  Douyin account ID: {account_id}")
        summary_lines.append(f"  Crawler plan ID: {crawler_plan_id}")
        summary_lines.append(f"  Sort order: {sort_type}")
        produce_plan_infos: List[Dict[str, str]] = []
        if produce_plan_ids:
            input_source_info = {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": f"Original post-Video-Douyin-Content add plan-{crawler_plan_name}",
                "inputSourceModal": 4,
                "inputSourceChannel": 2,
            }
            # The bind helper returns None on hard failure, so normalize to a list.
            produce_plan_infos, _ = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
            produce_plan_infos = produce_plan_infos or []
        if produce_plan_infos:
            summary_lines.append("  Bound produce plans:")
            for produce_plan_info in produce_plan_infos:
                summary_lines.append(f"    Produce plan name: {produce_plan_info.get('produce_plan_name', '')}")
                summary_lines.append(f"    Produce plan ID: {produce_plan_info.get('produce_plan_id', '')}")
                summary_lines.append(f"    Binding result: {'success' if not produce_plan_info.get('msg') else 'failed'}")
                summary_lines.append(f"    Message: {produce_plan_info.get('msg', 'success')}")
        return ToolResult(
            title="Create crawler plan by Douyin account ID",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                        "sort_type": sort_type,
                    },
                    "produce_plan_infos": [
                        {
                            "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
                            "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
                            # Boolean, as documented: an empty msg means the binding succeeded.
                            "is_success": not produce_plan_info.get("msg"),
                            "msg": produce_plan_info.get("msg", "success"),
                        }
                        for produce_plan_info in produce_plan_infos
                    ],
                }
            },
            long_term_memory="Create crawler plan by DouYin Account ID",
        )
    except Exception as e:
        logger.error(f"create douyin account crawler plan error: {str(e)}, account_id: {account_id}")
        return ToolResult(
            title="Create crawler plan by Douyin account ID failed",
            output="",
            error=f"Failed to create crawler plan: {str(e)}",
        )
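
# A minimal invocation sketch (the IDs are hypothetical; it assumes the
# @tool-decorated coroutine can be awaited directly, which depends on the
# agent framework):
#
#   import asyncio
#   result = asyncio.run(create_crawler_plan_by_douyin_account_id(
#       account_id="MS4wLjABAAAAexample",
#       produce_plan_ids=["12345"],
#   ))
#   print(result.output)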


@tool(description="Create a crawler plan from Douyin video IDs")
async def create_crawler_plan_by_douyin_content_id(
    trace_id: str,
) -> ToolResult:
    """
    Create a crawler plan from Douyin video IDs.

    Args:
        trace_id: Content-finding task trace_id (used to read {output_dir}/{trace_id}/output.json).

    Returns:
        ToolResult with:
        - output: human-readable summary of the creation result
        - metadata.result: structured creation result
            - crawler_info: crawler plan info
                - crawler_plan_id: ID of the created crawler plan
                - crawler_plan_name: name of the created crawler plan
                - content_ids: list of Douyin video IDs
            - produce_plan_infos: bound produce plans
                - produce_plan_id: produce plan ID
                - produce_plan_name: produce plan name
                - is_success: True if the binding succeeded, False otherwise
                - msg: error message on failure, "success" otherwise

    Note:
        - Prefer the structured data in metadata.result over parsing the output text.
    """
    if not trace_id or not isinstance(trace_id, str):
        logger.error(f"create_crawler_plan_by_douyin_content_id invalid trace_id: {trace_id}")
        return ToolResult(
            title="Create crawler plan from Douyin content failed",
            output="",
            error="Invalid trace_id: must be a non-empty string",
        )
    output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
    try:
        data = _load_output_json(trace_id=trace_id, output_dir=output_dir)
        content_ids = _extract_content_ids(data)
    except Exception as e:
        msg = f"Failed to load/parse output.json: {e}"
        logger.error(msg, exc_info=True)
        return ToolResult(
            title="Create crawler plan from Douyin content failed",
            output="",
            error=msg,
        )
    if not content_ids:
        return ToolResult(
            title="Create crawler plan from Douyin content failed",
            output="",
            error="No valid aweme_id found in output.json contents",
        )
    if len(content_ids) > 100:
        logger.error(
            "create_crawler_plan_by_douyin_content_id invalid content_ids length. "
            f"content_ids.length: {len(content_ids)}"
        )
        return ToolResult(
            title="Create crawler plan from Douyin content failed",
            output="",
            error=f"Invalid content_ids length: expected 1-100, got {len(content_ids)}",
        )
    produce_plan_ids = _get_produce_plan_ids_from_env()
    # %H%M%S (not %h%M%s): hour/minute/second, giving a sortable timestamp.
    dt = datetime.now().strftime("%Y%m%d%H%M%S")
    crawler_plan_name = f"[Auto-created by content-finding agent] Douyin video direct crawl-{dt}-Douyin"
    params = {
        "channel": 2,
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 5,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 2,
        "inputModeValues": content_ids,
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1,
    }
    try:
        summary_lines = ["Douyin video crawler plan"]
        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title="Create crawler plan by Douyin content IDs failed",
                output=response_json.get("msg", "API error"),
                error="create crawler plan interface error",
            )
        crawler_plan_id = response_json.get("data", {}).get("id", "")
        summary_lines.append(f"Crawler plan name: {crawler_plan_name}")
        summary_lines.append(f"  Douyin video IDs: {','.join(content_ids)}")
        summary_lines.append(f"  Crawler plan ID: {crawler_plan_id}")
        produce_plan_infos: List[Dict[str, str]] = []
        db_updated_rows = 0
        # Produce-plan ID from the environment (a string); written to the DB
        # below regardless of whether the binding call runs.
        env_produce_plan_id = (produce_plan_ids[0] if produce_plan_ids else "").strip()
        if produce_plan_ids:
            input_source_info = {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": f"Original post-Video-Douyin-Content add plan-{crawler_plan_name}",
                "inputSourceModal": 4,
                "inputSourceChannel": 2,
            }
            # The bind helper returns None on hard failure, so normalize to a list.
            produce_plan_infos, _ = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
            produce_plan_infos = produce_plan_infos or []
        if produce_plan_infos:
            summary_lines.append("  Bound produce plans:")
            for produce_plan_info in produce_plan_infos:
                summary_lines.append(f"    Produce plan name: {produce_plan_info.get('produce_plan_name', '')}")
                summary_lines.append(f"    Produce plan ID: {produce_plan_info.get('produce_plan_id', '')}")
                summary_lines.append(f"    Binding result: {'success' if not produce_plan_info.get('msg') else 'failed'}")
                summary_lines.append(f"    Message: {produce_plan_info.get('msg', 'success')}")
        # Write to the DB if either the crawler plan ID or the produce plan ID
        # exists (independent of whether a binding was attempted above).
        if (crawler_plan_id or "").strip() or env_produce_plan_id:
            try:
                db_updated_rows = update_content_plan_ids(
                    trace_id=trace_id,
                    aweme_ids=content_ids,
                    crawler_plan_id=crawler_plan_id or "",
                    produce_plan_id=env_produce_plan_id,
                )
            except Exception as e:
                logger.error(f"update content plan ids failed: {e}", exc_info=True)
        return ToolResult(
            title="Create crawler plan by Douyin content IDs",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                        # Included to match the documented contract above.
                        "content_ids": content_ids,
                    },
                    "produce_plan_infos": [
                        {
                            "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
                            "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
                            # Boolean, as documented: an empty msg means the binding succeeded.
                            "is_success": not produce_plan_info.get("msg"),
                            "msg": produce_plan_info.get("msg", "success"),
                        }
                        for produce_plan_info in produce_plan_infos
                    ],
                },
                "db": {"updated_rows": db_updated_rows},
            },
            long_term_memory="Create crawler plan by DouYin Content IDs",
        )
    except Exception as e:
        logger.error(f"create douyin content crawler plan error. content_ids: {content_ids}, error: {str(e)}")
        return ToolResult(
            title="Create crawler plan by Douyin content IDs failed",
            output="",
            error=f"Failed to create crawler plan: {str(e)}",
        )
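
# A minimal setup sketch for this tool (the values are hypothetical):
#   export OUTPUT_DIR=.cache/output
#   export AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID=987654
# and {OUTPUT_DIR}/{trace_id}/output.json must contain
#   {"contents": [{"aweme_id": ...}, ...]}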


def crawler_plan_bind_produce_plan(
    input_source_info: Dict[str, Any],
    produce_plan_ids: List[str],
) -> Tuple[Union[List[Dict[str, str]], None], str]:
    """Bind a crawler plan (as a new input source) to each produce plan.

    Returns (per-plan results, "") on success, or (None, error message) on a
    hard failure; per-plan failures are reported via each result's "msg" key.
    """
    if not input_source_info or not produce_plan_ids:
        return None, "input_source_info or produce_plan_ids is invalid"
    input_source_check_key = ["inputSourceModal", "inputSourceChannel", "contentType"]
    try:
        if not isinstance(produce_plan_ids, list):
            return None, "produce_plan_ids is not a list"
        result: List[Dict[str, str]] = []
        for produce_plan_id in produce_plan_ids:
            produce_plan_info = {
                "produce_plan_id": produce_plan_id,
            }
            result.append(produce_plan_info)
            # Fetch the produce-plan detail; a non-empty msg means the lookup failed.
            produce_plan_detail_info, msg = find_produce_plan_info_by_id(produce_plan_id)
            if msg:
                produce_plan_info["msg"] = msg
                continue
            produce_plan_info["produce_plan_name"] = produce_plan_detail_info.get("name", "")
            input_source_groups = produce_plan_detail_info.get("inputSourceGroups", [])
            if not input_source_groups:
                produce_plan_info["msg"] = "produce plan has no input-source groups"
                continue
            # Pick the input-source group to add the crawler plan to: the first
            # group whose leading source matches on modal/channel/contentType.
            input_source_index = 0
            for i, input_source_group in enumerate(input_source_groups):
                input_sources = input_source_group.get("inputSources", [])
                if not input_sources:
                    continue
                first_input_source = input_sources[0]
                if all(input_source_info.get(k, 0) == first_input_source.get(k, -1) for k in input_source_check_key):
                    input_source_index = i
                    break
            # Append the new input source to the chosen group; setdefault keeps
            # the list attached to the plan detail we post back (a bare
            # .get("inputSources", []) would append to a throwaway list).
            input_source_groups[input_source_index].setdefault("inputSources", []).append(input_source_info)
            response_json = post(PRODUCE_PLAN_SAVE, produce_plan_detail_info)
            if response_json.get("code") != 0 or not response_json.get("data", {}):
                produce_plan_info["msg"] = response_json.get("msg", "failed to bind crawler plan to produce plan")
        return result, ""
    except Exception as e:
        logger.error(f"crawler_plan_bind_produce_plan error. input_source_info: {json.dumps(input_source_info)}, produce_plan_ids: {produce_plan_ids}, error: {str(e)}")
        return None, str(e)
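
# Shape assumed for the produce-plan detail used above (illustrative, trimmed
# to the fields this module actually touches):
#   {"name": "...", "inputSourceGroups": [{"inputSources": [
#       {"inputSourceModal": 4, "inputSourceChannel": 2, "contentType": 1, ...}]}]}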


def find_produce_plan_info_by_id(
    produce_plan_id: str,
) -> Tuple[Union[Dict[str, Any], None], str]:
    """Fetch produce-plan details by ID; returns (detail, "") or (None, error message)."""
    try:
        if not produce_plan_id or not isinstance(produce_plan_id, str):
            return None, f"invalid produce_plan_id: {produce_plan_id}"
        params = {
            "id": produce_plan_id,
        }
        response_json = post(GET_PRODUCE_PLAN_DETAIL_BY_ID, params)
        if response_json.get("code") != 0 or not response_json.get("data", {}):
            return None, response_json.get("msg", "failed to fetch produce plan detail")
        return response_json.get("data", {}), ""
    except Exception as e:
        logger.error(f"find_produce_plan_info_by_id error. produce_plan_id: {produce_plan_id}, error: {str(e)}")
        return None, str(e)


def post(url: str, params: Any) -> Dict[str, Any]:
    """POST to an AIGC endpoint with the shared token envelope; returns {} on error."""
    request = {
        "baseInfo": {
            "token": DEFAULT_TOKEN,
        },
        "params": params,
    }
    try:
        logger.info(f"invoke aigc platform. url: {url}, request: {json.dumps(request)}")
        response = requests.post(
            url=url,
            json=request,
            headers={"Content-Type": "application/json"},
            timeout=DEFAULT_TIMEOUT,
        )
        response.raise_for_status()
        response_json = response.json()
        logger.info(f"invoke aigc platform. url: {url}, request: {json.dumps(request)}, response: {json.dumps(response_json)}")
        return response_json
    except Exception as e:
        logger.error(f"invoke aigc platform error. url: {url}, request: {json.dumps(request)}, error: {str(e)}")
        return {}
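

# A minimal manual smoke test (a sketch only: it hits the live API with a
# hypothetical plan ID, so expect a graceful "not found"-style error message).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    detail, err = find_produce_plan_info_by_id("123456")  # hypothetical ID
    print(f"lookup failed: {err}" if err else detail)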