| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- import json
- import re
- from typing import Any
- from typing import Dict, List
- import aiohttp
- from tenacity import AsyncRetrying
- from app.infra.internal import DecodeServer
- from app.infra.shared.tools import request_retry
- from ._const import DecodeTaskConst
class DecodeTaskUtil(DecodeTaskConst):
    """Shared helpers for building decode requests and reading decode results."""

    # Created once, when the class body is evaluated, and shared by every
    # instance and subclass.
    decode_server = DecodeServer()

    def prepare_extract_body(self, article: Dict) -> Dict:
        """Build the decode request body for a long-form article.

        Args:
            article: Article record. The keys read are ``wx_sn``,
                ``article_images``, ``article_text``, ``article_title``,
                ``gh_id`` and ``account_name``.

        Returns:
            A dict with ``scene``, ``content_type`` and ``content`` entries.
        """
        return {
            "scene": self.BusinessScene.POINT_PICK,
            "content_type": self.ContentType.LONG_ARTICLE,
            "content": {
                "channel_content_id": article.get("wx_sn", ""),
                "video_url": "",
                # Passed through unchanged; this is None when the key is absent.
                "images": article.get("article_images"),
                "body_text": article.get("article_text", ""),
                "title": article.get("article_title", ""),
                "channel_account_id": article.get("gh_id", ""),
                "channel_account_name": article.get("account_name", ""),
            },
        }

    @staticmethod
    def extract_decode_result(result: Dict) -> Dict:
        """Pull inspiration, purpose, key points and the topic out of a decode result.

        Every nested lookup is type-checked, so a section that is missing,
        explicitly ``null`` or of an unexpected type yields an empty string
        instead of raising.

        Args:
            result: Decode result; the data is read from its
                ``final_normalization_rebuild`` entry.

        Returns:
            ``{"inspiration", "purpose", "key_point", "topic"}`` where the first
            three values are comma-joined point texts, or ``{"error": ...}``
            when ``final_normalization_rebuild`` is absent, empty or not a dict.
        """

        def _as_dict(value: Any) -> Dict:
            # A `.get(key, {})` default only covers an absent key; an explicit
            # null (or any other non-dict) still has to be replaced here.
            return value if isinstance(value, dict) else {}

        final_result = _as_dict(_as_dict(result).get("final_normalization_rebuild"))
        if not final_result:
            return {"error": "解构结果中无 final_normalization_rebuild 信息"}

        def _join_points(section_key: str, list_key: str, point_key: str) -> str:
            items = _as_dict(final_result.get(section_key)).get(list_key)
            if not isinstance(items, (list, tuple)):
                return ""
            # Entries that are not dicts or carry an empty point are skipped.
            return ",".join(
                str(item[point_key])
                for item in items
                if isinstance(item, dict) and item.get(point_key)
            )

        final_topic = _as_dict(final_result.get("topic_fusion_result")).get("最终选题")
        topic_text = final_topic.get("选题", "") if isinstance(final_topic, dict) else ""

        return {
            "inspiration": _join_points(
                "inspiration_final_result", "最终灵感点列表", "灵感点"
            ),
            "purpose": _join_points("purpose_final_result", "最终目的点列表", "目的点"),
            "key_point": _join_points("keypoint_final", "最终关键点列表", "关键点"),
            "topic": topic_text,
        }

    async def fetch_decode_result(self, task_id: str):
        """Return whatever ``decode_server.fetch_result`` yields for ``task_id``."""
        return await self.decode_server.fetch_result(task_id)
class AdPlatformArticlesDecodeUtils(DecodeTaskUtil):
    """Decode-task helpers for ad-platform articles."""

    @staticmethod
    def format_images(images: Any) -> List[str]:
        """Normalise the stored image field into a list of image URLs.

        Args:
            images: Either a JSON string/bytes encoding a list of objects with
                an ``image_url`` entry, or a list that was already produced by
                this method (``create_decode_task`` writes the normalised list
                back into the article, so a repeated call sees a list).

        Returns:
            The truthy ``image_url`` values in input order. Empty, blank,
            invalid-JSON, non-list or unsupported input gives ``[]``.
        """
        already_parsed = isinstance(images, list)
        if already_parsed:
            entries = images
        elif isinstance(images, (str, bytes, bytearray)):
            if not images.strip():
                return []
            try:
                entries = json.loads(images)
            except (ValueError, TypeError):
                # ValueError covers json.JSONDecodeError and undecodable bytes.
                return []
            if not isinstance(entries, list):
                return []
        else:
            return []

        urls = []
        for entry in entries:
            if isinstance(entry, dict):
                url = entry.get("image_url")
            elif already_parsed and isinstance(entry, str):
                # Plain URL strings are only accepted from an already
                # normalised list; string items inside parsed JSON are still
                # ignored, as before.
                url = entry
            else:
                url = None
            if url:
                urls.append(url)
        return urls

    async def create_decode_task(self, article: Dict):
        """Normalise the article's images and submit a decode task.

        ``article["article_images"]`` is replaced in place with the normalised
        URL list (kept for backward compatibility). Because ``format_images``
        accepts that list, calling this again with the same dict is safe.

        Args:
            article: Article record, see ``prepare_extract_body`` for the keys.

        Returns:
            Whatever ``decode_server.create_decode_task`` returns.
        """
        images = self.format_images(article.get("article_images"))
        article["article_images"] = images
        request_body = self.prepare_extract_body(article)
        return await self.decode_server.create_decode_task(request_body)
