import json
from typing import Dict, List, Optional

from app.infra.internal.aigc_decode_server import AigcDecodeServer

from ._const import DecodeArticleConst


class AigcDecodeUtils(DecodeArticleConst):
    """Helpers for submitting posts to the AIGC decode service and reading results.

    ``CONFIG_ID``, ``SUBMIT_BATCH``, ``ContentModal``, ``Channel`` and
    ``ProduceModuleType`` are read from ``self`` and are presumably declared on
    ``DecodeArticleConst`` (not visible in this file).
    """

    # Single client instance, created when the class body is executed and
    # shared by this class and every subclass.
    decode_server = AigcDecodeServer()

    @staticmethod
    def _is_success(response) -> bool:
        """Return True when *response* is a dict whose ``code`` equals 0."""
        return isinstance(response, dict) and response.get("code") == 0

    async def submit_decode_batch(
        self,
        posts: List[Dict],
        *,
        config_id: Optional[int] = None,
        skip_completed: bool = False,
    ) -> Dict[str, Dict]:
        """Submit decode tasks in batches of ``SUBMIT_BATCH`` posts.

        Args:
            posts: Post payloads; each must contain ``channelContentId``.
            config_id: Decode config to use. Any falsy value (``None`` or 0)
                falls back to ``self.CONFIG_ID``.
            skip_completed: Forwarded unchanged to the decode server.

        Returns:
            Mapping ``{channelContentId: item}``. For a successful batch the
            items come straight from the response ``data`` list (expected to
            carry ``status`` / ``errorMessage``). For a failed batch every post
            of that batch is mapped to a synthetic ``FAILED`` entry.
        """
        cfg_id = config_id or self.CONFIG_ID
        result: Dict[str, Dict] = {}
        for start in range(0, len(posts), self.SUBMIT_BATCH):
            batch = posts[start : start + self.SUBMIT_BATCH]
            response = await self.decode_server.submit_decode(
                config_id=cfg_id, posts=batch, skip_completed=skip_completed
            )
            if self._is_success(response):
                # ``data`` may be present but null; treat that as "no items".
                for item in response.get("data") or []:
                    result[item["channelContentId"]] = item
            else:
                # The whole batch failed: mark every post in it as FAILED.
                for post in batch:
                    cid = post["channelContentId"]
                    result[cid] = {
                        "channelContentId": cid,
                        "status": "FAILED",
                        "errorMessage": f"batch submit failed: {response}",
                    }
        return result

    async def query_decode_results_batch(
        self, content_ids: List[str], *, config_id: Optional[int] = None
    ) -> Dict[str, Dict]:
        """Query decode results in batches of ``SUBMIT_BATCH`` content ids.

        Args:
            content_ids: Channel content ids to look up.
            config_id: Decode config to use. Any falsy value (``None`` or 0)
                falls back to ``self.CONFIG_ID``.

        Returns:
            Mapping ``{channelContentId: item}`` where items are expected to
            carry ``status``, ``dataContent``, ``html`` and ``errorMessage``.
            When the API call for a batch fails, every id of that batch is
            mapped to an entry with status ``API_ERROR``; callers should keep
            such records in INIT state and retry later.
        """
        cfg_id = config_id or self.CONFIG_ID
        result: Dict[str, Dict] = {}
        for start in range(0, len(content_ids), self.SUBMIT_BATCH):
            batch = content_ids[start : start + self.SUBMIT_BATCH]
            response = await self.decode_server.query_decode_results(
                config_id=cfg_id, channel_content_ids=batch
            )
            if self._is_success(response):
                # ``data`` may be present but null; treat that as "no items".
                for item in response.get("data") or []:
                    result[item["channelContentId"]] = item
            else:
                for cid in batch:
                    result[cid] = {
                        "channelContentId": cid,
                        "status": "API_ERROR",
                        "errorMessage": f"query API failed: {response}",
                    }
        return result

    @staticmethod
    def extract_decode_result(result: Dict) -> Dict:
        """Extract inspiration, purpose, key points and topic from a decode result.

        Supports both payload layouts: v1 wraps everything in a
        ``final_normalization_rebuild`` object, v2 has the sections at the top
        level. Sections or lists that are missing, null or of an unexpected
        type are treated as empty instead of raising.

        Args:
            result: Parsed decode result dict.

        Returns:
            Dict with keys ``inspiration``, ``purpose``, ``key_point`` (each a
            comma-joined string of the non-empty point values) and ``topic``.
        """
        final_result = result.get("final_normalization_rebuild") or result

        def _section(container: Dict, key: str) -> Dict:
            # A key that is present with a null/non-dict value must not crash
            # the chained lookups below.
            value = container.get(key)
            return value if isinstance(value, dict) else {}

        def _points(section_key: str, list_key: str) -> list:
            value = _section(final_result, section_key).get(list_key)
            return list(value) if isinstance(value, (list, tuple)) else []

        def _join_points(items: list, key: str) -> str:
            # Skip non-dict entries and entries whose value is empty/falsy.
            parts = [str(p[key]) for p in items if isinstance(p, dict) and p.get(key)]
            return ",".join(parts)

        inspiration_list = _points("inspiration_final_result", "最终灵感点列表")
        purpose_list = _points("purpose_final_result", "最终目的点列表")
        keypoint_list = _points("keypoint_final", "最终关键点列表")

        topic_fusion = _section(final_result, "topic_fusion_result")
        # Falls back to "" when the inner topic section is not a dict.
        topic_text = _section(topic_fusion, "最终选题").get("选题", "")

        return {
            "inspiration": _join_points(inspiration_list, "灵感点"),
            "purpose": _join_points(purpose_list, "目的点"),
            "key_point": _join_points(keypoint_list, "关键点"),
            "topic": topic_text,
        }


