_utils.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. import json
  2. import re
  3. from typing import Any
  4. from typing import Dict, List
  5. import aiohttp
  6. from tenacity import AsyncRetrying
  7. from app.infra.internal import DecodeServer
  8. from app.infra.shared.tools import request_retry
  9. from ._const import DecodeTaskConst
  10. class DecodeTaskUtil(DecodeTaskConst):
  11. decode_server = DecodeServer()
  12. def prepare_extract_body(self, article: Dict) -> Dict:
  13. return {
  14. "scene": self.BusinessScene.POINT_PICK,
  15. "content_type": self.ContentType.LONG_ARTICLE,
  16. "content": {
  17. "channel_content_id": article.get("wx_sn", ""),
  18. "video_url": "",
  19. "images": article.get("article_images"),
  20. "body_text": article.get("article_text", ""),
  21. "title": article.get("article_title", ""),
  22. "channel_account_id": article.get("gh_id", ""),
  23. "channel_account_name": article.get("account_name", ""),
  24. },
  25. }
  26. @staticmethod
  27. def extract_decode_result(result: Dict) -> Dict:
  28. """
  29. 从结构的结果中,解析出灵感点、目的点、关键点;
  30. """
  31. final_result = result.get("final_normalization_rebuild")
  32. if not final_result:
  33. return {"error": "解构结果中无 final_normalization_rebuild 信息"}
  34. # 灵感点
  35. inspiration_list = final_result.get("inspiration_final_result", {}).get(
  36. "最终灵感点列表", []
  37. )
  38. # 目的
  39. purpose_list = final_result.get("purpose_final_result", {}).get(
  40. "最终目的点列表", []
  41. )
  42. # 关键点
  43. keypoint_list = final_result.get("keypoint_final", {}).get("最终关键点列表", [])
  44. topic_fusion = final_result.get("topic_fusion_result", {})
  45. # 选题
  46. topic_text = (
  47. topic_fusion.get("最终选题", {}).get("选题", "")
  48. if isinstance(topic_fusion.get("最终选题"), dict)
  49. else ""
  50. )
  51. def _join_points(items: list, key: str) -> str:
  52. parts = [str(p[key]) for p in items if isinstance(p, dict) and p.get(key)]
  53. return ",".join(parts)
  54. return {
  55. "inspiration": _join_points(inspiration_list, "灵感点"),
  56. "purpose": _join_points(purpose_list, "目的点"),
  57. "key_point": _join_points(keypoint_list, "关键点"),
  58. "topic": topic_text,
  59. }
  60. async def fetch_decode_result(self, task_id: str):
  61. return await self.decode_server.fetch_result(task_id)
  62. class AdPlatformArticlesDecodeUtils(DecodeTaskUtil):
  63. @staticmethod
  64. def format_images(images: str) -> List[str]:
  65. """
  66. 格式化图片字符串,空/非法 JSON 返回空列表。
  67. """
  68. if not images or not images.strip():
  69. return []
  70. try:
  71. image_list = json.loads(images)
  72. except (json.JSONDecodeError, TypeError):
  73. return []
  74. if not isinstance(image_list, list):
  75. return []
  76. return [
  77. i.get("image_url")
  78. for i in image_list
  79. if isinstance(i, dict) and i.get("image_url")
  80. ]
  81. async def create_decode_task(self, article: Dict):
  82. images = self.format_images(article.get("article_images") or "")
  83. article["article_images"] = images
  84. request_body = self.prepare_extract_body(article)
  85. return await self.decode_server.create_decode_task(request_body)
  86. class InnerArticlesDecodeUtils(DecodeTaskUtil):
  87. RETRYABLE_EXCEPTIONS = (aiohttp.ClientError, TimeoutError)
  88. @staticmethod
  89. def extract_body_text_and_images(raw_text):
  90. """
  91. 从文本中提取图片和正文
  92. """
  93. if not raw_text:
  94. return "", []
  95. image_pattern = re.compile(r"^\[image:\s*(https?://[^\]]+)\s*\]$")
  96. body_lines = []
  97. images = []
  98. for line in raw_text.splitlines():
  99. stripped_line = line.strip()
  100. if not stripped_line:
  101. if body_lines and body_lines[-1] != "":
  102. body_lines.append("")
  103. continue
  104. match = image_pattern.match(stripped_line)
  105. if match:
  106. images.append(match.group(1).strip())
  107. continue
  108. body_lines.append(stripped_line)
  109. while body_lines and body_lines[-1] == "":
  110. body_lines.pop()
  111. body_text = "\n".join(body_lines)
  112. return body_text, images
  113. async def create_decode_task(self, payload):
  114. request_body = {
  115. "scene": self.BusinessScene.POINT_PICK,
  116. "content_type": payload["content_type"],
  117. "content": {
  118. "channel_content_id": payload.get("channel_content_id", ""),
  119. "video_url": "",
  120. "images": payload.get("images", []),
  121. "body_text": payload.get("body_text", ""),
  122. "title": payload.get("title", ""),
  123. "channel_account_id": payload.get("gh_id", ""),
  124. "channel_account_name": payload.get("account_name", ""),
  125. },
  126. }
  127. return await self.decode_server.create_decode_task(request_body)
  128. async def create_decode_task_with_retry(
  129. self,
  130. payload: Dict[str, Any],
  131. retry_times: int,
  132. min_retry_delay: int = 1,
  133. max_retry_delay: int = 4,
  134. ):
  135. retry_kwargs = request_retry(
  136. retry_times=retry_times,
  137. min_retry_delay=min_retry_delay,
  138. max_retry_delay=max_retry_delay,
  139. )
  140. async for attempt in AsyncRetrying(**retry_kwargs):
  141. with attempt:
  142. return await self.create_decode_task(payload)
  143. __all__ = [
  144. "AdPlatformArticlesDecodeUtils",
  145. "InnerArticlesDecodeUtils",
  146. "DecodeTaskUtil",
  147. ]