import json
import re
from typing import Any, Dict, List

import aiohttp
from tenacity import AsyncRetrying

from app.infra.internal import DecodeServer
from app.infra.shared.tools import request_retry

from ._const import DecodeTaskConst

# Matches a whole (already stripped) line of the form "[image: http(s)://...]".
# Compiled once at import time instead of on every extraction call.
_IMAGE_LINE_PATTERN = re.compile(r"^\[image:\s*(https?://[^\]]+)\s*\]$")


class DecodeTaskUtil(DecodeTaskConst):
    """Shared helpers for building decode requests and reading decode results."""

    # Created once at class-definition time and shared by this class and its
    # subclasses (it is a class attribute, not a per-instance one).
    decode_server = DecodeServer()

    def prepare_extract_body(self, article: Dict) -> Dict:
        """Build the decode request body for a long-article record.

        Args:
            article: Mapping read with ``.get``; the keys used are ``wx_sn``,
                ``article_images``, ``article_text``, ``article_title``,
                ``gh_id`` and ``account_name``. Missing text fields default
                to ``""``; a missing ``article_images`` is passed as ``None``.

        Returns:
            A dict with ``scene``, ``content_type`` and a nested ``content``
            dict, ready to hand to the decode server.
        """
        return {
            "scene": self.BusinessScene.POINT_PICK,
            "content_type": self.ContentType.LONG_ARTICLE,
            "content": {
                "channel_content_id": article.get("wx_sn", ""),
                "video_url": "",
                "images": article.get("article_images"),
                "body_text": article.get("article_text", ""),
                "title": article.get("article_title", ""),
                "channel_account_id": article.get("gh_id", ""),
                "channel_account_name": article.get("account_name", ""),
            },
        }

    @staticmethod
    def extract_decode_result(result: Dict) -> Dict:
        """Extract inspiration, purpose, key points and topic from a result.

        Args:
            result: Decode result expected to carry a non-empty
                ``final_normalization_rebuild`` dict.

        Returns:
            ``{"error": ...}`` when ``final_normalization_rebuild`` is missing,
            empty or not a dict. Otherwise a dict with ``inspiration``,
            ``purpose`` and ``key_point`` (each a comma-joined string of the
            truthy point values) plus ``topic``. Sections or lists that are
            absent, ``None`` or of an unexpected type yield ``""`` instead of
            raising.
        """
        final_result = (
            result.get("final_normalization_rebuild")
            if isinstance(result, dict)
            else None
        )
        if not isinstance(final_result, dict) or not final_result:
            return {"error": "解构结果中无 final_normalization_rebuild 信息"}

        def _section(name: str) -> Dict:
            # A key that is present but holds None (or any non-dict) must not
            # break the chained lookups below, so normalise it to {}.
            value = final_result.get(name)
            return value if isinstance(value, dict) else {}

        def _join_points(items: Any, key: str) -> str:
            # Anything other than a list/tuple of entries counts as "no points".
            if not isinstance(items, (list, tuple)):
                return ""
            parts = [
                str(p[key]) for p in items if isinstance(p, dict) and p.get(key)
            ]
            return ",".join(parts)

        # Inspiration points
        inspiration_list = _section("inspiration_final_result").get(
            "最终灵感点列表", []
        )
        # Purpose points
        purpose_list = _section("purpose_final_result").get("最终目的点列表", [])
        # Key points
        keypoint_list = _section("keypoint_final").get("最终关键点列表", [])

        # Topic: only read when the final-topic entry is itself a dict.
        final_topic = _section("topic_fusion_result").get("最终选题")
        topic_text = (
            final_topic.get("选题", "") if isinstance(final_topic, dict) else ""
        )

        return {
            "inspiration": _join_points(inspiration_list, "灵感点"),
            "purpose": _join_points(purpose_list, "目的点"),
            "key_point": _join_points(keypoint_list, "关键点"),
            "topic": topic_text,
        }

    async def fetch_decode_result(self, task_id: str):
        """Return whatever ``decode_server.fetch_result`` yields for *task_id*."""
        return await self.decode_server.fetch_result(task_id)


