aigc_system_api.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. """
  2. @author: luojunhui
  3. """
  4. from tenacity import retry
  5. from requests.exceptions import RequestException
  6. import requests
  7. import json
  8. from typing import Optional, Dict, List, TypedDict
  9. from applications.utils import request_retry
  10. retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
  11. headers = {
  12. "Accept": "application/json",
  13. "Accept-Language": "zh-CN,zh;q=0.9",
  14. "Content-Type": "application/json",
  15. "Proxy-Connection": "keep-alive",
  16. "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
  17. }
  18. class RelationDict(TypedDict):
  19. videoPoolTraceId: str
  20. channelContentId: str
  21. platform: str
  22. class AigcSystemApi:
  23. @retry(**retry_desc)
  24. def insert_crawler_relation_to_aigc_system(
  25. self, relation_list: List[RelationDict]
  26. ) -> Optional[Dict]:
  27. url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
  28. payload = json.dumps({"params": {"relations": relation_list}})
  29. try:
  30. response = requests.post(url, headers=headers, data=payload, timeout=60)
  31. response.raise_for_status()
  32. return response.json()
  33. except RequestException as e:
  34. print(f"API请求失败: {e}")
  35. except json.JSONDecodeError as e:
  36. print(f"响应解析失败: {e}")
  37. return None