# _utils.py
import hashlib
import random
import time
from typing import Dict, List

import requests

from app.infra.internal.aigc_decode_server import AigcDecodeServer

from ._const import DecodeCardConst
  8. # 封面下载重试配置
  9. COVER_DOWNLOAD_MAX_RETRIES = 3
  10. COVER_DOWNLOAD_BACKOFF_BASE = 1.0 # 基础退避秒数
  11. COVER_DOWNLOAD_BACKOFF_MAX = 10.0 # 最大退避秒数
  12. class CardDecodeUtils(DecodeCardConst):
  13. decode_server = AigcDecodeServer()
  14. async def submit_decode_batch(
  15. self, posts: List[Dict], *, config_id: int, skip_completed: bool = False
  16. ) -> Dict[str, Dict]:
  17. """分批提交卡片解构任务,返回 {content_id: {status, errorMessage}}"""
  18. result = {}
  19. for i in range(0, len(posts), self.SUBMIT_BATCH):
  20. batch = posts[i : i + self.SUBMIT_BATCH]
  21. response = await self.decode_server.submit_decode(
  22. config_id=config_id, posts=batch, skip_completed=skip_completed
  23. )
  24. if response.get("code") == 0:
  25. for item in response.get("data", []):
  26. result[item["channelContentId"]] = item
  27. else:
  28. for post in batch:
  29. cid = post["channelContentId"]
  30. result[cid] = {
  31. "channelContentId": cid,
  32. "status": "FAILED",
  33. "errorMessage": f"batch submit failed: {response}",
  34. }
  35. return result
  36. async def query_decode_results_batch(
  37. self, content_ids: List[str], *, config_id: int
  38. ) -> Dict[str, Dict]:
  39. """分批查询卡片解构结果"""
  40. result = {}
  41. for i in range(0, len(content_ids), self.SUBMIT_BATCH):
  42. batch = content_ids[i : i + self.SUBMIT_BATCH]
  43. response = await self.decode_server.query_decode_results(
  44. config_id=config_id, channel_content_ids=batch
  45. )
  46. if response.get("code") == 0:
  47. for item in response.get("data", []):
  48. result[item["channelContentId"]] = item
  49. else:
  50. for cid in batch:
  51. result[cid] = {
  52. "channelContentId": cid,
  53. "status": "API_ERROR",
  54. "errorMessage": f"query API failed: {response}",
  55. }
  56. return result
  57. COVER_CDN_PREFIX = "https://rescdn.yishihui.com/"
  58. @staticmethod
  59. def _normalize_cover_url(url: str) -> str:
  60. """不以 https: 开头的封面 URL 拼接 CDN 前缀"""
  61. if not url.startswith("https:"):
  62. return f"{CardDecodeUtils.COVER_CDN_PREFIX}{url}"
  63. return url
  64. @staticmethod
  65. def compute_cover_md5(image_url: str) -> str:
  66. """下载封面图片并计算 MD5,带重试 + 指数退避 + 抖动,应对高并发 CDN 限流"""
  67. url = CardDecodeUtils._normalize_cover_url(image_url)
  68. last_exc = None
  69. for attempt in range(COVER_DOWNLOAD_MAX_RETRIES):
  70. try:
  71. resp = requests.get(url, timeout=30)
  72. resp.raise_for_status()
  73. return hashlib.md5(resp.content).hexdigest()
  74. except requests.RequestException as e:
  75. last_exc = e
  76. if attempt < COVER_DOWNLOAD_MAX_RETRIES - 1:
  77. delay = min(
  78. COVER_DOWNLOAD_BACKOFF_BASE * (2 ** attempt),
  79. COVER_DOWNLOAD_BACKOFF_MAX,
  80. )
  81. jitter = random.uniform(0, delay * 0.5)
  82. time.sleep(delay + jitter)
  83. raise last_exc
  84. @staticmethod
  85. def prepare_posts(cards: List[Dict]) -> List[Dict]:
  86. """将卡片数据转换为 AIGC 解构 API 所需的 post 格式"""
  87. posts = []
  88. for card in cards:
  89. images = []
  90. cover = card.get("share_cover")
  91. if cover:
  92. images.append(CardDecodeUtils._normalize_cover_url(cover))
  93. posts.append(
  94. {
  95. "channelContentId": str(card["card_cover_id"]),
  96. "title": card.get("share_title", ""),
  97. "bodyText": "",
  98. "images": images,
  99. "video": None,
  100. "contentModal": DecodeCardConst.ContentModal.PICTURE_TEXT,
  101. "channel": DecodeCardConst.Channel.GROWTH_AUTO_REPLY_CARD,
  102. }
  103. )
  104. return posts
  105. __all__ = ["CardDecodeUtils"]