from typing import Dict, List from app.infra.internal.aigc_decode_server import AigcDecodeServer from ._const import DecodeMaterialConst class MaterialDecodeUtils(DecodeMaterialConst): decode_server = AigcDecodeServer() async def submit_decode_batch( self, posts: List[Dict], *, skip_completed: bool = False ) -> Dict[str, Dict]: """分批提交素材解构任务""" result = {} for i in range(0, len(posts), self.SUBMIT_BATCH): batch = posts[i : i + self.SUBMIT_BATCH] response = await self.decode_server.submit_decode( config_id=self.CONFIG_ID, posts=batch, skip_completed=skip_completed ) if response.get("code") == 0: for item in response.get("data", []): result[item["channelContentId"]] = item else: for post in batch: cid = post["channelContentId"] result[cid] = { "channelContentId": cid, "status": "FAILED", "errorMessage": f"batch submit failed: {response}", } return result async def query_decode_results_batch( self, content_ids: List[str] ) -> Dict[str, Dict]: """分批查询素材解构结果""" result = {} for i in range(0, len(content_ids), self.SUBMIT_BATCH): batch = content_ids[i : i + self.SUBMIT_BATCH] response = await self.decode_server.query_decode_results( config_id=self.CONFIG_ID, channel_content_ids=batch ) if response.get("code") == 0: for item in response.get("data", []): result[item["channelContentId"]] = item else: for cid in batch: result[cid] = { "channelContentId": cid, "status": "API_ERROR", "errorMessage": f"query API failed: {response}", } return result @staticmethod def prepare_posts(materials: List[Dict]) -> List[Dict]: """将素材数据转换为 AIGC 解构 API 所需的 post 格式""" posts = [] for m in materials: images = [] cover = m.get("material_cover") if cover: images.append(cover) posts.append( { "channelContentId": str(m["material_id"]), "title": m.get("material_title", ""), "bodyText": "", "images": images, "video": None, "contentModal": DecodeMaterialConst.ContentModal.PICTURE_TEXT, "channel": DecodeMaterialConst.Channel.GROWTH_MATERIAL, } ) return posts __all__ = ["MaterialDecodeUtils"]