import hashlib
import random
import time
from typing import Dict, List

import requests

from app.infra.internal.aigc_decode_server import AigcDecodeServer

from ._const import DecodeCardConst

# Cover download retry configuration
COVER_DOWNLOAD_MAX_RETRIES = 3
COVER_DOWNLOAD_BACKOFF_BASE = 1.0  # base backoff, in seconds
COVER_DOWNLOAD_BACKOFF_MAX = 10.0  # maximum backoff, in seconds


class CardDecodeUtils(DecodeCardConst):
    """Helpers for submitting cards to the AIGC decode service and querying results."""

    # Shared client instance, created once at class-definition time.
    decode_server = AigcDecodeServer()

    async def submit_decode_batch(
        self, posts: List[Dict], *, config_id: int, skip_completed: bool = False
    ) -> Dict[str, Dict]:
        """Submit card decode tasks in batches of ``self.SUBMIT_BATCH``.

        Args:
            posts: Post dicts; each must contain ``"channelContentId"``.
            config_id: Forwarded to ``decode_server.submit_decode``.
            skip_completed: Forwarded to ``decode_server.submit_decode``.

        Returns:
            A mapping ``{channelContentId: item}``. For a batch whose response
            ``code`` is 0, the items come from the response's ``data`` list.
            For any other response, every post of that batch is mapped to a
            synthetic record with ``status == "FAILED"`` and an
            ``errorMessage`` embedding the raw response.
        """
        result: Dict[str, Dict] = {}
        for i in range(0, len(posts), self.SUBMIT_BATCH):
            batch = posts[i : i + self.SUBMIT_BATCH]
            response = await self.decode_server.submit_decode(
                config_id=config_id, posts=batch, skip_completed=skip_completed
            )
            if response.get("code") == 0:
                # `or []` also covers an explicit `"data": null` in the
                # response, which `.get("data", [])` would pass through as None.
                for item in response.get("data") or []:
                    result[item["channelContentId"]] = item
            else:
                for post in batch:
                    cid = post["channelContentId"]
                    result[cid] = {
                        "channelContentId": cid,
                        "status": "FAILED",
                        "errorMessage": f"batch submit failed: {response}",
                    }
        return result

    async def query_decode_results_batch(
        self, content_ids: List[str], *, config_id: int
    ) -> Dict[str, Dict]:
        """Query card decode results in batches of ``self.SUBMIT_BATCH``.

        Args:
            content_ids: Channel content ids to look up.
            config_id: Forwarded to ``decode_server.query_decode_results``.

        Returns:
            A mapping ``{channelContentId: item}``. For a batch whose response
            ``code`` is 0, the items come from the response's ``data`` list.
            For any other response, every id of that batch is mapped to a
            synthetic record with ``status == "API_ERROR"`` and an
            ``errorMessage`` embedding the raw response.
        """
        result: Dict[str, Dict] = {}
        for i in range(0, len(content_ids), self.SUBMIT_BATCH):
            batch = content_ids[i : i + self.SUBMIT_BATCH]
            response = await self.decode_server.query_decode_results(
                config_id=config_id, channel_content_ids=batch
            )
            if response.get("code") == 0:
                # `or []` also covers an explicit `"data": null` in the response.
                for item in response.get("data") or []:
                    result[item["channelContentId"]] = item
            else:
                for cid in batch:
                    result[cid] = {
                        "channelContentId": cid,
                        "status": "API_ERROR",
                        "errorMessage": f"query API failed: {response}",
                    }
        return result

    # Prefix prepended to cover URLs that do not start with "https:".
    COVER_CDN_PREFIX = "https://rescdn.yishihui.com/"

    @staticmethod
    def _normalize_cover_url(url: str) -> str:
        """Prepend the CDN prefix to a cover URL that does not start with ``https:``.

        URLs that already start with ``https:`` are returned unchanged.
        """
        # NOTE(review): an absolute "http://" URL also gets the prefix
        # prepended here — confirm inputs are only "https:" URLs or CDN keys.
        if not url.startswith("https:"):
            return f"{CardDecodeUtils.COVER_CDN_PREFIX}{url}"
        return url

    @staticmethod
    def compute_cover_md5(image_url: str) -> str:
        """Download a cover image and return the hex MD5 digest of its bytes.

        The download is retried up to ``COVER_DOWNLOAD_MAX_RETRIES`` times with
        exponential backoff plus jitter, to cope with CDN rate limiting under
        high concurrency.

        Args:
            image_url: Cover URL; normalized via ``_normalize_cover_url`` first.

        Returns:
            The hexadecimal MD5 digest of the response body.

        Raises:
            requests.RequestException: The error from the last attempt, when
                every attempt failed.
            RuntimeError: If ``COVER_DOWNLOAD_MAX_RETRIES`` is less than 1, so
                no attempt was made.
        """
        # NOTE(review): this is synchronous (requests.get / time.sleep); if it
        # is called directly from a coroutine it will block — confirm callers
        # run it off the event loop.
        url = CardDecodeUtils._normalize_cover_url(image_url)
        last_exc = None
        for attempt in range(COVER_DOWNLOAD_MAX_RETRIES):
            try:
                resp = requests.get(url, timeout=30)
                resp.raise_for_status()
                return hashlib.md5(resp.content).hexdigest()
            except requests.RequestException as e:
                last_exc = e
                # Sleep only between attempts, never after the final one.
                if attempt < COVER_DOWNLOAD_MAX_RETRIES - 1:
                    # Exponential backoff capped at COVER_DOWNLOAD_BACKOFF_MAX.
                    delay = min(
                        COVER_DOWNLOAD_BACKOFF_BASE * (2 ** attempt),
                        COVER_DOWNLOAD_BACKOFF_MAX,
                    )
                    # Up to +50% random jitter on top of the capped delay.
                    jitter = random.uniform(0, delay * 0.5)
                    time.sleep(delay + jitter)
        if last_exc is None:
            # The loop body never ran; `raise None` would surface as an
            # unrelated TypeError, so fail with an explicit message instead.
            raise RuntimeError(
                "COVER_DOWNLOAD_MAX_RETRIES must be >= 1 to download a cover"
            )
        raise last_exc

    @staticmethod
    def prepare_posts(cards: List[Dict]) -> List[Dict]:
        """Convert card records into the post format used by the AIGC decode API.

        Args:
            cards: Card dicts. ``"card_cover_id"`` is required;
                ``"share_cover"`` and ``"share_title"`` are optional.

        Returns:
            One post dict per card, in input order. ``images`` holds the
            normalized cover URL when ``share_cover`` is truthy, otherwise it
            is an empty list.
        """
        posts = []
        for card in cards:
            images = []
            cover = card.get("share_cover")
            if cover:
                images.append(CardDecodeUtils._normalize_cover_url(cover))
            posts.append(
                {
                    "channelContentId": str(card["card_cover_id"]),
                    "title": card.get("share_title", ""),
                    "bodyText": "",
                    "images": images,
                    "video": None,
                    "contentModal": DecodeCardConst.ContentModal.PICTURE_TEXT,
                    "channel": DecodeCardConst.Channel.GROWTH_AUTO_REPLY_CARD,
                }
            )
        return posts


__all__ = ["CardDecodeUtils"]