| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- import json
- from typing import Dict, List
- from app.infra.internal.aigc_decode_server import AigcDecodeServer
- from ._const import DecodeArticleConst
class AigcDecodeUtils(DecodeArticleConst):
    """Batching helpers around the AIGC decode ("解构") service.

    Submits posts, queries decode results and parses the decoded payload.
    ``CONFIG_ID`` and ``SUBMIT_BATCH`` (plus the constant holders used by the
    subclasses) are inherited from ``DecodeArticleConst``.
    """

    # Class attribute: one AigcDecodeServer client, created when the class body
    # runs (at module import) and shared by every instance and subclass.
    decode_server = AigcDecodeServer()

    def _resolve_config_id(self, config_id) -> int:
        """Return *config_id*, or ``CONFIG_ID`` when it was not given (None)."""
        # `is None` rather than `or`: an explicitly passed falsy id must not be
        # silently swapped for the default.
        return self.CONFIG_ID if config_id is None else config_id

    @staticmethod
    def _index_by_content_id(response: Dict) -> Dict[str, Dict]:
        """Key the ``data`` items of a successful response by ``channelContentId``."""
        # `or []` also covers a response whose "data" key is present but null.
        return {item["channelContentId"]: item for item in response.get("data") or []}

    async def submit_decode_batch(
        self, posts: List[Dict], *, config_id: int = None, skip_completed: bool = False
    ) -> Dict[str, Dict]:
        """Submit decode tasks in chunks of ``SUBMIT_BATCH`` posts.

        Args:
            posts: Post payloads; each must carry ``channelContentId``.
            config_id: Decode config id; ``None`` means ``CONFIG_ID``.
            skip_completed: Forwarded unchanged to ``submit_decode``.

        Returns:
            ``{content_id: item}``. For a chunk whose response ``code`` is 0 the
            items are exactly those returned in ``data`` (posts the service
            omits do not appear). For any other code every post of that chunk
            is reported with status ``"FAILED"`` and the raw response in
            ``errorMessage``.

        Exceptions raised by ``submit_decode`` are not caught; results for
        chunks already submitted are then not returned.
        """
        cfg_id = self._resolve_config_id(config_id)
        result: Dict[str, Dict] = {}
        for start in range(0, len(posts), self.SUBMIT_BATCH):
            batch = posts[start : start + self.SUBMIT_BATCH]
            response = await self.decode_server.submit_decode(
                config_id=cfg_id, posts=batch, skip_completed=skip_completed
            )
            if response.get("code") == 0:
                result.update(self._index_by_content_id(response))
                continue
            # Whole chunk rejected: mark every post in it as FAILED.
            for post in batch:
                cid = post["channelContentId"]
                result[cid] = {
                    "channelContentId": cid,
                    "status": "FAILED",
                    "errorMessage": f"batch submit failed: {response}",
                }
        return result

    async def query_decode_results_batch(
        self, content_ids: List[str], *, config_id: int = None
    ) -> Dict[str, Dict]:
        """Query decode results in chunks of ``SUBMIT_BATCH`` ids.

        Args:
            content_ids: ``channelContentId`` values to look up.
            config_id: Decode config id; ``None`` means ``CONFIG_ID``.

        Returns:
            ``{content_id: {status, dataContent, html, errorMessage}}``. When a
            query call returns a non-zero ``code``, every id of that chunk gets
            status ``"API_ERROR"``; callers should keep such rows in INIT and
            retry later.
        """
        cfg_id = self._resolve_config_id(config_id)
        result: Dict[str, Dict] = {}
        for start in range(0, len(content_ids), self.SUBMIT_BATCH):
            batch = content_ids[start : start + self.SUBMIT_BATCH]
            response = await self.decode_server.query_decode_results(
                config_id=cfg_id, channel_content_ids=batch
            )
            if response.get("code") == 0:
                result.update(self._index_by_content_id(response))
                continue
            for cid in batch:
                result[cid] = {
                    "channelContentId": cid,
                    "status": "API_ERROR",
                    "errorMessage": f"query API failed: {response}",
                }
        return result

    @staticmethod
    def extract_decode_result(result: Dict) -> Dict:
        """Extract inspiration / purpose / key points and the topic from a decode result.

        Supports both payload versions: v1 nests all sections under
        ``final_normalization_rebuild``, v2 has them at the top level. Missing,
        null or wrongly-typed sections yield empty strings instead of raising.

        Returns:
            ``{"inspiration", "purpose", "key_point", "topic"}``. The first
            three are the non-empty point texts joined with ",", and ``topic``
            is always a str.
        """
        wrapped = result.get("final_normalization_rebuild")
        # Use the v1 wrapper when it is a non-empty dict, else the v2 flat layout.
        final_result = wrapped if isinstance(wrapped, dict) and wrapped else result

        def _section(name: str) -> Dict:
            # A section may be absent or explicitly null; treat both as empty.
            value = final_result.get(name)
            return value if isinstance(value, dict) else {}

        def _join_points(items, key: str) -> str:
            # Only a list of dicts contributes; anything else (None, str, ...) is empty.
            if not isinstance(items, (list, tuple)):
                return ""
            return ",".join(
                str(p[key]) for p in items if isinstance(p, dict) and p.get(key)
            )

        topic = _section("topic_fusion_result").get("最终选题")
        topic_value = topic.get("选题") if isinstance(topic, dict) else None

        return {
            "inspiration": _join_points(
                _section("inspiration_final_result").get("最终灵感点列表"), "灵感点"
            ),
            "purpose": _join_points(
                _section("purpose_final_result").get("最终目的点列表"), "目的点"
            ),
            "key_point": _join_points(
                _section("keypoint_final").get("最终关键点列表"), "关键点"
            ),
            "topic": "" if topic_value is None else str(topic_value),
        }