class InnerArticlesDecodeUtils(DecodeTaskUtil):
    """Decode-task helpers for internally sourced articles."""

    # Not referenced inside this class; presumably consumed by callers or by
    # the retry configuration - verify.
    # NOTE(review): before Python 3.11 asyncio.TimeoutError is a different
    # class from the builtin TimeoutError - confirm which one is intended.
    RETRYABLE_EXCEPTIONS = (aiohttp.ClientError, TimeoutError)

    @staticmethod
    def extract_body_text_and_images(raw_text):
        """Split raw article text into body text and image URLs.

        A line that consists solely of ``[image: <http(s) url>]`` contributes
        its URL to the image list and is dropped from the body. Remaining lines
        are stripped; runs of blank lines between body lines collapse to one
        blank line, and leading/trailing blank lines are removed.

        Returns:
            ``(body_text, images)``; ``("", [])`` for falsy input.
        """
        if not raw_text:
            return "", []

        image_line = re.compile(r"^\[image:\s*(https?://[^\]]+)\s*\]$")
        paragraphs = []
        image_urls = []
        # True when a blank line was seen after some body text; the separator
        # is emitted lazily so a trailing gap never reaches the output.
        gap_pending = False

        for raw_line in raw_text.splitlines():
            text = raw_line.strip()
            if not text:
                if paragraphs:
                    gap_pending = True
                continue

            found = image_line.match(text)
            if found:
                image_urls.append(found.group(1).strip())
            else:
                if gap_pending:
                    paragraphs.append("")
                    gap_pending = False
                paragraphs.append(text)

        return "\n".join(paragraphs), image_urls

    async def create_decode_task(self, payload):
        """Submit a decode task built from an already prepared payload.

        ``payload["content_type"]`` is required (a missing key raises
        ``KeyError``); every other field falls back to an empty value.

        Returns:
            Whatever ``decode_server.create_decode_task`` returns.
        """
        scene = self.BusinessScene.POINT_PICK
        content_type = payload["content_type"]
        content = {
            "channel_content_id": payload.get("channel_content_id", ""),
            "video_url": "",
            "images": payload.get("images", []),
            "body_text": payload.get("body_text", ""),
            "title": payload.get("title", ""),
            "channel_account_id": payload.get("gh_id", ""),
            "channel_account_name": payload.get("account_name", ""),
        }
        return await self.decode_server.create_decode_task(
            {"scene": scene, "content_type": content_type, "content": content}
        )

    async def create_decode_task_with_retry(
        self,
        payload: Dict[str, Any],
        retry_times: int,
        min_retry_delay: int = 1,
        max_retry_delay: int = 4,
    ):
        """Call ``create_decode_task`` under a tenacity retry loop.

        The retry policy comes from the project helper ``request_retry``, whose
        result is expanded into ``AsyncRetrying`` keyword arguments.

        Args:
            payload: Forwarded unchanged to ``create_decode_task``.
            retry_times: Forwarded to ``request_retry``.
            min_retry_delay: Forwarded to ``request_retry``.
            max_retry_delay: Forwarded to ``request_retry``.

        Returns:
            The result of the first attempt that completes without raising.
        """
        policy = request_retry(
            retry_times=retry_times,
            min_retry_delay=min_retry_delay,
            max_retry_delay=max_retry_delay,
        )
        retrying = AsyncRetrying(**policy)
        async for attempt in retrying:
            with attempt:
                return await self.create_decode_task(payload)
# Names exported by `from <module> import *`.
__all__ = [
    "AdPlatformArticlesDecodeUtils",
    "InnerArticlesDecodeUtils",
    "DecodeTaskUtil",
]
|