class AdPlatformArticlesDecodeUtils(AigcDecodeUtils):
    """Builds decode posts from ad-platform article rows."""

    @staticmethod
    def format_images(images: str) -> List[str]:
        """Parse a JSON-encoded list of image objects into a list of URLs.

        Args:
            images: JSON text expected to decode to a list of dicts that each
                may carry an ``image_url`` key.

        Returns:
            The non-empty ``image_url`` values in order. An empty list is
            returned for blank input, invalid JSON, or a non-list payload.
        """
        if not images or not images.strip():
            return []
        try:
            image_list = json.loads(images)
        except (json.JSONDecodeError, TypeError):
            return []
        if not isinstance(image_list, list):
            return []
        return [
            entry.get("image_url")
            for entry in image_list
            if isinstance(entry, dict) and entry.get("image_url")
        ]

    def prepare_posts(self, articles: List[Dict]) -> List[Dict]:
        """Convert article rows into decode-service post payloads.

        Args:
            articles: Rows that must contain ``wx_sn`` and may contain
                ``article_title``, ``article_text`` and ``article_images``.

        Returns:
            One post dict per article, keyed by ``wx_sn`` as
            ``channelContentId``.
        """
        return [
            {
                "channelContentId": article["wx_sn"],
                "title": article.get("article_title", ""),
                "bodyText": article.get("article_text", ""),
                "images": self.format_images(article.get("article_images") or ""),
                "video": None,
                "contentModal": self.ContentModal.LONG_ARTICLE,
                "channel": self.Channel.WECHAT,
            }
            for article in articles
        ]


class InnerArticlesDecodeUtils(AigcDecodeUtils):
    """Builds decode posts from internally produced article rows."""

    def _collect_images(self, article: Dict, produce_info: List[Dict]) -> List[str]:
        """Gather image URLs: article cover, then produce COVER, then produce IMAGE."""
        images = []
        if article.get("coverimgurl"):
            images.append(article["coverimgurl"])
        # Two passes keep all COVER outputs ahead of all IMAGE outputs.
        for module_type in (self.ProduceModuleType.COVER, self.ProduceModuleType.IMAGE):
            images.extend(
                info["output"]
                for info in produce_info
                if info["produce_module_type"] == module_type
            )
        return images

    def prepare_posts(
        self, articles: List[Dict], produce_info_map: Dict[str, List[Dict]]
    ) -> List[Dict]:
        """Convert article rows into decode-service post payloads.

        Args:
            articles: Rows that must contain ``source_id`` and may contain
                ``title``, ``article_text`` and ``coverimgurl``.
            produce_info_map: Maps ``str(source_id)`` to produce records, each
                with ``produce_module_type`` and ``output``. Missing or null
                entries are treated as having no produce records.

        Returns:
            One post dict per article, with ``str(source_id)`` as
            ``channelContentId``.
        """
        posts = []
        for article in articles:
            source_id = str(article["source_id"])
            produce_info = produce_info_map.get(source_id) or []
            posts.append(
                {
                    "title": article.get("title", ""),
                    "bodyText": article.get("article_text", ""),
                    "images": self._collect_images(article, produce_info),
                    "video": None,
                    "contentModal": self.ContentModal.LONG_ARTICLE,
                    "channel": self.Channel.WECHAT,
                    "channelContentId": source_id,
                }
            )
        return posts


__all__ = [
    "AigcDecodeUtils",
    "AdPlatformArticlesDecodeUtils",
    "InnerArticlesDecodeUtils",
]