| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import hashlib
- import random
- import time
- from typing import Dict, List
- import requests
- from app.infra.internal.aigc_decode_server import AigcDecodeServer
- from ._const import DecodeCardConst
# Cover-download retry configuration (consumed by CardDecodeUtils.compute_cover_md5).
COVER_DOWNLOAD_MAX_RETRIES = 3  # total number of download attempts
COVER_DOWNLOAD_BACKOFF_BASE = 1.0  # base backoff, in seconds
COVER_DOWNLOAD_BACKOFF_MAX = 10.0  # upper bound on the backoff, in seconds
class CardDecodeUtils(DecodeCardConst):
    """Helpers for sending cards to the AIGC decode service and reading results.

    Provides batched submit/query wrappers around ``AigcDecodeServer``, cover
    URL normalization, cover MD5 computation, and conversion of card rows into
    the post payload passed to the decode service.
    """

    # Shared decode-service client, created once when the class is defined.
    decode_server = AigcDecodeServer()

    # Prefix prepended to cover paths that are not already absolute URLs.
    COVER_CDN_PREFIX = "https://rescdn.yishihui.com/"

    async def submit_decode_batch(
        self, posts: List[Dict], *, config_id: int, skip_completed: bool = False
    ) -> Dict[str, Dict]:
        """Submit card decode tasks in batches of ``self.SUBMIT_BATCH``.

        Args:
            posts: Post payloads; each must carry a ``"channelContentId"`` key.
            config_id: Decode configuration id forwarded to the service.
            skip_completed: Forwarded unchanged to the service.

        Returns:
            A mapping ``{channelContentId: item}``. For a batch whose response
            has ``code == 0`` the items come from the response ``data``; for
            any other response every post of that batch is mapped to a
            synthetic entry with ``status == "FAILED"`` and an
            ``errorMessage`` embedding the raw response.
        """
        result: Dict[str, Dict] = {}
        for start in range(0, len(posts), self.SUBMIT_BATCH):
            batch = posts[start : start + self.SUBMIT_BATCH]
            response = await self.decode_server.submit_decode(
                config_id=config_id, posts=batch, skip_completed=skip_completed
            )
            if response.get("code") == 0:
                # `or []` guards against a present-but-null `data` field,
                # which `.get("data", [])` would hand back as None.
                for item in response.get("data") or []:
                    result[item["channelContentId"]] = item
                continue
            # The whole batch failed: mark each of its posts individually.
            for post in batch:
                cid = post["channelContentId"]
                result[cid] = {
                    "channelContentId": cid,
                    "status": "FAILED",
                    "errorMessage": f"batch submit failed: {response}",
                }
        return result

    async def query_decode_results_batch(
        self, content_ids: List[str], *, config_id: int
    ) -> Dict[str, Dict]:
        """Query card decode results in batches of ``self.SUBMIT_BATCH``.

        Args:
            content_ids: Channel content ids to look up.
            config_id: Decode configuration id forwarded to the service.

        Returns:
            A mapping ``{channelContentId: item}``. For a batch whose response
            has ``code == 0`` the items come from the response ``data``; for
            any other response every id of that batch is mapped to a
            synthetic entry with ``status == "API_ERROR"`` and an
            ``errorMessage`` embedding the raw response.
        """
        result: Dict[str, Dict] = {}
        for start in range(0, len(content_ids), self.SUBMIT_BATCH):
            batch = content_ids[start : start + self.SUBMIT_BATCH]
            response = await self.decode_server.query_decode_results(
                config_id=config_id, channel_content_ids=batch
            )
            if response.get("code") == 0:
                # `or []` guards against a present-but-null `data` field.
                for item in response.get("data") or []:
                    result[item["channelContentId"]] = item
                continue
            # The query call failed: mark each id of this batch individually.
            for cid in batch:
                result[cid] = {
                    "channelContentId": cid,
                    "status": "API_ERROR",
                    "errorMessage": f"query API failed: {response}",
                }
        return result

    @staticmethod
    def _normalize_cover_url(url: str) -> str:
        """Return an absolute cover URL.

        A URL that already starts with ``https:`` or ``http://`` is returned
        unchanged; anything else is treated as a CDN-relative path and gets
        ``COVER_CDN_PREFIX`` prepended.
        """
        # `http://` is accepted too: prefixing an already-absolute URL with
        # the CDN host would produce an unusable address.
        if url.startswith(("https:", "http://")):
            return url
        return f"{CardDecodeUtils.COVER_CDN_PREFIX}{url}"

    @staticmethod
    def compute_cover_md5(image_url: str) -> str:
        """Download a cover image and return the hex MD5 digest of its bytes.

        Makes up to ``COVER_DOWNLOAD_MAX_RETRIES`` attempts, sleeping between
        them with exponential backoff plus random jitter, to cope with CDN
        rate limiting under high concurrency. This call blocks (synchronous
        HTTP request and ``time.sleep``).

        Args:
            image_url: Cover URL or CDN-relative path; normalized first.

        Returns:
            The lowercase hexadecimal MD5 of the downloaded content.

        Raises:
            requests.RequestException: The error of the final attempt, if all
                attempts fail (includes HTTP error statuses).
            ValueError: If ``COVER_DOWNLOAD_MAX_RETRIES`` is less than 1.
        """
        url = CardDecodeUtils._normalize_cover_url(image_url)
        final_attempt = COVER_DOWNLOAD_MAX_RETRIES - 1
        for attempt in range(COVER_DOWNLOAD_MAX_RETRIES):
            try:
                resp = requests.get(url, timeout=30)
                resp.raise_for_status()
                return hashlib.md5(resp.content).hexdigest()
            except requests.RequestException:
                if attempt >= final_attempt:
                    # Out of attempts: surface the last error with its traceback.
                    raise
                delay = min(
                    COVER_DOWNLOAD_BACKOFF_BASE * (2 ** attempt),
                    COVER_DOWNLOAD_BACKOFF_MAX,
                )
                # Jitter is added after the cap, so the actual sleep can
                # exceed COVER_DOWNLOAD_BACKOFF_MAX by up to 50%.
                time.sleep(delay + random.uniform(0, delay * 0.5))
        # Only reachable when the loop body never ran.
        raise ValueError("COVER_DOWNLOAD_MAX_RETRIES must be at least 1")

    @staticmethod
    def prepare_posts(cards: List[Dict]) -> List[Dict]:
        """Convert card rows into the post payloads passed to the decode service.

        Args:
            cards: Card dicts. ``"card_cover_id"`` is required;
                ``"share_title"`` and ``"share_cover"`` are optional.

        Returns:
            One post dict per card, in input order. ``images`` holds the
            normalized cover URL when the card has a non-empty
            ``"share_cover"``, otherwise it is an empty list.
        """
        posts = []
        for card in cards:
            cover = card.get("share_cover")
            images = [CardDecodeUtils._normalize_cover_url(cover)] if cover else []
            posts.append(
                {
                    "channelContentId": str(card["card_cover_id"]),
                    "title": card.get("share_title", ""),
                    "bodyText": "",
                    "images": images,
                    "video": None,
                    "contentModal": DecodeCardConst.ContentModal.PICTURE_TEXT,
                    "channel": DecodeCardConst.Channel.GROWTH_AUTO_REPLY_CARD,
                }
            )
        return posts
# Public API of this module.
__all__ = ["CardDecodeUtils"]
|