class AdPlatformArticlesDecodeUtils(AigcDecodeUtils):
    """Build decode-service posts from ad-platform articles (keyed by ``wx_sn``)."""

    @staticmethod
    def format_images(images: str) -> List[str]:
        """Parse an ``article_images`` value into a list of image URLs.

        ``images`` is a JSON array (str/bytes), or an already-decoded list, of
        objects carrying ``image_url``. Elements that are not dicts, or whose
        ``image_url`` is falsy, are dropped. Empty or blank input, invalid
        JSON, a non-array JSON value, or any other input type yields ``[]``.
        """
        if isinstance(images, list):
            image_list = images
        elif isinstance(images, (str, bytes, bytearray)):
            if not images.strip():
                return []
            try:
                image_list = json.loads(images)
            # ValueError covers JSONDecodeError and undecodable bytes.
            except (ValueError, TypeError):
                return []
        else:
            # None and other types: nothing to parse (and no `.strip()` crash).
            return []

        if not isinstance(image_list, list):
            return []
        return [
            i.get("image_url")
            for i in image_list
            if isinstance(i, dict) and i.get("image_url")
        ]

    def prepare_posts(self, articles: List[Dict]) -> List[Dict]:
        """Convert article rows into decode-service post payloads.

        Each post uses ``wx_sn`` as ``channelContentId`` and is tagged as a
        WeChat long article. Title and text fall back to "" when the key is
        missing or its value is None; images come from ``format_images``.
        """
        posts = []
        for article in articles:
            images = self.format_images(article.get("article_images") or "")
            posts.append(
                {
                    "channelContentId": article["wx_sn"],
                    # `or ""` (not .get default) so a present-but-None value is normalised too.
                    "title": article.get("article_title") or "",
                    "bodyText": article.get("article_text") or "",
                    "images": images,
                    "video": None,
                    "contentModal": self.ContentModal.LONG_ARTICLE,
                    "channel": self.Channel.WECHAT,
                }
            )
        return posts
class InnerArticlesDecodeUtils(AigcDecodeUtils):
    """Build decode-service posts from internally produced articles (keyed by ``source_id``)."""

    def _collect_images(self, article: Dict, produce_info: List[Dict]) -> List[str]:
        """Return image URLs in order: cover, produce COVER outputs, produce IMAGE outputs."""
        images = [article["coverimgurl"]] if article.get("coverimgurl") else []
        # Two passes keep all COVER outputs ahead of all IMAGE outputs.
        for module_type in (
            self.ProduceModuleType.COVER,
            self.ProduceModuleType.IMAGE,
        ):
            images.extend(
                pi["output"]
                for pi in produce_info
                # Skip empty/None outputs, mirroring the coverimgurl truthiness check.
                if pi["produce_module_type"] == module_type and pi.get("output")
            )
        return images

    def prepare_posts(
        self, articles: List[Dict], produce_info_map: Dict[str, List[Dict]]
    ) -> List[Dict]:
        """Convert article rows into decode-service post payloads.

        Args:
            articles: Rows carrying ``source_id`` and optionally ``title``,
                ``article_text`` and ``coverimgurl``.
            produce_info_map: Maps ``str(source_id)`` to that article's produce
                records; each record has ``produce_module_type`` and ``output``.

        Returns:
            One WeChat long-article post per article, with ``str(source_id)``
            as ``channelContentId``. Images are the cover, then produce COVER
            outputs, then produce IMAGE outputs.
        """
        posts = []
        for article in articles:
            source_id = str(article["source_id"])
            # `or []` also tolerates a None entry in the map.
            produce_info = produce_info_map.get(source_id) or []
            posts.append(
                {
                    "title": article.get("title") or "",
                    "bodyText": article.get("article_text") or "",
                    "images": self._collect_images(article, produce_info),
                    "video": None,
                    "contentModal": self.ContentModal.LONG_ARTICLE,
                    "channel": self.Channel.WECHAT,
                    "channelContentId": source_id,
                }
            )
        return posts
# Public API of this module: the base decode helper and its two article-source variants.
__all__ = ["AigcDecodeUtils", "AdPlatformArticlesDecodeUtils", "InnerArticlesDecodeUtils"]
|