""" @author: luojunhui """ from tenacity import retry from requests.exceptions import RequestException import requests import json from typing import Optional, Dict, List, TypedDict from applications.utils import request_retry retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30) headers = { "Accept": "application/json", "Accept-Language": "zh-CN,zh;q=0.9", "Content-Type": "application/json", "Proxy-Connection": "keep-alive", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36", } class RelationDict(TypedDict): videoPoolTraceId: str channelContentId: str platform: str class AigcSystemApi: @retry(**retry_desc) def insert_crawler_relation_to_aigc_system( self, relation_list: List[RelationDict] ) -> Optional[Dict]: url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation" payload = json.dumps({"params": {"relations": relation_list}}) try: response = requests.post(url, headers=headers, data=payload, timeout=60) response.raise_for_status() return response.json() except RequestException as e: print(f"API请求失败: {e}") except json.JSONDecodeError as e: print(f"响应解析失败: {e}") return None