class AdPlatformArticlesDecodeUtils(DecodeTaskUtil):
    """Decode helpers for articles whose images arrive as a JSON string."""

    @staticmethod
    def format_images(images: str) -> List[str]:
        """Turn an images payload into a list of image URLs.

        Args:
            images: Normally a JSON string encoding a list of objects with an
                ``image_url`` field. An already-decoded list is also accepted
                so the helper can be applied to its own kind of input twice
                without raising.

        Returns:
            The truthy ``image_url`` values of the dict entries, in order.
            Empty, blank, invalid-JSON or non-list input returns ``[]``.
        """
        if isinstance(images, (str, bytes, bytearray)):
            if not images.strip():
                return []
            try:
                image_list = json.loads(images)
            except (json.JSONDecodeError, TypeError):
                return []
        else:
            # None, an already-parsed list, or an unexpected type: validated
            # by the isinstance check below.
            image_list = images

        if not isinstance(image_list, list):
            return []

        return [
            i.get("image_url")
            for i in image_list
            if isinstance(i, dict) and i.get("image_url")
        ]

    async def create_decode_task(self, article: Dict):
        """Create a decode task for *article* and return the server response.

        The caller's ``article`` is not modified: the parsed image list goes
        into a shallow copy. Overwriting ``article_images`` in place would make
        a second call with the same dict (for example a retry) see a list
        where the JSON string used to be.
        """
        images = self.format_images(article.get("article_images") or "")
        normalized_article = {**article, "article_images": images}
        request_body = self.prepare_extract_body(normalized_article)
        return await self.decode_server.create_decode_task(request_body)


class InnerArticlesDecodeUtils(DecodeTaskUtil):
    """Decode helpers for payloads that are already split into fields."""

    # NOTE(review): on Python < 3.11 ``asyncio.TimeoutError`` is a different
    # class from the builtin ``TimeoutError`` listed here - confirm which one
    # the consumers of this tuple expect to catch.
    RETRYABLE_EXCEPTIONS = (aiohttp.ClientError, TimeoutError)

    @staticmethod
    def extract_body_text_and_images(raw_text):
        """Split raw text into body text and image URLs.

        Each line is stripped. A line that is entirely ``[image: <http(s) url>]``
        contributes its URL to the image list and is removed from the body.
        Leading and trailing blank lines are dropped and runs of blank lines
        collapse to a single one.

        Args:
            raw_text: The text to split; any falsy value gives ``("", [])``.

        Returns:
            A ``(body_text, images)`` tuple: the body lines joined with
            ``"\\n"``, and the image URLs in order of appearance.
        """
        if not raw_text:
            return "", []

        body_lines = []
        images = []

        for line in raw_text.splitlines():
            stripped_line = line.strip()

            if not stripped_line:
                # Keep at most one blank separator, and never a leading one.
                if body_lines and body_lines[-1] != "":
                    body_lines.append("")
                continue

            match = _IMAGE_LINE_PATTERN.match(stripped_line)
            if match:
                images.append(match.group(1).strip())
                continue

            body_lines.append(stripped_line)

        # Drop the blank separator left behind when the text ends with blank
        # or image-only lines.
        while body_lines and body_lines[-1] == "":
            body_lines.pop()

        body_text = "\n".join(body_lines)
        return body_text, images

    async def create_decode_task(self, payload):
        """Create a decode task from a pre-split payload.

        Args:
            payload: Mapping that must contain ``content_type`` (a missing key
                raises ``KeyError``). Optional keys ``channel_content_id``,
                ``images``, ``body_text``, ``title``, ``gh_id`` and
                ``account_name`` default to an empty string, or to an empty
                list for ``images``.

        Returns:
            Whatever ``decode_server.create_decode_task`` returns.
        """
        request_body = {
            "scene": self.BusinessScene.POINT_PICK,
            "content_type": payload["content_type"],
            "content": {
                "channel_content_id": payload.get("channel_content_id", ""),
                "video_url": "",
                "images": payload.get("images", []),
                "body_text": payload.get("body_text", ""),
                "title": payload.get("title", ""),
                "channel_account_id": payload.get("gh_id", ""),
                "channel_account_name": payload.get("account_name", ""),
            },
        }
        return await self.decode_server.create_decode_task(request_body)

    async def create_decode_task_with_retry(
        self,
        payload: Dict[str, Any],
        retry_times: int,
        min_retry_delay: int = 1,
        max_retry_delay: int = 4,
    ):
        """Call ``create_decode_task`` under a tenacity retry loop.

        The retry policy is whatever ``request_retry`` builds from the three
        retry arguments; its result is passed as keyword arguments to
        ``AsyncRetrying``. Which exceptions are retried, and what is raised
        once attempts run out, is decided by that configuration and is not
        visible here.

        Returns:
            The result of the first attempt that completes without raising.
        """
        retry_kwargs = request_retry(
            retry_times=retry_times,
            min_retry_delay=min_retry_delay,
            max_retry_delay=max_retry_delay,
        )
        async for attempt in AsyncRetrying(**retry_kwargs):
            with attempt:
                return await self.create_decode_task(payload)


__all__ = [
    "AdPlatformArticlesDecodeUtils",
    "InnerArticlesDecodeUtils",
    "DecodeTaskUtil",
]