|
@@ -0,0 +1,44 @@
|
|
|
|
+"""
|
|
|
|
+@author: luojunhui
|
|
|
|
+"""
|
|
|
|
+
|
|
|
|
+from tenacity import retry
|
|
|
|
+from requests.exceptions import RequestException
|
|
|
|
+import requests
|
|
|
|
+import json
|
|
|
|
+from typing import Optional, Dict, List, TypedDict
|
|
|
|
+
|
|
|
|
+from applications.utils import request_retry
|
|
|
|
+
|
|
|
|
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
|
|
|
|
+headers = {
|
|
|
|
+ "Accept": "application/json",
|
|
|
|
+ "Accept-Language": "zh-CN,zh;q=0.9",
|
|
|
|
+ "Content-Type": "application/json",
|
|
|
|
+ "Proxy-Connection": "keep-alive",
|
|
|
|
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class RelationDict(TypedDict):
|
|
|
|
+ contentTraceId: str
|
|
|
|
+ channelContentId: str
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class AigcSystemApi:
|
|
|
|
+
|
|
|
|
+ @retry(**retry_desc)
|
|
|
|
+ def insert_crawler_relation_to_aigc_system(
|
|
|
|
+ self, relation_list: List[RelationDict]
|
|
|
|
+ ) -> Optional[Dict]:
|
|
|
|
+ url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
|
|
|
|
+ payload = json.dumps({"params": {"relations": relation_list}})
|
|
|
|
+ try:
|
|
|
|
+ response = requests.post(url, headers=headers, json=payload, timeout=60)
|
|
|
|
+ response.raise_for_status()
|
|
|
|
+ return response.json()
|
|
|
|
+ except RequestException as e:
|
|
|
|
+ print(f"API请求失败: {e}")
|
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
|
+ print(f"响应解析失败: {e}")
|
|
|
|
+ return None
|