123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- """
- @author: luojunhui
- """
- from tenacity import retry
- from requests.exceptions import RequestException
- import requests
- import json
- from typing import Optional, Dict, List, TypedDict
- from applications.utils import request_retry
- retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
- headers = {
- "Accept": "application/json",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Content-Type": "application/json",
- "Proxy-Connection": "keep-alive",
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
- }
- class RelationDict(TypedDict):
- videoPoolTraceId: str
- channelContentId: str
- platform: str
- class AigcSystemApi:
- @retry(**retry_desc)
- def insert_crawler_relation_to_aigc_system(
- self, relation_list: List[RelationDict]
- ) -> Optional[Dict]:
- url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
- payload = json.dumps({"params": {"relations": relation_list}})
- try:
- response = requests.post(url, headers=headers, data=payload, timeout=60)
- response.raise_for_status()
- return response.json()
- except RequestException as e:
- print(f"API请求失败: {e}")
- except json.JSONDecodeError as e:
- print(f"响应解析失败: {e}")
- return None
|